浏览代码

Close search contexts on reassigned shard (#68539)

If a shard is reassigned to a node that still has open searches against
it (possibly even scrolls), the current behavior is to throw a
ShardLockObtainFailedException. This commit changes the behavior to
close the search contexts, likely failing some of the searches. The
rationale is to prefer restoring availability over trying to complete
those searches. A situation where this can happen is when master(s) are
restarted, which is likely to cause similar search issues anyway.
Henning Andersen 4 年之前
父节点
当前提交
c4e1074e68

+ 77 - 0
server/src/internalClusterTest/java/org/elasticsearch/search/SearchServiceCleanupOnLostMasterIT.java

@@ -0,0 +1,77 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search;
+
+
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.coordination.FollowersChecker;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.common.CheckedBiConsumer;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.transport.TransportService;
+import org.hamcrest.Matchers;
+
+import java.util.Collection;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+
+@ESIntegTestCase.ClusterScope(numDataNodes = 0, scope = ESIntegTestCase.Scope.TEST) // nodes are started explicitly by the test
+public class SearchServiceCleanupOnLostMasterIT extends ESIntegTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return List.of(MockTransportService.TestPlugin.class);
+    }
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal) {
+        return Settings.builder()
+            .put(super.nodeSettings(nodeOrdinal))
+            .put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1) // presumably speeds up failure detection
+            .put(FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "100ms").build();
+    }
+
+    public void testMasterRestart() throws Exception {
+        testLostMaster((master, dataNode) -> internalCluster().restartNode(master)); // lose the master by restarting it
+    }
+
+    public void testDroppedOutNode() throws Exception {
+        testLostMaster((master, dataNode) -> {
+            final MockTransportService masterTransportService
+                = (MockTransportService) internalCluster().getInstance(TransportService.class, master);
+            final TransportService dataTransportService = internalCluster().getInstance(TransportService.class, dataNode);
+            masterTransportService.addFailToSendNoConnectRule(dataTransportService, FollowersChecker.FOLLOWER_CHECK_ACTION_NAME);
+
+            assertBusy(() -> {
+                final ClusterHealthStatus indexHealthStatus = client(master).admin().cluster()
+                    .health(Requests.clusterHealthRequest("test")).actionGet().getStatus();
+                assertThat(indexHealthStatus, Matchers.is(ClusterHealthStatus.RED));
+            });
+            masterTransportService.clearAllRules(); // remove the failure rule added above
+        });
+    }
+
+    private void testLostMaster(CheckedBiConsumer<String, String, Exception> loseMaster) throws Exception {
+        final String master = internalCluster().startMasterOnlyNode();
+        final String dataNode = internalCluster().startDataOnlyNode();
+
+        index("test", "test", "{}");
+
+        assertThat(client().prepareSearch("test").setScroll("30m").get().getScrollId(), is(notNullValue()));
+
+        loseMaster.accept(master, dataNode); // run the scenario-specific disruption
+        // in the past, this failed because the search context for the scroll would prevent the shard lock from being released.
+        ensureYellow();
+    }
+}

+ 3 - 3
server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java

@@ -147,13 +147,13 @@ final class CompositeIndexEventListener implements IndexEventListener {
     }
 
     @Override
