Bladeren bron

Reindex data stream indices on different nodes (#125171) (#125333)

(cherry picked from commit 24132d3834cbadee6597a31494493b06532dc21f)
Keith Massey 6 maanden geleden
bovenliggende
commit
e946f246a4

+ 5 - 0
docs/changelog/125171.yaml

@@ -0,0 +1,5 @@
+pr: 125171
+summary: Reindex data stream indices on different nodes
+area: Data streams
+type: enhancement
+issues: []

+ 37 - 1
x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java

@@ -12,6 +12,7 @@ import org.apache.lucene.search.TotalHits;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
 import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
 import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
@@ -35,9 +36,12 @@ import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.action.support.broadcast.BroadcastResponse;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.client.internal.transport.NoNodeAvailableException;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.Assertions;
@@ -55,6 +59,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate;
 import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction;
@@ -64,6 +69,7 @@ import java.util.Arrays;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.METADATA;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY;
@@ -105,6 +111,14 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio
 
     private final ClusterService clusterService;
     private final Client client;
+    private final TransportService transportService;
+    /*
+     * The following is incremented in order to keep track of the current round-robin position for ingest nodes that we send sliced requests
+     * to. We bound its random starting value to less than or equal to 2 ^ 30 (the default is Integer.MAX_VALUE or 2 ^ 31 - 1) only so that
+     * the unit test doesn't fail if it rolls over Integer.MAX_VALUE (since the node selected is the same for Integer.MAX_VALUE and
+     * Integer.MAX_VALUE + 1).
+     */
+    private final AtomicInteger ingestNodeOffsetGenerator = new AtomicInteger(Randomness.get().nextInt(2 ^ 30));
 
     @Inject
     public ReindexDataStreamIndexTransportAction(
@@ -123,6 +137,7 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio
         );
         this.clusterService = clusterService;
         this.client = client;
+        this.transportService = transportService;
     }
 
     @Override
@@ -326,7 +341,28 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio
                 listener.onResponse(bulkByScrollResponse);
             }
         }, listener::onFailure);
-        client.execute(ReindexAction.INSTANCE, reindexRequest, checkForFailuresListener);
+        /*
+         * Reindex will potentially run a pipeline for each document. If we run all reindex requests on the same node (locally), that
+         * becomes a bottleneck. This code round-robins reindex requests to all ingest nodes to spread out the pipeline workload. When a
+         * data stream has many indices, this can improve performance a good bit.
+         */
+        final DiscoveryNode[] ingestNodes = clusterService.state().getNodes().getIngestNodes().values().toArray(DiscoveryNode[]::new);
+        if (ingestNodes.length == 0) {
+            listener.onFailure(new NoNodeAvailableException("No ingest nodes in cluster"));
+        } else {
+            DiscoveryNode ingestNode = ingestNodes[Math.floorMod(ingestNodeOffsetGenerator.incrementAndGet(), ingestNodes.length)];
+            logger.debug("Sending reindex request to {}", ingestNode.getName());
+            transportService.sendRequest(
+                ingestNode,
+                ReindexAction.NAME,
+                reindexRequest,
+                new ActionListenerResponseHandler<>(
+                    checkForFailuresListener,
+                    BulkByScrollResponse::new,
+                    TransportResponseHandler.TRANSPORT_WORKER
+                )
+            );
+        }
     }
 
     private void updateSettings(

+ 156 - 2
x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportActionTests.java

@@ -10,6 +10,10 @@ package org.elasticsearch.xpack.migrate.action;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodeRole;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
@@ -29,10 +33,16 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
 import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class ReindexDataStreamIndexTransportActionTests extends ESTestCase {
@@ -112,7 +122,10 @@ public class ReindexDataStreamIndexTransportActionTests extends ESTestCase {
             )
         );
 
-        doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), any());
+        ClusterState clusterState = mock(ClusterState.class);
+        when(clusterState.getNodes()).thenReturn(getTestDiscoveryNodes());
+        when(clusterService.state()).thenReturn(clusterState);
+        doNothing().when(transportService).sendRequest(any(), eq(ReindexAction.NAME), request.capture(), any());
 
         action.reindex(sourceIndex, destIndex, listener, taskId);
 
@@ -137,7 +150,10 @@ public class ReindexDataStreamIndexTransportActionTests extends ESTestCase {
                 Collections.singleton(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING)
             )
         );
-        doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), any());
+        ClusterState clusterState = mock(ClusterState.class);
+        when(clusterState.getNodes()).thenReturn(getTestDiscoveryNodes());
+        when(clusterService.state()).thenReturn(clusterState);
+        doNothing().when(transportService).sendRequest(any(), eq(ReindexAction.NAME), request.capture(), any());
 
         action.reindex(sourceIndex, destIndex, listener, taskId);
 
@@ -204,4 +220,142 @@ public class ReindexDataStreamIndexTransportActionTests extends ESTestCase {
             e.getMessage()
         );
     }
