Browse Source

Switch to shared thread pool for all snapshot repositories

 Closes #6181
Igor Motov 11 years ago
parent
commit
c20713530d

+ 0 - 3
docs/reference/modules/snapshots.asciidoc

@@ -67,7 +67,6 @@ on all data and master nodes. The following settings are supported:
 [horizontal]
 `location`:: Location of the snapshots. Mandatory.
 `compress`:: Turns on compression of the snapshot files. Defaults to `true`.
-`concurrent_streams`:: Throttles the number of streams (per node) preforming snapshot operation. Defaults to `5`
 `chunk_size`:: Big files can be broken down into chunks during snapshotting if needed. The chunk size can be specified in bytes or by
  using size value notation, i.e. 1g, 10m, 5k. Defaults to `null` (unlimited chunk size).
 `max_restore_bytes_per_sec`:: Throttles per node restore rate. Defaults to `20mb` per second.
@@ -83,8 +82,6 @@ point to the root of the shared filesystem repository. The following settings ar
 
 [horizontal]
 `url`:: Location of the snapshots. Mandatory.
-`concurrent_streams`:: Throttles the number of streams (per node) preforming snapshot operation. Defaults to `5`
-
 
 [float]
 ===== Repository plugins

+ 12 - 2
docs/reference/modules/threadpool.asciidoc

@@ -36,13 +36,23 @@ pools, but the important ones include:
     size `# of available processors`.
     queue_size `1000`.
 
-`warmer`:: 
+`snapshot`::
+    For snapshot/restore operations, defaults to `scaling`
+    keep-alive `5m`,
+    size `(# of available processors)/2`.
+
+`snapshot_data`::
+    For snapshot/restore operations on data files, defaults to `scaling`
+    with a `5m` keep-alive,
+    size `5`.
+
+`warmer`::
     For segment warm-up operations, defaults to `scaling`
     with a `5m` keep-alive. 
 
 `refresh`:: 
     For refresh operations, defaults to `scaling`
-    with a `5m` keep-alive. 
+    with a `5m` keep-alive.
 
 Changing a specific thread pool can be done by setting its type and
 specific type parameters, for example, changing the `index` thread pool

+ 5 - 4
src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobStore.java

@@ -28,6 +28,7 @@ import org.elasticsearch.common.io.FileSystemUtils;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.File;
 import java.util.concurrent.Executor;