-    public void beforeIndexShardCreated(ShardId shardId, Settings indexSettings) {
+    public void beforeIndexShardCreated(ShardRouting shardRouting, Settings indexSettings) {
         for (IndexEventListener listener : listeners) {
             try {
-                listener.beforeIndexShardCreated(shardId, indexSettings);
+                listener.beforeIndexShardCreated(shardRouting, indexSettings);
             } catch (Exception e) {
                 logger.warn(() ->
-                    new ParameterizedMessage("[{}] failed to invoke before shard created callback", shardId), e);
+                    new ParameterizedMessage("[{}] failed to invoke before shard created callback", shardRouting), e);
                 throw e;
             }
         }

+ 1 - 1
server/src/main/java/org/elasticsearch/index/IndexService.java

@@ -401,9 +401,9 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
         Store store = null;
         IndexShard indexShard = null;
         ShardLock lock = null;
+        eventListener.beforeIndexShardCreated(routing, indexSettings);
         try {
             lock = nodeEnv.shardLock(shardId, "starting shard", TimeUnit.SECONDS.toMillis(5));
-            eventListener.beforeIndexShardCreated(shardId, indexSettings);
             ShardPath path;
             try {
                 path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings.customDataPath());

+ 4 - 2
server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java

@@ -109,9 +109,11 @@ public interface IndexEventListener {
     }
 
     /**
-     * Called before the index shard gets created.
+     * Called before the index shard gets created, before obtaining the shard lock.
+     * @param routing the routing entry that caused the shard to be created.
+     * @param indexSettings the shard's index settings
      */
-    default void beforeIndexShardCreated(ShardId shardId, Settings indexSettings) {
+    default void beforeIndexShardCreated(ShardRouting routing, Settings indexSettings) {
     }
 
     /**

+ 1 - 1
server/src/main/java/org/elasticsearch/indices/IndicesService.java

@@ -604,7 +604,7 @@ public class IndicesService extends AbstractLifecycleComponent
             // double check that shard is not created.
             new IndexEventListener() {
                 @Override
-                public void beforeIndexShardCreated(ShardId shardId, Settings indexSettings) {
+                public void beforeIndexShardCreated(ShardRouting shardRouting, Settings indexSettings) {
                     assert false : "temp index should not trigger shard creation";
                     throw new ElasticsearchException("temp index should not trigger shard creation [{}]", index);
                 }

+ 21 - 0
server/src/main/java/org/elasticsearch/search/SearchService.java

@@ -24,6 +24,7 @@ import org.elasticsearch.action.search.SearchShardTask;
 import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.action.support.TransportActions;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.common.UUIDs;
@@ -289,6 +290,17 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         }
     }
 
+    @Override
+    public void beforeIndexShardCreated(ShardRouting routing, Settings indexSettings) {
+        // If a shard is (re)assigned to a node where we still hold search contexts against the same shard, and this is not a
+        // relocation target, free those contexts: we prefer restoring full availability quickly over completing the searches.
+        // A known scenario here is that we lost the connection to the master, or the master(s) were restarted.
+        assert routing.initializing();
+        if (routing.isRelocationTarget() == false) {
+            freeAllContextsForShard(routing.shardId());
+        }
+    }
+
     protected void putReaderContext(ReaderContext context) {
         final ReaderContext previous = activeReaders.put(context.id().getId(), context);
         assert previous == null;
@@ -805,6 +817,15 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         }
     }
 
+    private void freeAllContextsForShard(ShardId shardId) { // frees every active reader context that belongs to shardId
+        assert shardId != null;
+        for (ReaderContext ctx : activeReaders.values()) {
+            if (shardId.equals(ctx.indexShard().shardId())) {
+                freeReaderContext(ctx.id());
+            }
+        }
+    }
+
     public boolean freeReaderContext(ShardSearchContextId contextId) {
         if (sessionId.equals(contextId.getSessionId())) {
             try (ReaderContext context = removeReaderContext(contextId.getId())) {

+ 1 - 1
server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java

@@ -65,7 +65,7 @@ public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCas
             }
 
             @Override
-            public void beforeIndexShardCreated(ShardId shardId, Settings indexSettings) {
+            public void beforeIndexShardCreated(ShardRouting shardRouting, Settings indexSettings) {
                 assertEquals(3, counter.get());
                 counter.incrementAndGet();
             }

+ 30 - 2
server/src/test/java/org/elasticsearch/search/SearchServiceTests.java

@@ -29,6 +29,10 @@ import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.TestShardRouting;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -264,13 +268,18 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
         AtomicBoolean running = new AtomicBoolean(true);
         CountDownLatch startGun = new CountDownLatch(1);
         Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
-
+        ShardRouting routing = TestShardRouting.newShardRouting(indexShard.shardId(), randomAlphaOfLength(5), randomBoolean(),
+            ShardRoutingState.INITIALIZING);
         final Thread thread = new Thread() {
             @Override
             public void run() {
                 startGun.countDown();
                 while(running.get()) {
-                    service.afterIndexRemoved(indexService.index(), indexService.getIndexSettings(), DELETED);
+                    if (randomBoolean()) {
+                        service.afterIndexRemoved(indexService.index(), indexService.getIndexSettings(), DELETED);
+                    } else {
+                        service.beforeIndexShardCreated(routing, indexService.getIndexSettings().getSettings());
+                    }
                     if (randomBoolean()) {
                         // here we trigger some refreshes to ensure the IR go out of scope such that we hit ACE if we access a search
                         // context in a non-sane way.
@@ -397,6 +406,25 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
         assertEquals(0, totalStats.getFetchCurrent());
     }
 
+    public void testBeforeShardLockDuringShardCreate() {
+        IndexService indexService = createIndex("index", Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build());
+        client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
+        SearchResponse searchResponse = client().prepareSearch("index").setSize(1).setScroll("1m").get();
+        assertThat(searchResponse.getScrollId(), is(notNullValue()));
+        SearchService service = getInstanceFromNode(SearchService.class);
+
+        assertEquals(1, service.getActiveContexts()); // the scroll search left one context open
+        service.beforeIndexShardCreated(TestShardRouting.newShardRouting("test", 0, randomAlphaOfLength(5),
+            randomAlphaOfLength(5), randomBoolean(), ShardRoutingState.INITIALIZING), indexService.getIndexSettings().getSettings());
+        assertEquals(1, service.getActiveContexts()); // routing was for the unrelated index "test": nothing is freed
+
+        service.beforeIndexShardCreated(TestShardRouting.newShardRouting(new ShardId(indexService.index(), 0),
+            randomAlphaOfLength(5),
+            randomBoolean(),
+            ShardRoutingState.INITIALIZING), indexService.getIndexSettings().getSettings());
+        assertEquals(0, service.getActiveContexts()); // routing was for shard 0 of this index: its context is freed
+    }
+
     public void testTimeout() throws IOException {
         createIndex("index");
         final SearchService service = getInstanceFromNode(SearchService.class);

+ 2 - 2
test/framework/src/main/java/org/elasticsearch/test/MockIndexEventListener.java

@@ -131,8 +131,8 @@ public final class MockIndexEventListener {
         }
 
         @Override
-        public void beforeIndexShardCreated(ShardId shardId, Settings indexSettings) {
-            delegate.beforeIndexShardCreated(shardId, indexSettings);
+        public void beforeIndexShardCreated(ShardRouting shardrouting, Settings indexSettings) {
+            delegate.beforeIndexShardCreated(shardrouting, indexSettings);
         }
 
         @Override