+
+    public void testRoundRobin() {
+        /*
+         * This tests that the action will round-robin through the list of ingest nodes in the cluster.
+         */
+        String sourceIndex = randomAlphanumericOfLength(10);
+        String destIndex = randomAlphanumericOfLength(10);
+        AtomicBoolean failed = new AtomicBoolean(false);
+        ActionListener<BulkByScrollResponse> listener = new ActionListener<>() {
+            @Override
+            public void onResponse(BulkByScrollResponse bulkByScrollResponse) {}
+
+            @Override
+            public void onFailure(Exception e) {
+                failed.set(true);
+            }
+        };
+        TaskId taskId = TaskId.EMPTY_TASK_ID;
+
+        when(clusterService.getClusterSettings()).thenReturn(
+            new ClusterSettings(
+                Settings.EMPTY,
+                Collections.singleton(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING)
+            )
+        );
+
+        ClusterState clusterState = mock(ClusterState.class);
+        when(clusterState.getNodes()).thenReturn(getTestDiscoveryNodes());
+        when(clusterService.state()).thenReturn(clusterState);
+        ArgumentCaptor<DiscoveryNode> nodeCaptor = ArgumentCaptor.captor();
+        doNothing().when(transportService).sendRequest(nodeCaptor.capture(), eq(ReindexAction.NAME), request.capture(), any());
+
+        action.reindex(sourceIndex, destIndex, listener, taskId);
+        DiscoveryNode node1 = nodeCaptor.getValue();
+        assertNotNull(node1);
+
+        action.reindex(sourceIndex, destIndex, listener, taskId);
+        DiscoveryNode node2 = nodeCaptor.getValue();
+        assertNotNull(node2);
+
+        int ingestNodeCount = clusterState.getNodes().getIngestNodes().size();
+        if (ingestNodeCount > 1) {
+            assertThat(node1.getName(), not(equalTo(node2.getName())));
+        }
+
+        // check that if we keep going we eventually get back to the original node:
+        DiscoveryNode node = node2;
+        for (int i = 0; i < ingestNodeCount - 1; i++) {
+            action.reindex(sourceIndex, destIndex, listener, taskId);
+            node = nodeCaptor.getValue();
+        }
+        assertNotNull(node);
+        assertThat(node1.getName(), equalTo(node.getName()));
+        assertThat(failed.get(), equalTo(false));
+
+        // make sure the listener gets notified of failure if there are no ingest nodes:
+        when(clusterState.getNodes()).thenReturn(getTestDiscoveryNodesNoIngest());
+        action.reindex(sourceIndex, destIndex, listener, taskId);
+        assertThat(failed.get(), equalTo(true));
+    }
+
+    private DiscoveryNodes getTestDiscoveryNodes() {
+        DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
+        boolean nodeHasIngestRole = false;
+        int nodeCount = randomIntBetween(1, 10);
+        for (int i = 0; i < nodeCount; i++) {
+            final DiscoveryNode discoveryNode = new DiscoveryNode(
+                "test-name-" + i,
+                "test-id-" + i,
+                "test-ephemeral-id-" + i,
+                "test-hostname-" + i,
+                "test-hostaddr",
+                buildNewFakeTransportAddress(),
+                Map.of(),
+                randomSet(
+                    1,
+                    5,
+                    () -> randomFrom(
+                        DiscoveryNodeRole.DATA_ROLE,
+                        DiscoveryNodeRole.INGEST_ROLE,
+                        DiscoveryNodeRole.SEARCH_ROLE,
+                        DiscoveryNodeRole.MASTER_ROLE,
+                        DiscoveryNodeRole.MASTER_ROLE
+                    )
+                ),
+                null,
+                null
+            );
+            nodeHasIngestRole = nodeHasIngestRole || discoveryNode.getRoles().contains(DiscoveryNodeRole.INGEST_ROLE);
+            builder.add(discoveryNode);
+        }
+        if (nodeHasIngestRole == false) {
+            final DiscoveryNode discoveryNode = new DiscoveryNode(
+                "test-name-" + nodeCount,
+                "test-id-" + nodeCount,
+                "test-ephemeral-id-" + nodeCount,
+                "test-hostname-" + nodeCount,
+                "test-hostaddr",
+                buildNewFakeTransportAddress(),
+                Map.of(),
+                Set.of(DiscoveryNodeRole.INGEST_ROLE),
+                null,
+                null
+            );
+            builder.add(discoveryNode);
+        }
+        return builder.build();
+    }
+
+    private DiscoveryNodes getTestDiscoveryNodesNoIngest() {
+        DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
+        int nodeCount = randomIntBetween(0, 10);
+        for (int i = 0; i < nodeCount; i++) {
+            final DiscoveryNode discoveryNode = new DiscoveryNode(
+                "test-name-" + i,
+                "test-id-" + i,
+                "test-ephemeral-id-" + i,
+                "test-hostname-" + i,
+                "test-hostaddr",
+                buildNewFakeTransportAddress(),
+                Map.of(),
+                randomSet(
+                    1,
+                    4,
+                    () -> randomFrom(
+                        DiscoveryNodeRole.DATA_ROLE,
+                        DiscoveryNodeRole.SEARCH_ROLE,
+                        DiscoveryNodeRole.MASTER_ROLE,
+                        DiscoveryNodeRole.MASTER_ROLE
+                    )
+                ),
+                null,
+                null
+            );
+            builder.add(discoveryNode);
+        }
+        return builder.build();
+    }
 }