@@ -37,15 +38,16 @@ import java.util.concurrent.Executor;
  */
 public class FsBlobStore extends AbstractComponent implements BlobStore {
 
-    private final Executor executor;
+    private final ThreadPool threadPool;
 
     private final File path;
 
     private final int bufferSizeInBytes;
 
-    public FsBlobStore(Settings settings, Executor executor, File path) {
+    public FsBlobStore(Settings settings, ThreadPool threadPool, File path) {
         super(settings);
         this.path = path;
+        this.threadPool = threadPool;
         if (!path.exists()) {
             boolean b = FileSystemUtils.mkdirs(path);
             if (!b) {
@@ -56,7 +58,6 @@ public class FsBlobStore extends AbstractComponent implements BlobStore {
             throw new BlobStoreException("Path is not a directory at [" + path + "]");
         }
         this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
-        this.executor = executor;
     }
 
     @Override
@@ -73,7 +74,7 @@ public class FsBlobStore extends AbstractComponent implements BlobStore {
     }
 
     public Executor executor() {
-        return executor;
+        return threadPool.executor(ThreadPool.Names.SNAPSHOT_DATA);
     }
 
     @Override

+ 6 - 5
src/main/java/org/elasticsearch/common/blobstore/url/URLBlobStore.java

@@ -27,6 +27,7 @@ import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -37,7 +38,7 @@ import java.util.concurrent.Executor;
  */
 public class URLBlobStore extends AbstractComponent implements BlobStore {
 
-    private final Executor executor;
+    private final ThreadPool threadPool;
 
     private final URL path;
 
@@ -53,14 +54,14 @@ public class URLBlobStore extends AbstractComponent implements BlobStore {
      * </dl>
      *
      * @param settings settings
-     * @param executor executor for read operations
+     * @param threadPool thread pool for read operations
      * @param path     base URL
      */
-    public URLBlobStore(Settings settings, Executor executor, URL path) {
+    public URLBlobStore(Settings settings, ThreadPool threadPool, URL path) {
         super(settings);
         this.path = path;
         this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
-        this.executor = executor;
+        this.threadPool = threadPool;
     }
 
     /**
@@ -95,7 +96,7 @@ public class URLBlobStore extends AbstractComponent implements BlobStore {
      * @return executor
      */
     public Executor executor() {
-        return executor;
+        return threadPool.executor(ThreadPool.Names.SNAPSHOT_DATA);
     }
 
     /**

+ 2 - 2
src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java

@@ -31,7 +31,6 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.gateway.IndexShardGateway;
 import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
-import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.index.settings.IndexSettings;
 import org.elasticsearch.index.shard.AbstractIndexShardComponent;
 import org.elasticsearch.index.shard.IndexShardState;
@@ -41,6 +40,7 @@ import org.elasticsearch.index.shard.service.InternalIndexShard;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogStreams;
 import org.elasticsearch.index.translog.fs.FsTranslog;
+import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.threadpool.ThreadPool;
 
@@ -274,7 +274,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
                 return;
             }
             if (indexShard.state() == IndexShardState.STARTED && indexShard.translog().syncNeeded()) {
-                threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new Runnable() {
+                threadPool.executor(ThreadPool.Names.FLUSH).execute(new Runnable() {
                     @Override
                     public void run() {
                         try {

+ 5 - 0
src/main/java/org/elasticsearch/node/internal/InternalNode.java

@@ -89,6 +89,7 @@ import org.elasticsearch.script.ScriptModule;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.search.SearchService;
+import org.elasticsearch.snapshots.SnapshotsService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPoolModule;
 import org.elasticsearch.transport.TransportModule;
@@ -223,6 +224,7 @@ public final class InternalNode implements Node {
         injector.getInstance(IndicesClusterStateService.class).start();
         injector.getInstance(IndicesTTLService.class).start();
         injector.getInstance(RiversManager.class).start();
+        injector.getInstance(SnapshotsService.class).start();
         injector.getInstance(ClusterService.class).start();
         injector.getInstance(RoutingService.class).start();
         injector.getInstance(SearchService.class).start();
@@ -263,6 +265,7 @@ public final class InternalNode implements Node {
 
         injector.getInstance(RiversManager.class).stop();
 
+        injector.getInstance(SnapshotsService.class).stop();
         // stop any changes happening as a result of cluster state changes
         injector.getInstance(IndicesClusterStateService.class).stop();
         // we close indices first, so operations won't be allowed on it
@@ -317,6 +320,8 @@ public final class InternalNode implements Node {
         stopWatch.stop().start("rivers");
         injector.getInstance(RiversManager.class).close();
 
+        stopWatch.stop().start("snapshot_service");
+        injector.getInstance(SnapshotsService.class).close();
         stopWatch.stop().start("client");
         injector.getInstance(Client.class).close();
         stopWatch.stop().start("indices_cluster");

+ 3 - 7
src/main/java/org/elasticsearch/repositories/fs/FsRepository.java

@@ -24,17 +24,15 @@ import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.blobstore.fs.FsBlobStore;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.index.snapshots.IndexShardRepository;
 import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.RepositoryName;
 import org.elasticsearch.repositories.RepositorySettings;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Shared file system implementation of the BlobStoreRepository
@@ -68,7 +66,7 @@ public class FsRepository extends BlobStoreRepository {
      * @throws IOException
      */
     @Inject
-    public FsRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException {
+    public FsRepository(RepositoryName name, RepositorySettings repositorySettings, ThreadPool threadPool, IndexShardRepository indexShardRepository) throws IOException {
         super(name.getName(), repositorySettings, indexShardRepository);
         File locationFile;
         String location = repositorySettings.settings().get("location", componentSettings.get("location"));
@@ -78,9 +76,7 @@ public class FsRepository extends BlobStoreRepository {
         } else {
             locationFile = new File(location);
         }
-        int concurrentStreams = repositorySettings.settings().getAsInt("concurrent_streams", componentSettings.getAsInt("concurrent_streams", 5));
-        ExecutorService concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[fs_stream]"));
-        blobStore = new FsBlobStore(componentSettings, concurrentStreamPool, locationFile);
+        blobStore = new FsBlobStore(componentSettings, threadPool, locationFile);
         this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", null));
         this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false));
         this.basePath = BlobPath.cleanPath();

+ 3 - 7
src/main/java/org/elasticsearch/repositories/uri/URLRepository.java

@@ -25,17 +25,15 @@ import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.blobstore.url.URLBlobStore;
 import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.index.snapshots.IndexShardRepository;
 import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.RepositoryName;
 import org.elasticsearch.repositories.RepositorySettings;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
 import java.net.URL;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Read-only URL-based implementation of the BlobStoreRepository
@@ -65,7 +63,7 @@ public class URLRepository extends BlobStoreRepository {
      * @throws IOException
      */
     @Inject
-    public URLRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException {
+    public URLRepository(RepositoryName name, RepositorySettings repositorySettings, ThreadPool threadPool, IndexShardRepository indexShardRepository) throws IOException {
         super(name.getName(), repositorySettings, indexShardRepository);
         URL url;
         String path = repositorySettings.settings().get("url", componentSettings.get("url"));
@@ -74,10 +72,8 @@ public class URLRepository extends BlobStoreRepository {
         } else {
             url = new URL(path);
         }
-        int concurrentStreams = repositorySettings.settings().getAsInt("concurrent_streams", componentSettings.getAsInt("concurrent_streams", 5));
-        ExecutorService concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[fs_stream]"));
         listDirectories = repositorySettings.settings().getAsBoolean("list_directories", componentSettings.getAsBoolean("list_directories", true));
-        blobStore = new URLBlobStore(componentSettings, concurrentStreamPool, url);
+        blobStore = new URLBlobStore(componentSettings, threadPool, url);
         basePath = BlobPath.cleanPath();
     }
 

+ 2 - 0
src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java

@@ -65,6 +65,7 @@ public class RestThreadPoolAction extends AbstractCatAction {
             ThreadPool.Names.REFRESH,
             ThreadPool.Names.SEARCH,
             ThreadPool.Names.SNAPSHOT,
+            ThreadPool.Names.SNAPSHOT_DATA,
             ThreadPool.Names.SUGGEST,
             ThreadPool.Names.WARMER
     };
@@ -82,6 +83,7 @@ public class RestThreadPoolAction extends AbstractCatAction {
             "r",
             "s",
             "sn",
+            "sd",
             "su",
             "w"
     };

+ 44 - 3
src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -35,7 +35,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -56,6 +56,10 @@ import org.elasticsearch.transport.*;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static com.google.common.collect.Lists.newArrayList;
 import static com.google.common.collect.Maps.newHashMap;
@@ -79,7 +83,7 @@ import static com.google.common.collect.Sets.newHashSet;
  * notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls {@link #removeSnapshotFromClusterState(SnapshotId, SnapshotInfo, Throwable)} to remove snapshot from cluster state</li>
  * </ul>
  */
-public class SnapshotsService extends AbstractComponent implements ClusterStateListener {
+public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsService> implements ClusterStateListener {
 
     private final ClusterService clusterService;
 
@@ -93,6 +97,10 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL
 
     private volatile ImmutableMap<SnapshotId, SnapshotShards> shardSnapshots = ImmutableMap.of();
 
+    private final Lock shutdownLock = new ReentrantLock();
+
+    private final Condition shutdownCondition = shutdownLock.newCondition();
+
     private final CopyOnWriteArrayList<SnapshotCompletionListener> snapshotCompletionListeners = new CopyOnWriteArrayList<>();
 
 
@@ -678,7 +686,16 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL
 
         // Update the list of snapshots that we saw and tried to started
         // If startup of these shards fails later, we don't want to try starting these shards again
-        shardSnapshots = ImmutableMap.copyOf(survivors);
+        shutdownLock.lock();
+        try {
+            shardSnapshots = ImmutableMap.copyOf(survivors);
+            if (shardSnapshots.isEmpty()) {
+                // Notify all waiting threads that no more snapshots
+                shutdownCondition.signalAll();
+            }
+        } finally {
+            shutdownLock.unlock();
+        }
 
         // We have new snapshots to process -
         if (newSnapshots != null) {
@@ -1101,6 +1118,30 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL
         this.snapshotCompletionListeners.remove(listener);
     }
 
+    @Override
+    protected void doStart() throws ElasticsearchException {
+
+    }
+
+    @Override
+    protected void doStop() throws ElasticsearchException {
+        shutdownLock.lock();
+        try {
+            while(!shardSnapshots.isEmpty() && shutdownCondition.await(5, TimeUnit.SECONDS)) {
+                // Wait for at most 5 second for locally running snapshots to finish
+            }
+        } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+        } finally {
+            shutdownLock.unlock();
+        }
+    }
+
+    @Override
+    protected void doClose() throws ElasticsearchException {
+
+    }
+
     /**
      * Listener for create snapshot operation
      */

+ 2 - 0
src/main/java/org/elasticsearch/threadpool/ThreadPool.java

@@ -74,6 +74,7 @@ public class ThreadPool extends AbstractComponent {
         public static final String REFRESH = "refresh";
         public static final String WARMER = "warmer";
         public static final String SNAPSHOT = "snapshot";
+        public static final String SNAPSHOT_DATA = "snapshot_data";
         public static final String OPTIMIZE = "optimize";
         public static final String BENCH = "bench";
     }
@@ -117,6 +118,7 @@ public class ThreadPool extends AbstractComponent {
                 .put(Names.REFRESH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt10).build())
                 .put(Names.WARMER, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build())
                 .put(Names.SNAPSHOT, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build())
+                .put(Names.SNAPSHOT_DATA, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 5).build())
                 .put(Names.OPTIMIZE, settingsBuilder().put("type", "fixed").put("size", 1).build())
                 .put(Names.BENCH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build())
                 .build();

+ 3 - 2
src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java

@@ -30,6 +30,7 @@ import org.elasticsearch.index.snapshots.IndexShardRepository;
 import org.elasticsearch.repositories.RepositoryName;
 import org.elasticsearch.repositories.RepositorySettings;
 import org.elasticsearch.repositories.fs.FsRepository;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -71,8 +72,8 @@ public class MockRepository extends FsRepository {
     private volatile boolean blocked = false;
 
     @Inject
-    public MockRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException {
-        super(name, repositorySettings, indexShardRepository);
+    public MockRepository(RepositoryName name, ThreadPool threadPool, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException {
+        super(name, repositorySettings, threadPool, indexShardRepository);
         randomControlIOExceptionRate = repositorySettings.settings().getAsDouble("random_control_io_exception_rate", 0.0);
         randomDataFileIOExceptionRate = repositorySettings.settings().getAsDouble("random_data_file_io_exception_rate", 0.0);
         blockOnControlFiles = repositorySettings.settings().getAsBoolean("block_on_control", false);

+ 1 - 1
src/test/java/org/elasticsearch/test/TestCluster.java

@@ -294,7 +294,7 @@ public final class TestCluster extends ImmutableTestCluster {
             for (String name : Arrays.asList(ThreadPool.Names.BULK, ThreadPool.Names.FLUSH, ThreadPool.Names.GET,
                     ThreadPool.Names.INDEX, ThreadPool.Names.MANAGEMENT, ThreadPool.Names.MERGE, ThreadPool.Names.OPTIMIZE,
                     ThreadPool.Names.PERCOLATE, ThreadPool.Names.REFRESH, ThreadPool.Names.SEARCH, ThreadPool.Names.SNAPSHOT,
-                    ThreadPool.Names.SUGGEST, ThreadPool.Names.WARMER)) {
+                    ThreadPool.Names.SNAPSHOT_DATA, ThreadPool.Names.SUGGEST, ThreadPool.Names.WARMER)) {
                 if (random.nextBoolean()) {
                     final String type = RandomPicks.randomFrom(random, Arrays.asList("fixed", "cached", "scaling"));
                     builder.put(ThreadPool.THREADPOOL_GROUP + name + ".type", type);