Browse Source

Add the possibility to inject a custom RecoveryState factory to IndexStorePlugin implementations (#59038)

Add a custom factory for recovery state into IndexStorePlugin that
allows different implementors to provide its own RecoveryState
implementation.
Francisco Fernández Castaño 5 years ago
parent
commit
793750e099

+ 1 - 0
server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

@@ -156,6 +156,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
             BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING,
             IndexModule.INDEX_STORE_TYPE_SETTING,
             IndexModule.INDEX_STORE_PRE_LOAD_SETTING,
+            IndexModule.INDEX_RECOVERY_TYPE_SETTING,
             IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING,
             FsDirectoryFactory.INDEX_LOCK_FACTOR_SETTING,
             EngineConfig.INDEX_CODEC_SETTING,

+ 28 - 2
server/src/main/java/org/elasticsearch/index/IndexModule.java

@@ -59,6 +59,7 @@ import org.elasticsearch.indices.IndicesQueryCache;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
 import org.elasticsearch.indices.mapper.MapperRegistry;
+import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.plugins.IndexStorePlugin;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
@@ -100,9 +101,14 @@ public final class IndexModule {
 
     private static final FsDirectoryFactory DEFAULT_DIRECTORY_FACTORY = new FsDirectoryFactory();
 
+    private static final IndexStorePlugin.RecoveryStateFactory DEFAULT_RECOVERY_STATE_FACTORY = RecoveryState::new;
+
     public static final Setting<String> INDEX_STORE_TYPE_SETTING =
             new Setting<>("index.store.type", "", Function.identity(), Property.IndexScope, Property.NodeScope);
 
+    public static final Setting<String> INDEX_RECOVERY_TYPE_SETTING =
+        new Setting<>("index.recovery.type", "", Function.identity(), Property.IndexScope, Property.NodeScope);
+
     /** On which extensions to load data into the file-system cache upon opening of files.
      *  This only works with the mmap directory, and even in that case is still
      *  best-effort only. */
@@ -134,6 +140,7 @@ public final class IndexModule {
     private final IndexNameExpressionResolver expressionResolver;
     private final AtomicBoolean frozen = new AtomicBoolean(false);
     private final BooleanSupplier allowExpensiveQueries;
+    private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
 
     /**
      * Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
@@ -150,7 +157,8 @@ public final class IndexModule {
             final EngineFactory engineFactory,
             final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
             final BooleanSupplier allowExpensiveQueries,
-            final IndexNameExpressionResolver expressionResolver) {
+            final IndexNameExpressionResolver expressionResolver,
+            final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories) {
         this.indexSettings = indexSettings;
         this.analysisRegistry = analysisRegistry;
         this.engineFactory = Objects.requireNonNull(engineFactory);
@@ -159,6 +167,7 @@ public final class IndexModule {
         this.directoryFactories = Collections.unmodifiableMap(directoryFactories);
         this.allowExpensiveQueries = allowExpensiveQueries;
         this.expressionResolver = expressionResolver;
+        this.recoveryStateFactories = recoveryStateFactories;
     }
 
     /**
@@ -410,6 +419,7 @@ public final class IndexModule {
             indexReaderWrapper.get() == null ? (shard) -> null : indexReaderWrapper.get();
         eventListener.beforeIndexCreated(indexSettings.getIndex(), indexSettings.getSettings());
         final IndexStorePlugin.DirectoryFactory directoryFactory = getDirectoryFactory(indexSettings, directoryFactories);
+        final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory = getRecoveryStateFactory(indexSettings, recoveryStateFactories);
         QueryCache queryCache = null;
         IndexAnalyzers indexAnalyzers = null;
         boolean success = false;
@@ -432,7 +442,7 @@ public final class IndexModule {
                 engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService, clusterService, client, queryCache,
                 directoryFactory, eventListener, readerWrapperFactory, mapperRegistry, indicesFieldDataCache, searchOperationListeners,
                 indexOperationListeners, namedWriteableRegistry, idFieldDataEnabled, allowExpensiveQueries, expressionResolver,
-                valuesSourceRegistry);
+                valuesSourceRegistry, recoveryStateFactory);
             success = true;
             return indexService;
         } finally {
@@ -471,6 +481,22 @@ public final class IndexModule {
         return factory;
     }
 
+    private static IndexStorePlugin.RecoveryStateFactory getRecoveryStateFactory(
+        final IndexSettings indexSettings, final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories) {
+        final String recoveryType = indexSettings.getValue(INDEX_RECOVERY_TYPE_SETTING);
+
+        if (recoveryType.isEmpty()) {
+            return DEFAULT_RECOVERY_STATE_FACTORY;
+        }
+
+        IndexStorePlugin.RecoveryStateFactory factory = recoveryStateFactories.get(recoveryType);
+        if (factory == null) {
+            throw new IllegalArgumentException("Unknown recovery type [" + recoveryType + "]");
+        }
+
+        return factory;
+    }
+
     /**
      * creates a new mapper service to do administrative work like mapping updates. This *should not* be used for document parsing.
      * doing so will result in an exception.

+ 10 - 1
server/src/main/java/org/elasticsearch/index/IndexService.java

@@ -31,6 +31,7 @@ import org.elasticsearch.Assertions;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.CheckedFunction;
@@ -79,6 +80,7 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.indices.cluster.IndicesClusterStateService;
 import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
 import org.elasticsearch.indices.mapper.MapperRegistry;
+import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.plugins.IndexStorePlugin;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
@@ -114,6 +116,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
     private final NodeEnvironment nodeEnv;
     private final ShardStoreDeleter shardStoreDeleter;
     private final IndexStorePlugin.DirectoryFactory directoryFactory;
+    private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory;
     private final CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper;
     private final IndexCache indexCache;
     private final MapperService mapperService;
@@ -174,7 +177,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
             BooleanSupplier idFieldDataEnabled,
             BooleanSupplier allowExpensiveQueries,
             IndexNameExpressionResolver expressionResolver,
-            ValuesSourceRegistry valuesSourceRegistry) {
+            ValuesSourceRegistry valuesSourceRegistry,
+            IndexStorePlugin.RecoveryStateFactory recoveryStateFactory) {
         super(indexSettings);
         this.allowExpensiveQueries = allowExpensiveQueries;
         this.indexSettings = indexSettings;
@@ -223,6 +227,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
         this.eventListener = eventListener;
         this.nodeEnv = nodeEnv;
         this.directoryFactory = directoryFactory;
+        this.recoveryStateFactory = recoveryStateFactory;
         this.engineFactory = Objects.requireNonNull(engineFactory);
         // initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
         this.readerWrapper = wrapperFactory.apply(this);
@@ -563,6 +568,10 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
         }
     }
 
+    public RecoveryState createRecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, DiscoveryNode sourceNode) {
+        return recoveryStateFactory.newRecoveryState(shardRouting, targetNode, sourceNode);
+    }
+
     @Override
     public IndexSettings getIndexSettings() {
         return indexSettings;

+ 11 - 5
server/src/main/java/org/elasticsearch/indices/IndicesService.java

@@ -221,6 +221,7 @@ public class IndicesService extends AbstractLifecycleComponent
     private final MetaStateService metaStateService;
     private final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders;
     private final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories;
+    private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
     final AbstractRefCounted indicesRefCount; // pkg-private for testing
     private final CountDownLatch closeLatch = new CountDownLatch(1);
     private volatile boolean idFieldDataEnabled;
@@ -245,7 +246,8 @@ public class IndicesService extends AbstractLifecycleComponent
                           IndexScopedSettings indexScopedSettings, CircuitBreakerService circuitBreakerService, BigArrays bigArrays,
                           ScriptService scriptService, ClusterService clusterService, Client client, MetaStateService metaStateService,
                           Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders,
-                          Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories, ValuesSourceRegistry valuesSourceRegistry) {
+                          Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories, ValuesSourceRegistry valuesSourceRegistry,
+                          Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories) {
         this.settings = settings;
         this.threadPool = threadPool;
         this.pluginsService = pluginsService;
@@ -291,6 +293,7 @@ public class IndicesService extends AbstractLifecycleComponent
         }
 
         this.directoryFactories = directoryFactories;
+        this.recoveryStateFactories = recoveryStateFactories;
         // doClose() is called when shutting down a node, yet there might still be ongoing requests
         // that we need to wait for before closing some resources such as the caches. In order to
         // avoid closing these resources while ongoing requests are still being processed, we use a
@@ -639,7 +642,7 @@ public class IndicesService extends AbstractLifecycleComponent
             indexCreationContext);
 
         final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings),
-                directoryFactories, () -> allowExpensiveQueries, indexNameExpressionResolver);
+                directoryFactories, () -> allowExpensiveQueries, indexNameExpressionResolver, recoveryStateFactories);
         for (IndexingOperationListener operationListener : indexingOperationListeners) {
             indexModule.addIndexOperationListener(operationListener);
         }
@@ -710,7 +713,7 @@ public class IndicesService extends AbstractLifecycleComponent
     public synchronized MapperService createIndexMapperService(IndexMetadata indexMetadata) throws IOException {
         final IndexSettings idxSettings = new IndexSettings(indexMetadata, this.settings, indexScopedSettings);
         final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings),
-                directoryFactories, () -> allowExpensiveQueries, indexNameExpressionResolver);
+                directoryFactories, () -> allowExpensiveQueries, indexNameExpressionResolver, recoveryStateFactories);
         pluginsService.onIndexModule(indexModule);
         return indexModule.newIndexMapperService(xContentRegistry, mapperRegistry, scriptService);
     }
@@ -745,16 +748,19 @@ public class IndicesService extends AbstractLifecycleComponent
     @Override
     public IndexShard createShard(
             final ShardRouting shardRouting,
-            final RecoveryState recoveryState,
             final PeerRecoveryTargetService recoveryTargetService,
             final PeerRecoveryTargetService.RecoveryListener recoveryListener,
             final RepositoriesService repositoriesService,
             final Consumer<IndexShard.ShardFailure> onShardFailure,
             final Consumer<ShardId> globalCheckpointSyncer,
-            final RetentionLeaseSyncer retentionLeaseSyncer) throws IOException {
+            final RetentionLeaseSyncer retentionLeaseSyncer,
+            final DiscoveryNode targetNode,
+            final DiscoveryNode sourceNode) throws IOException {
         Objects.requireNonNull(retentionLeaseSyncer);
         ensureChangesAllowed();
         IndexService indexService = indexService(shardRouting.index());
+        assert indexService != null;
+        RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode);
         IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer);
         indexShard.addShardFailureCallback(onShardFailure);
         indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService,

+ 8 - 6
server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

@@ -604,16 +604,16 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
         try {
             final long primaryTerm = state.metadata().index(shardRouting.index()).primaryTerm(shardRouting.id());
             logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm);
-            RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode);
             indicesService.createShard(
                     shardRouting,
-                    recoveryState,
                     recoveryTargetService,
                     new RecoveryListener(shardRouting, primaryTerm),
                     repositoriesService,
                     failedShardHandler,
                     this::updateGlobalCheckpointForShard,
-                    retentionLeaseSyncer);
+                    retentionLeaseSyncer,
+                    nodes.getLocalNode(),
+                    sourceNode);
         } catch (Exception e) {
             failAndRemoveShard(shardRouting, true, "failed to create shard", e, state);
         }
@@ -903,25 +903,27 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
          * Creates a shard for the specified shard routing and starts recovery.
          *
          * @param shardRouting           the shard routing
-         * @param recoveryState          the recovery state
          * @param recoveryTargetService  recovery service for the target
          * @param recoveryListener       a callback when recovery changes state (finishes or fails)
          * @param repositoriesService    service responsible for snapshot/restore
          * @param onShardFailure         a callback when this shard fails
          * @param globalCheckpointSyncer a callback when this shard syncs the global checkpoint
          * @param retentionLeaseSyncer   a callback when this shard syncs retention leases
+         * @param targetNode             the node where this shard will be recovered
+         * @param sourceNode             the source node to recover this shard from (it might be null)
          * @return a new shard
          * @throws IOException if an I/O exception occurs when creating the shard
          */
         T createShard(
                 ShardRouting shardRouting,
-                RecoveryState recoveryState,
                 PeerRecoveryTargetService recoveryTargetService,
                 PeerRecoveryTargetService.RecoveryListener recoveryListener,
                 RepositoriesService repositoriesService,
                 Consumer<IndexShard.ShardFailure> onShardFailure,
                 Consumer<ShardId> globalCheckpointSyncer,
-                RetentionLeaseSyncer retentionLeaseSyncer) throws IOException;
+                RetentionLeaseSyncer retentionLeaseSyncer,
+                DiscoveryNode targetNode,
+                @Nullable DiscoveryNode sourceNode) throws IOException;
 
         /**
          * Returns shard for the specified id if it exists otherwise returns <code>null</code>.

+ 8 - 1
server/src/main/java/org/elasticsearch/node/Node.java

@@ -451,6 +451,13 @@ public class Node implements Closeable {
                             .flatMap(m -> m.entrySet().stream())
                             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
+            final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories =
+                pluginsService.filterPlugins(IndexStorePlugin.class)
+                    .stream()
+                    .map(IndexStorePlugin::getRecoveryStateFactories)
+                    .flatMap(m -> m.entrySet().stream())
+                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
             final Map<String, Collection<SystemIndexDescriptor>> systemIndexDescriptorMap = pluginsService
                 .filterPlugins(SystemIndexPlugin.class)
                 .stream()
@@ -468,7 +475,7 @@ public class Node implements Closeable {
                     clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
                     threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptService,
                     clusterService, client, metaStateService, engineFactoryProviders, indexStoreFactories,
-                    searchModule.getValuesSourceRegistry());
+                    searchModule.getValuesSourceRegistry(), recoveryStateFactories);
 
             final AliasValidator aliasValidator = new AliasValidator();
 

+ 27 - 0
server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java

@@ -20,10 +20,15 @@
 package org.elasticsearch.plugins;
 
 import org.apache.lucene.store.Directory;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.shard.ShardPath;
+import org.elasticsearch.indices.recovery.RecoveryState;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
 
 /**
@@ -55,4 +60,26 @@ public interface IndexStorePlugin {
      */
     Map<String, DirectoryFactory> getDirectoryFactories();
 
+    /**
+     * An interface that allows to create a new {@link RecoveryState} per shard.
+     */
+    @FunctionalInterface
+    interface RecoveryStateFactory {
+        /**
+         * Creates a new {@link RecoveryState} per shard. This method is called once per shard on shard creation.
+         * @return a new RecoveryState instance
+         */
+        RecoveryState newRecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode);
+    }
+
+    /**
+     * The {@link RecoveryStateFactory} mappings for this plugin. When an index is created the recovery type setting
+     * {@link org.elasticsearch.index.IndexModule#INDEX_RECOVERY_TYPE_SETTING} on the index will be examined and either use the default
+     * or looked up among all the recovery state factories from {@link IndexStorePlugin} plugins.
+     *
+     * @return a map from recovery type to an recovery state factory
+     */
+    default Map<String, RecoveryStateFactory> getRecoveryStateFactories() {
+        return Collections.emptyMap();
+    }
 }

