|
@@ -14,12 +14,13 @@ import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksReque
|
|
|
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
|
|
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
|
|
import org.elasticsearch.action.search.SearchAction;
|
|
|
+import org.elasticsearch.action.search.SearchResponse;
|
|
|
import org.elasticsearch.action.search.ShardSearchFailure;
|
|
|
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
|
|
import org.elasticsearch.client.internal.Client;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
|
-import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
import org.elasticsearch.common.Strings;
|
|
|
+import org.elasticsearch.common.settings.Setting;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.core.TimeValue;
|
|
|
import org.elasticsearch.index.IndexModule;
|
|
@@ -37,6 +38,7 @@ import org.elasticsearch.tasks.TaskCancelledException;
|
|
|
import org.elasticsearch.tasks.TaskInfo;
|
|
|
import org.elasticsearch.test.AbstractMultiClustersTestCase;
|
|
|
import org.elasticsearch.test.InternalTestCluster;
|
|
|
+import org.elasticsearch.transport.RemoteClusterAware;
|
|
|
import org.elasticsearch.transport.TransportService;
|
|
|
import org.elasticsearch.xcontent.ToXContent;
|
|
|
import org.elasticsearch.xcontent.XContentFactory;
|
|
@@ -58,7 +60,9 @@ import org.junit.Before;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collection;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -87,6 +91,11 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
|
|
|
return List.of(REMOTE_CLUSTER);
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
|
|
|
+ return Map.of(REMOTE_CLUSTER, randomBoolean());
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
protected boolean reuseClusters() {
|
|
|
return false;
|
|
@@ -116,17 +125,697 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
|
|
|
throw new IllegalStateException("not implemented");
|
|
|
}
|
|
|
);
|
|
|
+ QuerySpec<ThrowingQueryBuilder> throwingSpec = new QuerySpec<>(ThrowingQueryBuilder.NAME, ThrowingQueryBuilder::new, p -> {
|
|
|
+ throw new IllegalStateException("not implemented");
|
|
|
+ });
|
|
|
+
|
|
|
+ return List.of(slowRunningSpec, throwingSpec);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testClusterDetailsAfterSuccessfulCCS() throws Exception {
|
|
|
+ Map<String, Object> testClusterInfo = setupTwoClusters();
|
|
|
+ String localIndex = (String) testClusterInfo.get("local.index");
|
|
|
+ String remoteIndex = (String) testClusterInfo.get("remote.index");
|
|
|
+ int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
|
|
|
+ int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards");
|
|
|
+
|
|
|
+ SearchListenerPlugin.blockQueryPhase();
|
|
|
+
|
|
|
+ SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex);
|
|
|
+ request.setCcsMinimizeRoundtrips(true);
|
|
|
+ request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
|
|
|
+ request.setKeepOnCompletion(true);
|
|
|
+ request.getSearchRequest().source(new SearchSourceBuilder().query(new MatchAllQueryBuilder()).size(1000));
|
|
|
+
|
|
|
+ AsyncSearchResponse response = submitAsyncSearch(request);
|
|
|
+ assertNotNull(response.getSearchResponse());
|
|
|
+ assertTrue(response.isRunning());
|
|
|
+
|
|
|
+ {
|
|
|
+ SearchResponse.Clusters clusters = response.getSearchResponse().getClusters();
|
|
|
+ assertThat(clusters.getTotal(), equalTo(2));
|
|
|
+ assertTrue("search cluster results should be marked as partial", clusters.hasPartialResults());
|
|
|
+
|
|
|
+ SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
|
|
|
+ assertNotNull(localClusterSearchInfo);
|
|
|
+ assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.RUNNING));
|
|
|
+
|
|
|
+ SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
|
|
|
+ assertNotNull(remoteClusterSearchInfo);
|
|
|
+ assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.RUNNING));
|
|
|
+ }
|
|
|
+
|
|
|
+ SearchListenerPlugin.waitSearchStarted();
|
|
|
+ SearchListenerPlugin.allowQueryPhase();
|
|
|
+
|
|
|
+ assertBusy(() -> {
|
|
|
+ AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
|
|
|
+ assertFalse(statusResponse.isRunning());
|
|
|
+ assertNotNull(statusResponse.getCompletionStatus());
|
|
|
+ });
|
|
|
+
|
|
|
+ {
|
|
|
+ AsyncSearchResponse finishedResponse = getAsyncSearch(response.getId());
|
|
|
+
|
|
|
+ SearchResponse.Clusters clusters = finishedResponse.getSearchResponse().getClusters();
|
|
|
+ assertFalse("search cluster results should NOT be marked as partial", clusters.hasPartialResults());
|
|
|
+ assertThat(clusters.getTotal(), equalTo(2));
|
|
|
+ assertThat(clusters.getSuccessful(), equalTo(2));
|
|
|
+ assertThat(clusters.getSkipped(), equalTo(0));
|
|
|
+
|
|
|
+ SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
|
|
|
+ assertNotNull(localClusterSearchInfo);
|
|
|
+ assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.SUCCESSFUL));
|
|
|
+ assertThat(localClusterSearchInfo.getTotalShards(), equalTo(localNumShards));
|
|
|
+ assertThat(localClusterSearchInfo.getSuccessfulShards(), equalTo(localNumShards));
|
|
|
+ assertThat(localClusterSearchInfo.getSkippedShards(), equalTo(0));
|
|
|
+ assertThat(localClusterSearchInfo.getFailedShards(), equalTo(0));
|
|
|
+ assertThat(localClusterSearchInfo.getFailures().size(), equalTo(0));
|
|
|
+ assertThat(localClusterSearchInfo.getTook().millis(), greaterThan(0L));
|
|
|
+
|
|
|
+ SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
|
|
|
+ assertNotNull(remoteClusterSearchInfo);
|
|
|
+ assertThat(remoteClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.SUCCESSFUL));
|
|
|
+ assertThat(remoteClusterSearchInfo.getTotalShards(), equalTo(remoteNumShards));
|
|
|
+ assertThat(remoteClusterSearchInfo.getSuccessfulShards(), equalTo(remoteNumShards));
|
|
|
+ assertThat(remoteClusterSearchInfo.getSkippedShards(), equalTo(0));
|
|
|
+ assertThat(remoteClusterSearchInfo.getFailedShards(), equalTo(0));
|
|
|
+ assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(0));
|
|
|
+ assertThat(remoteClusterSearchInfo.getTook().millis(), greaterThan(0L));
|
|
|
+ }
|
|
|
+
|
|
|
+ // check that the async_search/status response includes the same cluster details
|
|
|
+ {
|
|
|
+ AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
|
|
|
+
|
|
|
+ SearchResponse.Clusters clusters = statusResponse.getClusters();
|
|
|
+ assertFalse("search cluster results should NOT be marked as partial", clusters.hasPartialResults());
|
|
|
+ assertThat(clusters.getTotal(), equalTo(2));
|
|
|
+ assertThat(clusters.getSuccessful(), equalTo(2));
|
|
|
+ assertThat(clusters.getSkipped(), equalTo(0));
|
|
|
+
|
|
|
+ SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
|
|
|
+ assertNotNull(localClusterSearchInfo);
|
|
|
+ assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.SUCCESSFUL));
|
|
|
+ assertThat(localClusterSearchInfo.getTotalShards(), equalTo(localNumShards));
|
|
|
+ assertThat(localClusterSearchInfo.getSuccessfulShards(), equalTo(localNumShards));
|
|
|
+ assertThat(localClusterSearchInfo.getSkippedShards(), equalTo(0));
|
|
|
+ assertThat(localClusterSearchInfo.getFailedShards(), equalTo(0));
|
|
|
+ assertThat(localClusterSearchInfo.getFailures().size(), equalTo(0));
|
|
|
+ assertThat(localClusterSearchInfo.getTook().millis(), greaterThan(0L));
|
|
|
+
|
|
|
+ SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
|
|
|
+ assertNotNull(remoteClusterSearchInfo);
|
|
|
+ assertThat(remoteClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.SUCCESSFUL));
|
|
|
+ assertThat(remoteClusterSearchInfo.getTotalShards(), equalTo(remoteNumShards));
|
|
|
+ assertThat(remoteClusterSearchInfo.getSuccessfulShards(), equalTo(remoteNumShards));
|
|
|
+ assertThat(remoteClusterSearchInfo.getSkippedShards(), equalTo(0));
|
|
|
+ assertThat(remoteClusterSearchInfo.getFailedShards(), equalTo(0));
|
|
|
+ assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(0));
|
|
|
+ assertThat(remoteClusterSearchInfo.getTook().millis(), greaterThan(0L));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testClusterDetailsAfterCCSWithFailuresOnAllShards() throws Exception {
|
|
|
+ Map<String, Object> testClusterInfo = setupTwoClusters();
|
|
|
+ String localIndex = (String) testClusterInfo.get("local.index");
|
|
|
+ String remoteIndex = (String) testClusterInfo.get("remote.index");
|
|
|
+ boolean skipUnavailable = (Boolean) testClusterInfo.get("remote.skip_unavailable");
|
|
|
+
|
|
|
+ SearchListenerPlugin.blockQueryPhase();
|
|
|
+
|
|
|
+ SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex);
|
|
|
+ request.setCcsMinimizeRoundtrips(true);
|
|
|
+ request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
|
|
|
+ request.setKeepOnCompletion(true);
|
|
|
+ // shardId -1 means to throw the Exception on all shards, so should result in complete search failure
|
|
|
+ ThrowingQueryBuilder queryBuilder = new ThrowingQueryBuilder(randomLong(), new IllegalStateException("index corrupted"), -1);
|
|
|
+ request.getSearchRequest().source(new SearchSourceBuilder().query(queryBuilder).size(10));
|
|
|
+
|
|
|
+ AsyncSearchResponse response = submitAsyncSearch(request);
|
|
|
+ assertNotNull(response.getSearchResponse());
|
|
|
+ assertTrue(response.isRunning());
|
|
|
+
|
|
|
+ {
|
|
|
+ SearchResponse.Clusters clusters = response.getSearchResponse().getClusters();
|
|
|
+ assertThat(clusters.getTotal(), equalTo(2));
|
|
|
+ assertTrue("search cluster results should be marked as partial", clusters.hasPartialResults());
|
|
|
+
|
|
|
+ SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
|
|
|
+ assertNotNull(localClusterSearchInfo);
|
|
|
+ assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.RUNNING));
|
|
|
+
|
|
|
+ SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
|
|
|
+ assertNotNull(remoteClusterSearchInfo);
|
|
|
+ assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.RUNNING));
|
|
|
+ }
|
|
|
+
|
|
|
+ SearchListenerPlugin.waitSearchStarted();
|
|
|
+ SearchListenerPlugin.allowQueryPhase();
|
|
|
+
|
|
|
+ assertBusy(() -> {
|
|
|
+ AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
|
|
|
+ assertFalse(statusResponse.isRunning());
|
|
|
+ assertNotNull(statusResponse.getCompletionStatus());
|
|
|
+ });
|
|
|
+
|
|
|
+ {
|
|
|
+ AsyncSearchResponse finishedResponse = getAsyncSearch(response.getId());
|
|
|
+
|
|
|
+ SearchResponse.Clusters clusters = finishedResponse.getSearchResponse().getClusters();
|
|
|
+ assertThat(clusters.getTotal(), equalTo(2));
|
|
|
+ assertThat(clusters.getSuccessful(), equalTo(0));
|
|
|
+ assertThat(clusters.getSkipped(), equalTo(2));
|
|
|
+
|
|
|
+ SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
|
|
|
+ assertNotNull(localClusterSearchInfo);
|
|
|
+ assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.FAILED));
|
|
|
+ assertNull(localClusterSearchInfo.getTotalShards());
|
|
|
+ assertNull(localClusterSearchInfo.getSuccessfulShards());
|
|
|
+ assertNull(localClusterSearchInfo.getSkippedShards());
|
|
|
+ assertNull(localClusterSearchInfo.getFailedShards());
|
|
|
+ assertThat(localClusterSearchInfo.getFailures().size(), equalTo(1));
|
|
|
+ assertNull(localClusterSearchInfo.getTook());
|
|
|
+ assertFalse(localClusterSearchInfo.isTimedOut());
|
|
|
+ ShardSearchFailure localShardSearchFailure = localClusterSearchInfo.getFailures().get(0);
|
|
|
+ assertTrue("should have 'index corrupted' in reason", localShardSearchFailure.reason().contains("index corrupted"));
|
|
|
+
|
|
|
+ SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
|
|
|
+ assertNotNull(remoteClusterSearchInfo);
|
|
|
+ SearchResponse.Cluster.Status expectedStatus = skipUnavailable
|
|
|
+ ? SearchResponse.Cluster.Status.SKIPPED
|
|
|
+ : SearchResponse.Cluster.Status.FAILED;
|
|
|
+ assertThat(remoteClusterSearchInfo.getStatus(), equalTo(expectedStatus));
|
|
|
+ assertNull(remoteClusterSearchInfo.getTotalShards());
|
|
|
+ assertNull(remoteClusterSearchInfo.getSuccessfulShards());
|
|
|
+ assertNull(remoteClusterSearchInfo.getSkippedShards());
|
|
|
+ assertNull(remoteClusterSearchInfo.getFailedShards());
|
|
|
+ assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(1));
|
|
|
+ assertNull(remoteClusterSearchInfo.getTook());
|
|
|
+ assertFalse(remoteClusterSearchInfo.isTimedOut());
|
|
|
+ ShardSearchFailure remoteShardSearchFailure = remoteClusterSearchInfo.getFailures().get(0);
|
|
|
+ assertTrue("should have 'index corrupted' in reason", remoteShardSearchFailure.reason().contains("index corrupted"));
|
|
|
+ }
|
|
|
+ // check that the async_search/status response includes the same cluster details
|
|
|
+ {
|
|
|
+ AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
|
|
|
+ SearchResponse.Clusters clusters = statusResponse.getClusters();
|
|
|
+ assertThat(clusters.getTotal(), equalTo(2));
|
|
|
+ assertThat(clusters.getSuccessful(), equalTo(0));
|
|
|
+ assertThat(clusters.getSkipped(), equalTo(2));
|
|
|
+
|
|
|
+ SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
|
|
|
+ assertNotNull(localClusterSearchInfo);
|
|
|
+ assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.FAILED));
|
|
|
+ assertNull(localClusterSearchInfo.getTotalShards());
|
|
|
+ assertNull(localClusterSearchInfo.getSuccessfulShards());
|
|
|
+ assertNull(localClusterSearchInfo.getSkippedShards());
|
|
|
+ assertNull(localClusterSearchInfo.getFailedShards());
|
|
|
+ assertThat(localClusterSearchInfo.getFailures().size(), equalTo(1));
|
|
|
+ assertNull(localClusterSearchInfo.getTook());
|
|
|
+ assertFalse(localClusterSearchInfo.isTimedOut());
|
|
|
+ ShardSearchFailure localShardSearchFailure = localClusterSearchInfo.getFailures().get(0);
|
|
|
+ assertTrue("should have 'index corrupted' in reason", localShardSearchFailure.reason().contains("index corrupted"));
|
|
|
+
|
|
|
+ SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
|
|
|
+ assertNotNull(remoteClusterSearchInfo);
|
|
|
+ SearchResponse.Cluster.Status expectedStatus = skipUnavailable
|
|
|
+ ? SearchResponse.Cluster.Status.SKIPPED
|
|
|
+ : SearchResponse.Cluster.Status.FAILED;
|
|
|
+ assertThat(remoteClusterSearchInfo.getStatus(), equalTo(expectedStatus));
|
|
|
+ assertNull(remoteClusterSearchInfo.getTotalShards());
|
|
|
+ assertNull(remoteClusterSearchInfo.getSuccessfulShards());
|
|
|
+ assertNull(remoteClusterSearchInfo.getSkippedShards());
|
|
|
+ assertNull(remoteClusterSearchInfo.getFailedShards());
|
|
|
+ assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(1));
|
|
|
+ assertNull(remoteClusterSearchInfo.getTook());
|
|
|
+ assertFalse(remoteClusterSearchInfo.isTimedOut());
|
|
|
+ ShardSearchFailure remoteShardSearchFailure = remoteClusterSearchInfo.getFailures().get(0);
|
|
|
+ assertTrue("should have 'index corrupted' in reason", remoteShardSearchFailure.reason().contains("index corrupted"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testClusterDetailsAfterCCSWithFailuresOnOneShardOnly() throws Exception {
|
|
|
+ Map<String, Object> testClusterInfo = setupTwoClusters();
|
|
|
+ String localIndex = (String) testClusterInfo.get("local.index");
|
|
|
+ String remoteIndex = (String) testClusterInfo.get("remote.index");
|
|
|
+ int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
|
|
|
+ int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards");
|
|
|
+
|
|
|
+ SearchListenerPlugin.blockQueryPhase();
|
|
|
+
|
|
|
+ SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex);
|
|
|
+ request.setCcsMinimizeRoundtrips(true);
|
|
|
+ request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
|
|
|
+ request.setKeepOnCompletion(true);
|
|
|
+ // shardId 0 means to throw the Exception only on shard 0; all others should work
|
|
|
+ ThrowingQueryBuilder queryBuilder = new ThrowingQueryBuilder(randomLong(), new IllegalStateException("index corrupted"), 0);
|
|
|
+ request.getSearchRequest().source(new SearchSourceBuilder().query(queryBuilder).size(10));
|
|
|
+
|
|
|
+ AsyncSearchResponse response = submitAsyncSearch(request);
|
|
|
+ assertNotNull(response.getSearchResponse());
|
|
|
+ assertTrue(response.isRunning());
|
|
|
+
|
|
|
+ {
|
|
|
+ SearchResponse.Clusters clusters = response.getSearchResponse().getClusters();
|
|
|
+ assertThat(clusters.getTotal(), equalTo(2));
|
|
|
+ assertTrue("search cluster results should be marked as partial", clusters.hasPartialResults());
|
|
|
+
|
|
|
+ SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
|
|
|
+ assertNotNull(localClusterSearchInfo);
|
|
|
+ assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.RUNNING));
|
|
|
+
|
|
|
+ SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
|
|
|
+ assertNotNull(remoteClusterSearchInfo);
|
|
|
+ assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.RUNNING));
|
|
|
+ }
|
|
|
+
|
|
|
+ SearchListenerPlugin.waitSearchStarted();
|
|
|
+ SearchListenerPlugin.allowQueryPhase();
|
|
|
+
|
|
|
+ assertBusy(() -> {
|
|
|
+ AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
|
|
|
+ assertFalse(statusResponse.isRunning());
|
|
|
+ assertNotNull(statusResponse.getCompletionStatus());
|
|
|
+ });
|
|
|
+
|
|
|
+ {
|
|
|
+ AsyncSearchResponse finishedResponse = getAsyncSearch(response.getId());
|
|
|
+
|
|
|
+ SearchResponse.Clusters clusters = finishedResponse.getSearchResponse().getClusters();
|
|
|
+ assertThat(clusters.getTotal(), equalTo(2));
|
|
|
+ assertThat(clusters.getSuccessful(), equalTo(2));
|
|
|
+ assertThat(clusters.getSkipped(), equalTo(0));
|
|
|
+
|
|
|
+ SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
|
|
|
+ assertNotNull(localClusterSearchInfo);
|
|
|
+ assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.PARTIAL));
|
|
|
+ assertThat(localClusterSearchInfo.getTotalShards(), equalTo(localNumShards));
|
|
|
+ assertThat(localClusterSearchInfo.getSuccessfulShards(), equalTo(localNumShards - 1));
|
|
|
+ assertThat(localClusterSearchInfo.getSkippedShards(), equalTo(0));
|
|
|
+ assertThat(localClusterSearchInfo.getFailedShards(), equalTo(1));
|
|
|
+ assertThat(localClusterSearchInfo.getFailures().size(), equalTo(1));
|
|
|
+ assertThat(localClusterSearchInfo.getTook().millis(), greaterThan(0L));
|
|
|
+ ShardSearchFailure localShardSearchFailure = localClusterSearchInfo.getFailures().get(0);
|
|
|
+ assertTrue("should have 'index corrupted' in reason", localShardSearchFailure.reason().contains("index corrupted"));
|
|
|
+
|
|
|
+ SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
|
|
|
+ assertNotNull(remoteClusterSearchInfo);
|
|
|
+ assertThat(remoteClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.PARTIAL));
|
|
|
+ assertThat(remoteClusterSearchInfo.getTotalShards(), equalTo(remoteNumShards));
|
|
|
+ assertThat(remoteClusterSearchInfo.getSuccessfulShards(), equalTo(remoteNumShards - 1));
|
|
|
+ assertThat(remoteClusterSearchInfo.getSkippedShards(), equalTo(0));
|
|
|
+ assertThat(remoteClusterSearchInfo.getFailedShards(), equalTo(1));
|
|
|
+ assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(1));
|
|
|
+ assertThat(remoteClusterSearchInfo.getTook().millis(), greaterThan(0L));
|
|
|
+ ShardSearchFailure remoteShardSearchFailure = remoteClusterSearchInfo.getFailures().get(0);
|
|
|
+ assertTrue("should have 'index corrupted' in reason", remoteShardSearchFailure.reason().contains("index corrupted"));
|
|
|
+ }
|
|
|
+ // check that the async_search/status response includes the same cluster details
|
|
|
+ {
|
|
|
+ AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
|
|
|
+ SearchResponse.Clusters clusters = statusResponse.getClusters();
|
|
|
+ assertThat(clusters.getTotal(), equalTo(2));
|
|
|
+ assertThat(clusters.getSuccessful(), equalTo(2));
|
|
|
+ assertThat(clusters.getSkipped(), equalTo(0));
|
|
|
+
|
|
|
+ SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
|
|
|
+ assertNotNull(localClusterSearchInfo);
|
|
|
+ assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.PARTIAL));
|
|
|
+ assertThat(localClusterSearchInfo.getTotalShards(), equalTo(localNumShards));
|
|
|
+ assertThat(localClusterSearchInfo.getSuccessfulShards(), equalTo(localNumShards - 1));
|
|
|
+ assertThat(localClusterSearchInfo.getSkippedShards(), equalTo(0));
|
|
|
+ assertThat(localClusterSearchInfo.getFailedShards(), equalTo(1));
|
|
|
+ assertThat(localClusterSearchInfo.getFailures().size(), equalTo(1));
|
|
|
+ assertThat(localClusterSearchInfo.getTook().millis(), greaterThan(0L));
|
|
|
+ ShardSearchFailure localShardSearchFailure = localClusterSearchInfo.getFailures().get(0);
|
|
|
+ assertTrue("should have 'index corrupted' in reason", localShardSearchFailure.reason().contains("index corrupted"));
|
|
|
+
|
|
|
+ SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
|
|
|
+ assertNotNull(remoteClusterSearchInfo);
|
|
|
+ assertThat(remoteClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.PARTIAL));
|
|
|
+ assertThat(remoteClusterSearchInfo.getTotalShards(), equalTo(remoteNumShards));
|
|
|
+ assertThat(remoteClusterSearchInfo.getSuccessfulShards(), equalTo(remoteNumShards - 1));
|
|
|
+ assertThat(remoteClusterSearchInfo.getSkippedShards(), equalTo(0));
|
|
|
+ assertThat(remoteClusterSearchInfo.getFailedShards(), equalTo(1));
|
|
|
+ assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(1));
|
|
|
+ assertThat(remoteClusterSearchInfo.getTook().millis(), greaterThan(0L));
|
|
|
+ ShardSearchFailure remoteShardSearchFailure = remoteClusterSearchInfo.getFailures().get(0);
|
|
|
+ assertTrue("should have 'index corrupted' in reason", remoteShardSearchFailure.reason().contains("index corrupted"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testClusterDetailsAfterCCSWithFailuresOnOneClusterOnly() throws Exception {
|
|
|
+ Map<String, Object> testClusterInfo = setupTwoClusters();
|
|
|
+ String localIndex = (String) testClusterInfo.get("local.index");
|
|
|
+ String remoteIndex = (String) testClusterInfo.get("remote.index");
|
|
|
+ int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
|
|
|
+ boolean skipUnavailable = (Boolean) testClusterInfo.get("remote.skip_unavailable");
|
|
|
|
|
|
- return List.of(slowRunningSpec);
|
|
|
+ SearchListenerPlugin.blockQueryPhase();
|
|
|
+
|
|
|
+ SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex);
|
|
|
+ request.setCcsMinimizeRoundtrips(true);
|
|
|
+ request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
|
|
|
+ request.setKeepOnCompletion(true);
|
|
|
+ // throw Exception of all shards of remoteIndex, but against localIndex
|
|
|
+ ThrowingQueryBuilder queryBuilder = new ThrowingQueryBuilder(
|
|
|
+ randomLong(),
|
|
|
+ new IllegalStateException("index corrupted"),
|
|
|
+ remoteIndex
|
|
|
+ );
|
|
|
+ request.getSearchRequest().source(new SearchSourceBuilder().query(queryBuilder).size(10));
|
|
|
+
|
|
|
+ AsyncSearchResponse response = submitAsyncSearch(request);
|
|
|
+ assertNotNull(response.getSearchResponse());
|
|
|
+ assertTrue(response.isRunning());
|
|
|
+ {
|
|
|
+ SearchResponse.Clusters clusters = response.getSearchResponse().getClusters();
|
|
|
+ assertThat(clusters.getTotal(), equalTo(2));
|
|
|
+ assertTrue("search cluster results should be marked as partial", clusters.hasPartialResults());
|
|
|
+
|
|
|
+ SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
|
|
|
+ assertNotNull(localClusterSearchInfo);
|
|
|
+ assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.RUNNING));
|
|
|
+
|
|
|
+ SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
|
|
|
+ assertNotNull(remoteClusterSearchInfo);
|
|
|
+ assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.RUNNING));
|
|
|
+ }
|
|
|
+
|
|
|
+ SearchListenerPlugin.waitSearchStarted();
|
|
|
+ SearchListenerPlugin.allowQueryPhase();
|
|
|
+
|
|
|
+ assertBusy(() -> {
|
|
|
+ AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
|
|
|
+ assertFalse(statusResponse.isRunning());
|
|
|
+ assertNotNull(statusResponse.getCompletionStatus());
|
|
|
+ });
|
|
|
+
|
|
|
+ {
|
|
|
+ AsyncSearchResponse finishedResponse = getAsyncSearch(response.getId());
|
|
|
+
|
|
|
+ SearchResponse.Clusters clusters = finishedResponse.getSearchResponse().getClusters();
|
|
|
+ assertThat(clusters.getTotal(), equalTo(2));
|
|
|
+ assertThat(clusters.getSuccessful(), equalTo(1));
|
|
|
+ assertThat(clusters.getSkipped(), equalTo(1));
|
|
|
+
|
|
|
+ SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
|
|
|
+ assertNotNull(localClusterSearchInfo);
|
|
|
+ assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.SUCCESSFUL));
|
|
|
+ assertThat(localClusterSearchInfo.getTotalShards(), equalTo(localNumShards));
|
|
|
+ assertThat(localClusterSearchInfo.getSuccessfulShards(), equalTo(localNumShards));
|
|
|
+ assertThat(localClusterSearchInfo.getSkippedShards(), equalTo(0));
|
|
|
+ assertThat(localClusterSearchInfo.getFailedShards(), equalTo(0));
|
|
|
+ assertThat(localClusterSearchInfo.getFailures().size(), equalTo(0));
|
|
|
+ assertThat(localClusterSearchInfo.getTook().millis(), greaterThan(0L));
|
|
|
+
|
|
|
+ SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
|
|
|
+ assertNotNull(remoteClusterSearchInfo);
|
|
|
+ SearchResponse.Cluster.Status expectedStatus = skipUnavailable
|
|
|
+ ? SearchResponse.Cluster.Status.SKIPPED
|
|
|
+ : SearchResponse.Cluster.Status.FAILED;
|
|
|
+ assertThat(remoteClusterSearchInfo.getStatus(), equalTo(expectedStatus));
|
|
|
+ assertNull(remoteClusterSearchInfo.getTotalShards());
|
|
|
+ assertNull(remoteClusterSearchInfo.getSuccessfulShards());
|
|
|
+ assertNull(remoteClusterSearchInfo.getSkippedShards());
|
|
|
+ assertNull(remoteClusterSearchInfo.getFailedShards());
|
|
|
+ assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(1));
|
|
|
+ assertNull(remoteClusterSearchInfo.getTook());
|
|
|
+ assertFalse(remoteClusterSearchInfo.isTimedOut());
|
|
|
+ ShardSearchFailure remoteShardSearchFailure = remoteClusterSearchInfo.getFailures().get(0);
|
|
|
+ assertTrue("should have 'index corrupted' in reason", remoteShardSearchFailure.reason().contains("index corrupted"));
|
|
|
+ }
|
|
|
+ // check that the async_search/status response includes the same cluster details
|
|
|
+ {
|
|
|
+ AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
|
|
|
+ SearchResponse.Clusters clusters = statusResponse.getClusters();
|
|
|
+ assertThat(clusters.getTotal(), equalTo(2));
|
|
|
+ assertThat(clusters.getSuccessful(), equalTo(1));
|
|
|
+ assertThat(clusters.getSkipped(), equalTo(1));
|
|
|
+
|
|
|
+ SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
|
|
|
+ assertNotNull(localClusterSearchInfo);
|
|
|
+ assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.SUCCESSFUL));
|
|
|
+ assertThat(localClusterSearchInfo.getTotalShards(), equalTo(localNumShards));
|
|
|
+ assertThat(localClusterSearchInfo.getSuccessfulShards(), equalTo(localNumShards));
|
|
|
+ assertThat(localClusterSearchInfo.getSkippedShards(), equalTo(0));
|
|
|
+ assertThat(localClusterSearchInfo.getFailedShards(), equalTo(0));
|
|
|
+ assertThat(localClusterSearchInfo.getFailures().size(), equalTo(0));
|
|
|
+ assertThat(localClusterSearchInfo.getTook().millis(), greaterThan(0L));
|
|
|
+
|
|
|
+ SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
|
|
|
+ assertNotNull(remoteClusterSearchInfo);
|
|
|
+ SearchResponse.Cluster.Status expectedStatus = skipUnavailable
|
|
|
+ ? SearchResponse.Cluster.Status.SKIPPED
|
|
|
+ : SearchResponse.Cluster.Status.FAILED;
|
|
|
+ assertThat(remoteClusterSearchInfo.getStatus(), equalTo(expectedStatus));
|
|
|
+ assertNull(remoteClusterSearchInfo.getTotalShards());
|
|
|
+ assertNull(remoteClusterSearchInfo.getSuccessfulShards());
|
|
|
+ assertNull(remoteClusterSearchInfo.getSkippedShards());
|
|
|
+ assertNull(remoteClusterSearchInfo.getFailedShards());
|
|
|
+ assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(1));
|
|
|
+ assertNull(remoteClusterSearchInfo.getTook());
|
|
|
+ assertFalse(remoteClusterSearchInfo.isTimedOut());
|
|
|
+ ShardSearchFailure remoteShardSearchFailure = remoteClusterSearchInfo.getFailures().get(0);
|
|
|
+ assertTrue("should have 'index corrupted' in reason", remoteShardSearchFailure.reason().contains("index corrupted"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testRemoteClusterOnlyCCSSuccessfulResult() throws Exception {
|
|
|
+ // for remote-only queries, we can't use the SearchListenerPlugin since that listens for search
|
|
|
+ // stage on the local cluster, so we only test final state of the search response
|
|
|
+ SearchListenerPlugin.negate();
|
|
|
+
|
|
|
+ Map<String, Object> testClusterInfo = setupTwoClusters();
|
|
|
+ String remoteIndex = (String) testClusterInfo.get("remote.index");
|
|
|
+ int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards");
|
|
|
+
|
|
|
+ // search only the remote cluster
|
|
|
+ SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(REMOTE_CLUSTER + ":" + remoteIndex);
|
|
|
+ request.setCcsMinimizeRoundtrips(true);
|
|
|
+ request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
|
|
|
+ request.setKeepOnCompletion(true);
|
|
|
+ request.getSearchRequest().source(new SearchSourceBuilder().query(new MatchAllQueryBuilder()).size(1000));
|
|
|
+
|
|
|
+ AsyncSearchResponse response = submitAsyncSearch(request);
|
|
|
+ assertNotNull(response.getSearchResponse());
|
|
|
+
|
|
|
+ assertBusy(() -> {
|
|
|
+ AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
|
|
|
+ assertFalse(statusResponse.isRunning());
|
|
|
+ assertNotNull(statusResponse.getCompletionStatus());
|
|
|
+ });
|
|
|
+
|
|
|
+ {
|
|
|
+ AsyncSearchResponse finishedResponse = getAsyncSearch(response.getId());
|
|
|
+
|
|
|
+ SearchResponse.Clusters clusters = finishedResponse.getSearchResponse().getClusters();
|
|
|
+ assertFalse("search cluster results should NOT be marked as partial", clusters.hasPartialResults());
|
|
|
+ assertThat(clusters.getTotal(), equalTo(1));
|
|
|
+ assertThat(clusters.getSuccessful(), equalTo(1));
|
|
|
+ assertThat(clusters.getSkipped(), equalTo(0));
|
|
|
+
|
|
|
+ assertNull(clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
|
|
|
+
|
|
|
+ SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
|
|
|
+ assertNotNull(remoteClusterSearchInfo);
|
|
|
+ assertThat(remoteClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.SUCCESSFUL));
|
|
|
+ assertThat(remoteClusterSearchInfo.getTotalShards(), equalTo(remoteNumShards));
|
|
|
+ assertThat(remoteClusterSearchInfo.getSuccessfulShards(), equalTo(remoteNumShards));
|
|
|
+ assertThat(remoteClusterSearchInfo.getSkippedShards(), equalTo(0));
|
|
|
+ assertThat(remoteClusterSearchInfo.getFailedShards(), equalTo(0));
|
|
|
+ assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(0));
|
|
|
+ assertThat(remoteClusterSearchInfo.getTook().millis(), greaterThan(0L));
|
|
|
+ }
|
|
|
+
|
|
|
+ // check that the async_search/status response includes the same cluster details
|
|
|
+ {
|
|
|
+ AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
|
|
|
+
|
|
|
+ SearchResponse.Clusters clusters = statusResponse.getClusters();
|
|
|
+ assertFalse("search cluster results should NOT be marked as partial", clusters.hasPartialResults());
|
|
|
+ assertThat(clusters.getTotal(), equalTo(1));
|
|
|
+ assertThat(clusters.getSuccessful(), equalTo(1));
|
|
|
+ assertThat(clusters.getSkipped(), equalTo(0));
|
|
|
+
|
|
|
+ assertNull(clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
|
|
|
+
|
|
|
+ SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
|
|
|
+ assertNotNull(remoteClusterSearchInfo);
|
|
|
+ assertThat(remoteClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.SUCCESSFUL));
|
|
|
+ assertThat(remoteClusterSearchInfo.getTotalShards(), equalTo(remoteNumShards));
|
|
|
+ assertThat(remoteClusterSearchInfo.getSuccessfulShards(), equalTo(remoteNumShards));
|
|
|
+ assertThat(remoteClusterSearchInfo.getSkippedShards(), equalTo(0));
|
|
|
+ assertThat(remoteClusterSearchInfo.getFailedShards(), equalTo(0));
|
|
|
+ assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(0));
|
|
|
+ assertThat(remoteClusterSearchInfo.getTook().millis(), greaterThan(0L));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testRemoteClusterOnlyCCSWithFailuresOnOneShardOnly() throws Exception {
|
|
|
+ // for remote-only queries, we can't use the SearchListenerPlugin since that listens for search
|
|
|
+ // stage on the local cluster, so we only test final state of the search response
|
|
|
+ SearchListenerPlugin.negate();
|
|
|
+
|
|
|
+ Map<String, Object> testClusterInfo = setupTwoClusters();
|
|
|
+ String remoteIndex = (String) testClusterInfo.get("remote.index");
|
|
|
+ int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards");
|
|
|
+
|
|
|
+ SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(REMOTE_CLUSTER + ":" + remoteIndex);
|
|
|
+ request.setCcsMinimizeRoundtrips(true);
|
|
|
+ request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
|
|
|
+ request.setKeepOnCompletion(true);
|
|
|
+ // shardId 0 means to throw the Exception only on shard 0; all others should work
|
|
|
+ ThrowingQueryBuilder queryBuilder = new ThrowingQueryBuilder(randomLong(), new IllegalStateException("index corrupted"), 0);
|
|
|
+ request.getSearchRequest().source(new SearchSourceBuilder().query(queryBuilder).size(10));
|
|
|
+
|
|
|
+ AsyncSearchResponse response = submitAsyncSearch(request);
|
|
|
+ assertNotNull(response.getSearchResponse());
|
|
|
+ assertTrue(response.isRunning());
|
|
|
+
|
|
|
+ assertBusy(() -> {
|
|
|
+ AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
|
|
|
+ assertFalse(statusResponse.isRunning());
|
|
|
+ assertNotNull(statusResponse.getCompletionStatus());
|
|
|
+ });
|
|
|
+
|
|
|
+ {
|
|
|
+ AsyncSearchResponse finishedResponse = getAsyncSearch(response.getId());
|
|
|
+
|
|
|
+ SearchResponse.Clusters clusters = finishedResponse.getSearchResponse().getClusters();
|
|
|
+ assertThat(clusters.getTotal(), equalTo(1));
|
|
|
+ assertThat(clusters.getSuccessful(), equalTo(1));
|
|
|
+ assertThat(clusters.getSkipped(), equalTo(0));
|
|
|
+
|
|
|
+ assertNull(clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
|
|
|
+
|
|
|
+ SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
|
|
|
+ assertNotNull(remoteClusterSearchInfo);
|
|
|
+ assertThat(remoteClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.PARTIAL));
|
|
|
+ assertThat(remoteClusterSearchInfo.getTotalShards(), equalTo(remoteNumShards));
|
|
|
+ assertThat(remoteClusterSearchInfo.getSuccessfulShards(), equalTo(remoteNumShards - 1));
|
|
|
+ assertThat(remoteClusterSearchInfo.getSkippedShards(), equalTo(0));
|
|
|
+ assertThat(remoteClusterSearchInfo.getFailedShards(), equalTo(1));
|
|
|
+ assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(1));
|
|
|
+ assertThat(remoteClusterSearchInfo.getTook().millis(), greaterThan(0L));
|
|
|
+ ShardSearchFailure remoteShardSearchFailure = remoteClusterSearchInfo.getFailures().get(0);
|
|
|
+ assertTrue("should have 'index corrupted' in reason", remoteShardSearchFailure.reason().contains("index corrupted"));
|
|
|
+ }
|
|
|
+ // check that the async_search/status response includes the same cluster details
|
|
|
+ {
|
|
|
+ AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
|
|
|
+ SearchResponse.Clusters clusters = statusResponse.getClusters();
|
|
|
+ assertThat(clusters.getTotal(), equalTo(1));
|
|
|
+ assertThat(clusters.getSuccessful(), equalTo(1));
|
|
|
+ assertThat(clusters.getSkipped(), equalTo(0));
|
|
|
+
|
|
|
+ assertNull(clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
|
|
|
+
|
|
|
+ SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
|
|
|
+ assertNotNull(remoteClusterSearchInfo);
|
|
|
+ assertThat(remoteClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.PARTIAL));
|
|
|
+ assertThat(remoteClusterSearchInfo.getTotalShards(), equalTo(remoteNumShards));
|
|
|
+ assertThat(remoteClusterSearchInfo.getSuccessfulShards(), equalTo(remoteNumShards - 1));
|
|
|
+ assertThat(remoteClusterSearchInfo.getSkippedShards(), equalTo(0));
|
|
|
+ assertThat(remoteClusterSearchInfo.getFailedShards(), equalTo(1));
|
|
|
+ assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(1));
|
|
|
+ assertThat(remoteClusterSearchInfo.getTook().millis(), greaterThan(0L));
|
|
|
+ ShardSearchFailure remoteShardSearchFailure = remoteClusterSearchInfo.getFailures().get(0);
|
|
|
+ assertTrue("should have 'index corrupted' in reason", remoteShardSearchFailure.reason().contains("index corrupted"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testRemoteClusterOnlyCCSWithFailuresOnAllShards() throws Exception {
|
|
|
+ // for remote-only queries, we can't use the SearchListenerPlugin since that listens for search
|
|
|
+ // stage on the local cluster, so we only test final state of the search response
|
|
|
+ SearchListenerPlugin.negate();
|
|
|
+
|
|
|
+ Map<String, Object> testClusterInfo = setupTwoClusters();
|
|
|
+ String remoteIndex = (String) testClusterInfo.get("remote.index");
|
|
|
+ boolean skipUnavailable = (Boolean) testClusterInfo.get("remote.skip_unavailable");
|
|
|
+
|
|
|
+ SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(REMOTE_CLUSTER + ":" + remoteIndex);
|
|
|
+ request.setCcsMinimizeRoundtrips(true);
|
|
|
+ request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
|
|
|
+ request.setKeepOnCompletion(true);
|
|
|
+ // shardId -1 means to throw the Exception on all shards, so should result in complete search failure
|
|
|
+ ThrowingQueryBuilder queryBuilder = new ThrowingQueryBuilder(randomLong(), new IllegalStateException("index corrupted"), -1);
|
|
|
+ request.getSearchRequest().source(new SearchSourceBuilder().query(queryBuilder).size(10));
|
|
|
+
|
|
|
+ AsyncSearchResponse response = submitAsyncSearch(request);
|
|
|
+ assertNotNull(response.getSearchResponse());
|
|
|
+ assertTrue(response.isRunning());
|
|
|
+
|
|
|
+ assertBusy(() -> {
|
|
|
+ AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
|
|
|
+ assertFalse(statusResponse.isRunning());
|
|
|
+ assertNotNull(statusResponse.getCompletionStatus());
|
|
|
+ });
|
|
|
+
|
|
|
+ {
|
|
|
+ AsyncSearchResponse finishedResponse = getAsyncSearch(response.getId());
|
|
|
+
|
|
|
+ SearchResponse.Clusters clusters = finishedResponse.getSearchResponse().getClusters();
|
|
|
+ assertThat(clusters.getTotal(), equalTo(1));
|
|
|
+ assertThat(clusters.getSuccessful(), equalTo(0));
|
|
|
+ assertThat(clusters.getSkipped(), equalTo(1));
|
|
|
+
|
|
|
+ assertNull(clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
|
|
|
+
|
|
|
+ SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
|
|
|
+ assertNotNull(remoteClusterSearchInfo);
|
|
|
+ SearchResponse.Cluster.Status expectedStatus = skipUnavailable
|
|
|
+ ? SearchResponse.Cluster.Status.SKIPPED
|
|
|
+ : SearchResponse.Cluster.Status.FAILED;
|
|
|
+ assertThat(remoteClusterSearchInfo.getStatus(), equalTo(expectedStatus));
|
|
|
+ assertNull(remoteClusterSearchInfo.getTotalShards());
|
|
|
+ assertNull(remoteClusterSearchInfo.getSuccessfulShards());
|
|
|
+ assertNull(remoteClusterSearchInfo.getSkippedShards());
|
|
|
+ assertNull(remoteClusterSearchInfo.getFailedShards());
|
|
|
+ assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(1));
|
|
|
+ assertNull(remoteClusterSearchInfo.getTook());
|
|
|
+ assertFalse(remoteClusterSearchInfo.isTimedOut());
|
|
|
+ ShardSearchFailure remoteShardSearchFailure = remoteClusterSearchInfo.getFailures().get(0);
|
|
|
+ assertTrue("should have 'index corrupted' in reason", remoteShardSearchFailure.reason().contains("index corrupted"));
|
|
|
+ }
|
|
|
+ // check that the async_search/status response includes the same cluster details
|
|
|
+ {
|
|
|
+ AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
|
|
|
+ SearchResponse.Clusters clusters = statusResponse.getClusters();
|
|
|
+ assertThat(clusters.getTotal(), equalTo(1));
|
|
|
+ assertThat(clusters.getSuccessful(), equalTo(0));
|
|
|
+ assertThat(clusters.getSkipped(), equalTo(1));
|
|
|
+
|
|
|
+ assertNull(clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
|
|
|
+
|
|
|
+ SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
|
|
|
+ assertNotNull(remoteClusterSearchInfo);
|
|
|
+ SearchResponse.Cluster.Status expectedStatus = skipUnavailable
|
|
|
+ ? SearchResponse.Cluster.Status.SKIPPED
|
|
|
+ : SearchResponse.Cluster.Status.FAILED;
|
|
|
+ assertThat(remoteClusterSearchInfo.getStatus(), equalTo(expectedStatus));
|
|
|
+ assertNull(remoteClusterSearchInfo.getTotalShards());
|
|
|
+ assertNull(remoteClusterSearchInfo.getSuccessfulShards());
|
|
|
+ assertNull(remoteClusterSearchInfo.getSkippedShards());
|
|
|
+ assertNull(remoteClusterSearchInfo.getFailedShards());
|
|
|
+ assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(1));
|
|
|
+ assertNull(remoteClusterSearchInfo.getTook());
|
|
|
+ assertFalse(remoteClusterSearchInfo.isTimedOut());
|
|
|
+ ShardSearchFailure remoteShardSearchFailure = remoteClusterSearchInfo.getFailures().get(0);
|
|
|
+ assertTrue("should have 'index corrupted' in reason", remoteShardSearchFailure.reason().contains("index corrupted"));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void testCancelViaTasksAPI() throws Exception {
|
|
|
- setupTwoClusters();
|
|
|
+ Map<String, Object> testClusterInfo = setupTwoClusters();
|
|
|
+ String localIndex = (String) testClusterInfo.get("local.index");
|
|
|
+ String remoteIndex = (String) testClusterInfo.get("remote.index");
|
|
|
|
|
|
SearchListenerPlugin.blockQueryPhase();
|
|
|
|
|
|
- SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest("demo", REMOTE_CLUSTER + ":prod");
|
|
|
+ SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex);
|
|
|
request.setCcsMinimizeRoundtrips(randomBoolean());
|
|
|
request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
|
|
|
request.setKeepOnCompletion(true);
|
|
@@ -250,11 +939,13 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
|
|
|
}
|
|
|
|
|
|
public void testCancelViaAsyncSearchDelete() throws Exception {
|
|
|
- setupTwoClusters();
|
|
|
+ Map<String, Object> testClusterInfo = setupTwoClusters();
|
|
|
+ String localIndex = (String) testClusterInfo.get("local.index");
|
|
|
+ String remoteIndex = (String) testClusterInfo.get("remote.index");
|
|
|
|
|
|
SearchListenerPlugin.blockQueryPhase();
|
|
|
|
|
|
- SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest("demo", REMOTE_CLUSTER + ":prod");
|
|
|
+ SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex);
|
|
|
request.setCcsMinimizeRoundtrips(randomBoolean());
|
|
|
request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
|
|
|
request.setKeepOnCompletion(true);
|
|
@@ -348,7 +1039,9 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
|
|
|
|
|
|
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/97286")
|
|
|
public void testCancellationViaTimeoutWithAllowPartialResultsSetToFalse() throws Exception {
|
|
|
- setupTwoClusters();
|
|
|
+ Map<String, Object> testClusterInfo = setupTwoClusters();
|
|
|
+ String localIndex = (String) testClusterInfo.get("local.index");
|
|
|
+ String remoteIndex = (String) testClusterInfo.get("remote.index");
|
|
|
|
|
|
SearchListenerPlugin.blockQueryPhase();
|
|
|
|
|
@@ -357,7 +1050,7 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
|
|
|
SlowRunningQueryBuilder slowRunningQueryBuilder = new SlowRunningQueryBuilder(searchTimeout.millis() * 5);
|
|
|
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(slowRunningQueryBuilder).timeout(searchTimeout);
|
|
|
|
|
|
- SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest("demo", REMOTE_CLUSTER + ":prod");
|
|
|
+ SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex);
|
|
|
request.setCcsMinimizeRoundtrips(randomBoolean());
|
|
|
request.getSearchRequest().source(sourceBuilder);
|
|
|
request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
|
|
@@ -457,48 +1150,50 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
|
|
|
return client().execute(DeleteAsyncResultAction.INSTANCE, new DeleteAsyncResultRequest(id)).get();
|
|
|
}
|
|
|
|
|
|
- private void setupTwoClusters() throws Exception {
|
|
|
- assertAcked(client(LOCAL_CLUSTER).admin().indices().prepareCreate("demo"));
|
|
|
- indexDocs(client(LOCAL_CLUSTER), "demo");
|
|
|
+ private Map<String, Object> setupTwoClusters() {
|
|
|
+ String localIndex = "demo";
|
|
|
+ int numShardsLocal = randomIntBetween(3, 6);
|
|
|
+ Settings localSettings = indexSettings(numShardsLocal, 0).build();
|
|
|
+ assertAcked(client(LOCAL_CLUSTER).admin().indices().prepareCreate(localIndex).setSettings(localSettings));
|
|
|
+ indexDocs(client(LOCAL_CLUSTER), localIndex);
|
|
|
+
|
|
|
+ String remoteIndex = "prod";
|
|
|
+ int numShardsRemote = randomIntBetween(3, 6);
|
|
|
final InternalTestCluster remoteCluster = cluster(REMOTE_CLUSTER);
|
|
|
- remoteCluster.ensureAtLeastNumDataNodes(1);
|
|
|
- final Settings.Builder allocationFilter = Settings.builder();
|
|
|
- if (randomBoolean()) {
|
|
|
- remoteCluster.ensureAtLeastNumDataNodes(3);
|
|
|
- List<String> remoteDataNodes = remoteCluster.clusterService()
|
|
|
- .state()
|
|
|
- .nodes()
|
|
|
- .stream()
|
|
|
- .filter(DiscoveryNode::canContainData)
|
|
|
- .map(DiscoveryNode::getName)
|
|
|
- .toList();
|
|
|
- assertThat(remoteDataNodes.size(), Matchers.greaterThanOrEqualTo(3));
|
|
|
- List<String> seedNodes = randomSubsetOf(between(1, remoteDataNodes.size() - 1), remoteDataNodes);
|
|
|
- disconnectFromRemoteClusters();
|
|
|
- configureRemoteCluster(REMOTE_CLUSTER, seedNodes);
|
|
|
- if (randomBoolean()) {
|
|
|
- // Using proxy connections
|
|
|
- allocationFilter.put("index.routing.allocation.exclude._name", String.join(",", seedNodes));
|
|
|
- } else {
|
|
|
- allocationFilter.put("index.routing.allocation.include._name", String.join(",", seedNodes));
|
|
|
- }
|
|
|
- }
|
|
|
+ remoteCluster.ensureAtLeastNumDataNodes(randomIntBetween(1, 3));
|
|
|
+ final Settings.Builder remoteSettings = Settings.builder();
|
|
|
+ remoteSettings.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShardsRemote);
|
|
|
+
|
|
|
assertAcked(
|
|
|
client(REMOTE_CLUSTER).admin()
|
|
|
.indices()
|
|
|
- .prepareCreate("prod")
|
|
|
- .setSettings(Settings.builder().put(allocationFilter.build()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0))
|
|
|
+ .prepareCreate(remoteIndex)
|
|
|
+ .setSettings(Settings.builder().put(remoteSettings.build()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0))
|
|
|
);
|
|
|
assertFalse(
|
|
|
client(REMOTE_CLUSTER).admin()
|
|
|
.cluster()
|
|
|
- .prepareHealth("prod")
|
|
|
+ .prepareHealth(remoteIndex)
|
|
|
.setWaitForYellowStatus()
|
|
|
.setTimeout(TimeValue.timeValueSeconds(10))
|
|
|
.get()
|
|
|
.isTimedOut()
|
|
|
);
|
|
|
- indexDocs(client(REMOTE_CLUSTER), "prod");
|
|
|
+ indexDocs(client(REMOTE_CLUSTER), remoteIndex);
|
|
|
+
|
|
|
+ String skipUnavailableKey = Strings.format("cluster.remote.%s.skip_unavailable", REMOTE_CLUSTER);
|
|
|
+ Setting<?> skipUnavailableSetting = cluster(REMOTE_CLUSTER).clusterService().getClusterSettings().get(skipUnavailableKey);
|
|
|
+ boolean skipUnavailable = (boolean) cluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).clusterService()
|
|
|
+ .getClusterSettings()
|
|
|
+ .get(skipUnavailableSetting);
|
|
|
+
|
|
|
+ Map<String, Object> clusterInfo = new HashMap<>();
|
|
|
+ clusterInfo.put("local.num_shards", numShardsLocal);
|
|
|
+ clusterInfo.put("local.index", localIndex);
|
|
|
+ clusterInfo.put("remote.num_shards", numShardsRemote);
|
|
|
+ clusterInfo.put("remote.index", remoteIndex);
|
|
|
+ clusterInfo.put("remote.skip_unavailable", skipUnavailable);
|
|
|
+ return clusterInfo;
|
|
|
}
|
|
|
|
|
|
private int indexDocs(Client client, String index) {
|
|
@@ -520,6 +1215,22 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
|
|
|
private static final AtomicReference<CountDownLatch> queryLatch = new AtomicReference<>();
|
|
|
private static final AtomicReference<CountDownLatch> failedQueryLatch = new AtomicReference<>();
|
|
|
|
|
|
+ /**
|
|
|
+ * For tests that cannot use SearchListenerPlugin, ensure all latches are unset to
|
|
|
+ * avoid test problems around searches of the .async-search index
|
|
|
+ */
|
|
|
+ static void negate() {
|
|
|
+ if (startedLatch.get() != null) {
|
|
|
+ startedLatch.get().countDown();
|
|
|
+ }
|
|
|
+ if (queryLatch.get() != null) {
|
|
|
+ queryLatch.get().countDown();
|
|
|
+ }
|
|
|
+ if (failedQueryLatch.get() != null) {
|
|
|
+ failedQueryLatch.get().countDown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
static void reset() {
|
|
|
startedLatch.set(new CountDownLatch(1));
|
|
|
failedQueryLatch.set(new CountDownLatch(1));
|