Browse Source

Fix TransportSnapshotsStatusAction ThreadPool Use (#45824)

In case of an in-progress snapshot this endpoint was broken because
it tried to execute repository operations in the callback on a
transport thread which is not allowed (only generic or snapshot
pool are allowed here).
Armin Braun 6 years ago
parent
commit
6bc38ab186

+ 8 - 9
server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

@@ -22,6 +22,7 @@ package org.elasticsearch.action.admin.cluster.snapshots.status;
 import com.carrotsearch.hppc.cursors.ObjectCursor;
 import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.client.node.NodeClient;
@@ -116,15 +117,13 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
             for (int i = 0; i < currentSnapshots.size(); i++) {
                 snapshots[i] = currentSnapshots.get(i).snapshot();
             }
-
-            TransportNodesSnapshotsStatus.Request nodesRequest =
-                new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(new String[nodesIds.size()]))
-                    .snapshots(snapshots).timeout(request.masterNodeTimeout());
-            client.executeLocally(TransportNodesSnapshotsStatus.TYPE, nodesRequest,
-                ActionListener.map(
-                    listener, nodeSnapshotStatuses ->
-                        buildResponse(request, snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())),
-                            nodeSnapshotStatuses)));
+            client.executeLocally(TransportNodesSnapshotsStatus.TYPE,
+                new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(Strings.EMPTY_ARRAY))
+                    .snapshots(snapshots).timeout(request.masterNodeTimeout()),
+                ActionListener.wrap(
+                    nodeSnapshotStatuses -> threadPool.executor(ThreadPool.Names.GENERIC).execute(
+                        ActionRunnable.wrap(listener, l -> l.onResponse(buildResponse(request, snapshotsService.currentSnapshots(
+                            request.repository(), Arrays.asList(request.snapshots())), nodeSnapshotStatuses)))), listener::onFailure));
         } else {
             // We don't have any in-progress shards, just return current stats
             listener.onResponse(buildResponse(request, currentSnapshots, null));

+ 37 - 0
server/src/test/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java

@@ -19,11 +19,14 @@
 package org.elasticsearch.snapshots;
 
 import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.SnapshotsInProgress;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 
 import java.util.List;
 
@@ -72,4 +75,38 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
         assertEquals(snStatus.getStats().getStartTime(), snapshotInfo.startTime());
         assertEquals(snStatus.getStats().getTime(), snapshotInfo.endTime() - snapshotInfo.startTime());
     }
+
+    public void testStatusAPICallInProgressSnapshot() throws InterruptedException {
+        Client client = client();
+
+        logger.info("-->  creating repository");
+        assertAcked(client.admin().cluster().preparePutRepository("test-repo").setType("mock").setSettings(
+            Settings.builder().put("location", randomRepoPath()).put("block_on_data", true)));
+
+        createIndex("test-idx-1");
+        ensureGreen();
+
+        logger.info("--> indexing some data");
+        for (int i = 0; i < 100; i++) {
+            index("test-idx-1", "_doc", Integer.toString(i), "foo", "bar" + i);
+        }
+        refresh();
+
+        logger.info("--> snapshot");
+        ActionFuture<CreateSnapshotResponse> createSnapshotResponseActionFuture =
+            client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute();
+
+        logger.info("--> wait for data nodes to get blocked");
+        waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1));
+
+        final List<SnapshotStatus> snapshotStatus = client.admin().cluster().snapshotsStatus(
+            new SnapshotsStatusRequest("test-repo", new String[]{"test-snap"})).actionGet().getSnapshots();
+        assertEquals(snapshotStatus.get(0).getState(), SnapshotsInProgress.State.STARTED);
+
+        logger.info("--> unblock all data nodes");
+        unblockAllDataNodes("test-repo");
+
+        logger.info("--> wait for snapshot to finish");
+        createSnapshotResponseActionFuture.actionGet();
+    }
 }