+ 54 - 3
server/src/test/java/org/elasticsearch/index/IndexModuleTests.java

@@ -35,6 +35,10 @@ import org.apache.lucene.util.SetOnce.AlreadySetException;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.CheckedFunction;
 import org.elasticsearch.common.breaker.CircuitBreaker;
@@ -77,6 +81,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
 import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
 import org.elasticsearch.indices.mapper.MapperRegistry;
+import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.plugins.IndexStorePlugin;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.internal.SearchContext;
@@ -104,6 +109,8 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasToString;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.mock;
 
 public class IndexModuleTests extends ESTestCase {
     private Index index;
@@ -168,7 +175,13 @@ public class IndexModuleTests extends ESTestCase {
     public void testWrapperIsBound() throws IOException {
         final MockEngineFactory engineFactory = new MockEngineFactory(AssertingDirectoryReader.class);
         IndexModule module = new IndexModule(
-                indexSettings, emptyAnalysisRegistry, engineFactory, Collections.emptyMap(), () -> true, new IndexNameExpressionResolver());
+                indexSettings,
+                emptyAnalysisRegistry,
+                engineFactory,
+                Collections.emptyMap(),
+                () -> true,
+                new IndexNameExpressionResolver(),
+                Collections.emptyMap());
         module.setReaderWrapper(s -> new Wrapper());
 
         IndexService indexService = newIndexService(module);
@@ -189,7 +202,7 @@ public class IndexModuleTests extends ESTestCase {
         final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories = singletonMap(
             "foo_store", new FooFunction());
         final IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory(), indexStoreFactories,
-            () -> true, new IndexNameExpressionResolver());
+            () -> true, new IndexNameExpressionResolver(), Collections.emptyMap());
 
         final IndexService indexService = newIndexService(module);
         assertThat(indexService.getDirectoryFactory(), instanceOf(FooFunction.class));
@@ -485,9 +498,47 @@ public class IndexModuleTests extends ESTestCase {
         assertThat(e, hasToString(containsString("store type [" + storeType + "] is not allowed")));
     }
 
+    public void testRegisterCustomRecoveryStateFactory() throws IOException {
+        final Settings settings = Settings
+            .builder()
+            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+            .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
+            .put(IndexModule.INDEX_RECOVERY_TYPE_SETTING.getKey(), "test_recovery")
+            .build();
+
+        final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings);
+
+        RecoveryState recoveryState = mock(RecoveryState.class);
+        final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories = singletonMap(
+            "test_recovery", (shardRouting, targetNode, sourceNode) -> recoveryState);
+
+        final IndexModule module = new IndexModule(indexSettings,
+            emptyAnalysisRegistry,
+            new InternalEngineFactory(),
+            Collections.emptyMap(),
+            () -> true,
+            new IndexNameExpressionResolver(),
+            recoveryStateFactories);
+
+        final IndexService indexService = newIndexService(module);
+
+        ShardRouting shard = createInitializedShardRouting();
+
+        assertThat(indexService.createRecoveryState(shard, mock(DiscoveryNode.class), mock(DiscoveryNode.class)), is(recoveryState));
+
+        indexService.close("closing", false);
+    }
+
+    private ShardRouting createInitializedShardRouting() {
+        ShardRouting shard = ShardRouting.newUnassigned(new ShardId("test","_na_", 0), true,
+            RecoverySource.ExistingStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
+        shard = shard.initialize("node1", null, -1);
+        return shard;
+    }
+
     private static IndexModule createIndexModule(IndexSettings indexSettings, AnalysisRegistry emptyAnalysisRegistry) {
         return new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory(), Collections.emptyMap(), () -> true,
-            new IndexNameExpressionResolver());
+            new IndexNameExpressionResolver(), Collections.emptyMap());
     }
 
     class CustomQueryCache implements QueryCache {

+ 5 - 2
server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java

@@ -22,6 +22,7 @@ package org.elasticsearch.indices.cluster;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
@@ -230,14 +231,16 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
         @Override
         public MockIndexShard createShard(
                 final ShardRouting shardRouting,
-                final RecoveryState recoveryState,
                 final PeerRecoveryTargetService recoveryTargetService,
                 final PeerRecoveryTargetService.RecoveryListener recoveryListener,
                 final RepositoriesService repositoriesService,
                 final Consumer<IndexShard.ShardFailure> onShardFailure,
                 final Consumer<ShardId> globalCheckpointSyncer,
-                final RetentionLeaseSyncer retentionLeaseSyncer) throws IOException {
+                final RetentionLeaseSyncer retentionLeaseSyncer,
+                final DiscoveryNode targetNode,
+                final DiscoveryNode sourceNode) throws IOException {
             failRandomly();
+            RecoveryState recoveryState = new RecoveryState(shardRouting, targetNode, sourceNode);
             MockIndexService indexService = indexService(recoveryState.getShardId().getIndex());
             MockIndexShard indexShard = indexService.createShard(shardRouting);
             indexShard.recoveryState = recoveryState;

+ 48 - 0
server/src/test/java/org/elasticsearch/plugins/IndexStorePluginTests.java

@@ -20,9 +20,12 @@
 package org.elasticsearch.plugins;
 
 import org.elasticsearch.bootstrap.JavaVersion;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.IndexModule;
 import org.elasticsearch.index.store.FsDirectoryFactory;
+import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.node.MockNode;
 import org.elasticsearch.test.ESTestCase;
 
@@ -69,6 +72,38 @@ public class IndexStorePluginTests extends ESTestCase {
 
     }
 
+    public static class FooCustomRecoveryStore extends Plugin implements IndexStorePlugin {
+        @Override
+        public Map<String, DirectoryFactory> getDirectoryFactories() {
+            return Collections.singletonMap("store-a", new FsDirectoryFactory());
+        }
+
+        @Override
+        public Map<String, RecoveryStateFactory> getRecoveryStateFactories() {
+            return Collections.singletonMap("recovery-type", new RecoveryFactory());
+        }
+    }
+
+    public static class BarCustomRecoveryStore extends Plugin implements IndexStorePlugin {
+        @Override
+        public Map<String, DirectoryFactory> getDirectoryFactories() {
+            return Collections.singletonMap("store-b", new FsDirectoryFactory());
+        }
+
+        @Override
+        public Map<String, RecoveryStateFactory> getRecoveryStateFactories() {
+            return Collections.singletonMap("recovery-type", new RecoveryFactory());
+        }
+    }
+
+    public static class RecoveryFactory implements IndexStorePlugin.RecoveryStateFactory {
+        @Override
+        public RecoveryState newRecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, DiscoveryNode sourceNode) {
+            return new RecoveryState(shardRouting, targetNode, sourceNode);
+        }
+    }
+
+
     public void testIndexStoreFactoryConflictsWithBuiltInIndexStoreType() {
         final Settings settings = Settings.builder().put("path.home", createTempDir()).build();
         final IllegalStateException e = expectThrows(
@@ -92,4 +127,17 @@ public class IndexStorePluginTests extends ESTestCase {
         }
     }
 
+    public void testDuplicateIndexStoreRecoveryStateFactories() {
+        final Settings settings = Settings.builder().put("path.home", createTempDir()).build();
+        final IllegalStateException e = expectThrows(
+            IllegalStateException.class, () -> new MockNode(settings, Arrays.asList(FooCustomRecoveryStore.class,
+                                                                                    BarCustomRecoveryStore.class)));
+        if (JavaVersion.current().compareTo(JavaVersion.parse("9")) >= 0) {
+            assertThat(e.getMessage(), containsString("Duplicate key recovery-type"));
+        } else {
+            assertThat(e, hasToString(matches(
+                "java.lang.IllegalStateException: Duplicate key " +
+                    "org.elasticsearch.plugins.IndexStorePluginTests$RecoveryFactory@[\\w\\d]+")));
+        }
+    }
 }

+ 2 - 1
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -1434,7 +1434,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
                     new MetaStateService(nodeEnv, namedXContentRegistry),
                     Collections.emptyList(),
                     emptyMap(),
-                    null
+                    null,
+                    emptyMap()
                 );
                 final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings);
                 snapshotShardsService =

+ 1 - 1
x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherPluginTests.java

@@ -76,7 +76,7 @@ public class WatcherPluginTests extends ESTestCase {
         AnalysisRegistry registry = new AnalysisRegistry(TestEnvironment.newEnvironment(settings), emptyMap(), emptyMap(), emptyMap(),
                 emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap());
         IndexModule indexModule = new IndexModule(indexSettings, registry, new InternalEngineFactory(), Collections.emptyMap(),
-                () -> true, new IndexNameExpressionResolver());
+                () -> true, new IndexNameExpressionResolver(), Collections.emptyMap());
         // this will trip an assertion if the watcher indexing operation listener is null (which it is) but we try to add it
         watcher.onIndexModule(indexModule);