Browse Source

Merge pull request ESQL-1017 from elastic/main

🤖 ESQL: Merge upstream
elasticsearchmachine 2 years ago
parent
commit
ff2ccbf941

+ 5 - 0
docs/changelog/95229.yaml

@@ -0,0 +1,5 @@
+pr: 95229
+summary: Avoid null Location in post write refresh
+area: CRUD
+type: bug
+issues: []

+ 1 - 1
server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java

@@ -42,7 +42,7 @@ public class PostWriteRefresh {
             case WAIT_UNTIL -> waitUntil(indexShard, location, new ActionListener<>() {
                 @Override
                 public void onResponse(Boolean forced) {
-                    if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0) {
+                    if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0 && location != null) {
                         refreshUnpromotables(indexShard, location, listener, forced);
                     } else {
                         listener.onResponse(forced);

+ 47 - 2
server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java

@@ -16,6 +16,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.coordination.Coordinator;
 import org.elasticsearch.cluster.coordination.ElectionStrategy;
 import org.elasticsearch.cluster.coordination.LeaderHeartbeatService;
+import org.elasticsearch.cluster.coordination.PreVoteCollector;
 import org.elasticsearch.cluster.coordination.Reconfigurator;
 import org.elasticsearch.cluster.coordination.StatefulPreVoteCollector;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -48,6 +49,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.function.BiConsumer;
@@ -182,6 +184,9 @@ public class DiscoveryModule {
                 );
         }
 
+        var reconfigurator = getReconfigurator(settings, clusterSettings, clusterCoordinationPlugins);
+        var preVoteCollectorFactory = getPreVoteCollectorFactory(clusterCoordinationPlugins);
+
         if (MULTI_NODE_DISCOVERY_TYPE.equals(discoveryType)
             || LEGACY_MULTI_NODE_DISCOVERY_TYPE.equals(discoveryType)
             || SINGLE_NODE_DISCOVERY_TYPE.equals(discoveryType)) {
@@ -203,9 +208,9 @@ public class DiscoveryModule {
                 electionStrategy,
                 nodeHealthService,
                 circuitBreakerService,
-                new Reconfigurator(settings, clusterSettings),
+                reconfigurator,
                 LeaderHeartbeatService.NO_OP,
-                StatefulPreVoteCollector::new
+                preVoteCollectorFactory
             );
         } else {
             throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
@@ -214,6 +219,46 @@ public class DiscoveryModule {
         logger.info("using discovery type [{}] and seed hosts providers {}", discoveryType, seedProviderNames);
     }
 
+    // visible for testing
+    static Reconfigurator getReconfigurator(
+        Settings settings,
+        ClusterSettings clusterSettings,
+        List<ClusterCoordinationPlugin> clusterCoordinationPlugins
+    ) {
+        final var reconfiguratorFactories = clusterCoordinationPlugins.stream()
+            .map(ClusterCoordinationPlugin::getReconfiguratorFactory)
+            .flatMap(Optional::stream)
+            .toList();
+
+        if (reconfiguratorFactories.size() > 1) {
+            throw new IllegalStateException("multiple reconfigurator factories found: " + reconfiguratorFactories);
+        }
+
+        if (reconfiguratorFactories.size() == 1) {
+            return reconfiguratorFactories.get(0).newReconfigurator(settings, clusterSettings);
+        }
+
+        return new Reconfigurator(settings, clusterSettings);
+    }
+
+    // visible for testing
+    static PreVoteCollector.Factory getPreVoteCollectorFactory(List<ClusterCoordinationPlugin> clusterCoordinationPlugins) {
+        final var preVoteCollectorFactories = clusterCoordinationPlugins.stream()
+            .map(ClusterCoordinationPlugin::getPreVoteCollectorFactory)
+            .flatMap(Optional::stream)
+            .toList();
+
+        if (preVoteCollectorFactories.size() > 1) {
+            throw new IllegalStateException("multiple pre-vote collector factories found: " + preVoteCollectorFactories);
+        }
+
+        if (preVoteCollectorFactories.size() == 1) {
+            return preVoteCollectorFactories.get(0);
+        }
+
+        return StatefulPreVoteCollector::new;
+    }
+
     public static boolean isSingleNodeDiscovery(Settings settings) {
         return SINGLE_NODE_DISCOVERY_TYPE.equals(DISCOVERY_TYPE_SETTING.get(settings));
     }

+ 14 - 0
server/src/main/java/org/elasticsearch/plugins/ClusterCoordinationPlugin.java

@@ -11,6 +11,8 @@ package org.elasticsearch.plugins;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.coordination.CoordinationState;
 import org.elasticsearch.cluster.coordination.ElectionStrategy;
+import org.elasticsearch.cluster.coordination.PreVoteCollector;
+import org.elasticsearch.cluster.coordination.Reconfigurator;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
@@ -52,6 +54,14 @@ public interface ClusterCoordinationPlugin {
         return Optional.empty();
     }
 
+    default Optional<ReconfiguratorFactory> getReconfiguratorFactory() {
+        return Optional.empty();
+    }
+
+    default Optional<PreVoteCollector.Factory> getPreVoteCollectorFactory() {
+        return Optional.empty();
+    }
+
     interface PersistedStateFactory {
         CoordinationState.PersistedState createPersistedState(
             Settings settings,
@@ -68,4 +78,8 @@ public interface ClusterCoordinationPlugin {
             ThreadPool threadPool
         );
     }
+
+    interface ReconfiguratorFactory {
+        Reconfigurator newReconfigurator(Settings settings, ClusterSettings clusterSettings);
+    }
 }

+ 34 - 0
server/src/test/java/org/elasticsearch/action/support/replication/PostWriteRefreshTests.java

@@ -33,6 +33,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -185,6 +186,39 @@ public class PostWriteRefreshTests extends IndexShardTestCase {
         }
     }
 
+    public void testWaitForWithNullLocationCompletedImmediately() throws IOException {
+        final IndexShard primary = spy(newShard(true));
+        recoverShardFromStore(primary);
+        ReplicationGroup realReplicationGroup = primary.getReplicationGroup();
+        try {
+            PlainActionFuture<Boolean> f = PlainActionFuture.newFuture();
+            PostWriteRefresh postWriteRefresh = new PostWriteRefresh(transportService);
+
+            ReplicationGroup replicationGroup = mock(ReplicationGroup.class);
+            IndexShardRoutingTable routingTable = mock(IndexShardRoutingTable.class);
+            when(primary.getReplicationGroup()).thenReturn(replicationGroup).thenReturn(realReplicationGroup);
+            when(replicationGroup.getRoutingTable()).thenReturn(routingTable);
+            ShardRouting shardRouting = ShardRouting.newUnassigned(
+                primary.shardId(),
+                false,
+                RecoverySource.PeerRecoverySource.INSTANCE,
+                new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "message"),
+                ShardRouting.Role.SEARCH_ONLY
+            );
+            // Randomly test scenarios with and without unpromotables
+            if (randomBoolean()) {
+                when(routingTable.unpromotableShards()).thenReturn(Collections.emptyList());
+            } else {
+                when(routingTable.unpromotableShards()).thenReturn(List.of(shardRouting));
+            }
+            WriteRequest.RefreshPolicy policy = WriteRequest.RefreshPolicy.WAIT_UNTIL;
+            postWriteRefresh.refreshShard(policy, primary, null, f);
+            f.actionGet();
+        } finally {
+            closeShards(primary, primary);
+        }
+    }
+
     private static void assertEngineContainsIdNoRefresh(IndexShard replica, String id) throws IOException {
         List<DocIdSeqNoAndSource> docIds = EngineTestCase.getDocIds(replica.getEngineOrNull(), false);
         Set<String> ids = docIds.stream().map(DocIdSeqNoAndSource::id).collect(Collectors.toSet());

+ 80 - 0
server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java

@@ -11,6 +11,9 @@ import org.elasticsearch.TransportVersion;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.coordination.Coordinator;
+import org.elasticsearch.cluster.coordination.PreVoteCollector;
+import org.elasticsearch.cluster.coordination.Reconfigurator;
+import org.elasticsearch.cluster.coordination.StatefulPreVoteCollector;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.cluster.service.ClusterApplier;
@@ -24,6 +27,7 @@ import org.elasticsearch.gateway.GatewayMetaState;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.plugins.ClusterCoordinationPlugin;
 import org.elasticsearch.plugins.DiscoveryPlugin;
+import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -36,10 +40,13 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
 import static org.mockito.Mockito.mock;
 
 public class DiscoveryModuleTests extends ESTestCase {
@@ -219,4 +226,77 @@ public class DiscoveryModuleTests extends ESTestCase {
                 + "[multi-node] instead."
         );
     }
+
+    public void testRejectsMultipleReconfigurators() {
+        assertThat(
+            expectThrows(
+                IllegalStateException.class,
+                () -> DiscoveryModule.getReconfigurator(
+                    Settings.EMPTY,
+                    new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
+                    List.of(
+                        new BaseTestClusterCoordinationPlugin(),
+                        new TestClusterCoordinationPlugin1(),
+                        new TestClusterCoordinationPlugin2()
+                    )
+                )
+            ).getMessage(),
+            containsString("multiple reconfigurator factories found")
+        );
+
+        assertThat(
+            DiscoveryModule.getReconfigurator(
+                Settings.EMPTY,
+                new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
+                List.of(new BaseTestClusterCoordinationPlugin())
+            ),
+            is(BaseTestClusterCoordinationPlugin.reconfiguratorInstance)
+        );
+    }
+
+    public void testRejectsMultiplePreVoteCollectorFactories() {
+        assertThat(
+            expectThrows(
+                IllegalStateException.class,
+                () -> DiscoveryModule.getPreVoteCollectorFactory(
+                    List.of(new BaseTestClusterCoordinationPlugin(), new TestClusterCoordinationPlugin1() {
+                        @Override
+                        public Optional<ReconfiguratorFactory> getReconfiguratorFactory() {
+                            return Optional.empty();
+                        }
+                    }, new TestClusterCoordinationPlugin2() {
+                        @Override
+                        public Optional<ReconfiguratorFactory> getReconfiguratorFactory() {
+                            return Optional.empty();
+                        }
+                    })
+                )
+            ).getMessage(),
+            containsString("multiple pre-vote collector factories found")
+        );
+
+        assertThat(
+            DiscoveryModule.getPreVoteCollectorFactory(List.of(new BaseTestClusterCoordinationPlugin())),
+            is(BaseTestClusterCoordinationPlugin.preVoteCollectorFactory)
+        );
+    }
+
+    private static class BaseTestClusterCoordinationPlugin extends Plugin implements ClusterCoordinationPlugin {
+        static Reconfigurator reconfiguratorInstance;
+        static PreVoteCollector.Factory preVoteCollectorFactory = StatefulPreVoteCollector::new;
+
+        @Override
+        public Optional<ReconfiguratorFactory> getReconfiguratorFactory() {
+            return Optional.of((settings, clusterSettings) -> reconfiguratorInstance = new Reconfigurator(settings, clusterSettings));
+        }
+
+        @Override
+        public Optional<PreVoteCollector.Factory> getPreVoteCollectorFactory() {
+            return Optional.of(preVoteCollectorFactory);
+        }
+    }
+
+    public static class TestClusterCoordinationPlugin1 extends BaseTestClusterCoordinationPlugin {}
+
+    public static class TestClusterCoordinationPlugin2 extends BaseTestClusterCoordinationPlugin {}
 }