|
@@ -8,6 +8,9 @@
|
|
|
package org.elasticsearch.search.ccs;
|
|
|
|
|
|
import org.elasticsearch.action.DocWriteResponse;
|
|
|
+import org.elasticsearch.action.admin.cluster.remote.RemoteInfoRequest;
|
|
|
+import org.elasticsearch.action.admin.cluster.remote.RemoteInfoResponse;
|
|
|
+import org.elasticsearch.action.admin.cluster.remote.TransportRemoteInfoAction;
|
|
|
import org.elasticsearch.action.search.OpenPointInTimeRequest;
|
|
|
import org.elasticsearch.action.search.OpenPointInTimeResponse;
|
|
|
import org.elasticsearch.action.search.SearchRequest;
|
|
@@ -35,6 +38,7 @@ import org.elasticsearch.rest.RestStatus;
|
|
|
import org.elasticsearch.search.SearchHit;
|
|
|
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
|
|
import org.elasticsearch.test.AbstractMultiClustersTestCase;
|
|
|
+import org.elasticsearch.transport.RemoteConnectionInfo;
|
|
|
import org.elasticsearch.xcontent.XContentBuilder;
|
|
|
import org.elasticsearch.xcontent.XContentFactory;
|
|
|
import org.elasticsearch.xcontent.XContentType;
|
|
@@ -56,6 +60,7 @@ import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
import java.util.function.Consumer;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
@@ -92,14 +97,16 @@ public abstract class AbstractSemanticCrossClusterSearchTestCase extends Abstrac
|
|
|
return List.of(LocalStateInferencePlugin.class, TestInferenceServicePlugin.class, FakeMlPlugin.class);
|
|
|
}
|
|
|
|
|
|
- protected void setupTwoClusters(TestIndexInfo localIndexInfo, TestIndexInfo remoteIndexInfo) throws IOException {
|
|
|
+ protected void setupTwoClusters(TestIndexInfo localIndexInfo, TestIndexInfo remoteIndexInfo) throws Exception {
|
|
|
setupCluster(LOCAL_CLUSTER, localIndexInfo);
|
|
|
setupCluster(REMOTE_CLUSTER, remoteIndexInfo);
|
|
|
+ waitUntilRemoteClusterConnected(REMOTE_CLUSTER);
|
|
|
}
|
|
|
|
|
|
protected void setupCluster(String clusterAlias, TestIndexInfo indexInfo) throws IOException {
|
|
|
final Client client = client(clusterAlias);
|
|
|
final String indexName = indexInfo.name();
|
|
|
+ final int dataNodeCount = cluster(clusterAlias).numDataNodes();
|
|
|
|
|
|
for (var entry : indexInfo.inferenceEndpoints().entrySet()) {
|
|
|
String inferenceId = entry.getKey();
|
|
@@ -117,13 +124,13 @@ public abstract class AbstractSemanticCrossClusterSearchTestCase extends Abstrac
|
|
|
createInferenceEndpoint(client, minimalServiceSettings.taskType(), inferenceId, serviceSettings);
|
|
|
}
|
|
|
|
|
|
- Settings indexSettings = indexSettings(randomIntBetween(2, 5), randomIntBetween(0, 1)).build();
|
|
|
+ Settings indexSettings = indexSettings(randomIntBetween(1, dataNodeCount), 0).build();
|
|
|
assertAcked(client.admin().indices().prepareCreate(indexName).setSettings(indexSettings).setMapping(indexInfo.mappings()));
|
|
|
assertFalse(
|
|
|
client.admin()
|
|
|
.cluster()
|
|
|
.prepareHealth(TEST_REQUEST_TIMEOUT, indexName)
|
|
|
- .setWaitForYellowStatus()
|
|
|
+ .setWaitForGreenStatus()
|
|
|
.setTimeout(TimeValue.timeValueSeconds(10))
|
|
|
.get()
|
|
|
.isTimedOut()
|
|
@@ -140,6 +147,18 @@ public abstract class AbstractSemanticCrossClusterSearchTestCase extends Abstrac
|
|
|
assertThat(refreshResponse.getStatus(), is(RestStatus.OK));
|
|
|
}
|
|
|
|
|
|
+ protected void waitUntilRemoteClusterConnected(String clusterAlias) throws Exception {
|
|
|
+ RemoteInfoRequest request = new RemoteInfoRequest();
|
|
|
+ assertBusy(() -> {
|
|
|
+ RemoteInfoResponse response = client().execute(TransportRemoteInfoAction.TYPE, request).actionGet(TEST_REQUEST_TIMEOUT);
|
|
|
+ boolean connected = response.getInfos()
|
|
|
+ .stream()
|
|
|
+ .filter(i -> i.getClusterAlias().equals(clusterAlias))
|
|
|
+ .anyMatch(RemoteConnectionInfo::isConnected);
|
|
|
+ assertThat(connected, is(true));
|
|
|
+ }, 30, TimeUnit.SECONDS);
|
|
|
+ }
|
|
|
+
|
|
|
protected BytesReference openPointInTime(String[] indices, TimeValue keepAlive) {
|
|
|
OpenPointInTimeRequest request = new OpenPointInTimeRequest(indices).keepAlive(keepAlive);
|
|
|
final OpenPointInTimeResponse response = client().execute(TransportOpenPointInTimeAction.TYPE, request).actionGet();
|