Browse Source

Improve short-circuiting downsample execution (#106563)

This is relevant in the case multiple downsample api invocations have been executed for the same source index, target index and  fixed interval. Whether the target index is ready, is now also checked just before starting the downsample persistent tasks.

Relates to #106403
Martijn van Groningen 1 year ago
parent
commit
8a7697bdc9

+ 5 - 0
docs/changelog/106563.yaml

@@ -0,0 +1,5 @@
+pr: 106563
+summary: Improve short-circuiting downsample execution
+area: TSDB
+type: enhancement
+issues: []

+ 50 - 26
x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java

@@ -234,33 +234,11 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
         }
 
         final TaskId parentTask = new TaskId(clusterService.localNode().getId(), task.getId());
-        // Shortcircuit if target index has been downsampled:
+        // Short circuit if target index has been downsampled:
         final String downsampleIndexName = request.getTargetIndex();
-        IndexMetadata downsampleIndex = state.getMetadata().index(downsampleIndexName);
-        if (downsampleIndex != null) {
-            var downsampleStatus = IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(downsampleIndex.getSettings());
-            if (downsampleStatus == DownsampleTaskStatus.UNKNOWN) {
-                // This isn't a downsample index, so fail:
-                listener.onFailure(new ResourceAlreadyExistsException(downsampleIndex.getIndex()));
-                return;
-            } else if (downsampleStatus == DownsampleTaskStatus.SUCCESS) {
-                listener.onResponse(AcknowledgedResponse.TRUE);
-                return;
-            }
-            // In case the write block has been set on the target index means that the shard level downsampling itself was successful,
-            // but the previous invocation failed later performing settings update, refresh or force merge.
-            // The write block is used a signal to resume from the refresh part of the downsample api invocation.
-            if (downsampleIndex.getSettings().get(IndexMetadata.SETTING_BLOCKS_WRITE) != null) {
-                var refreshRequest = new RefreshRequest(downsampleIndexName);
-                refreshRequest.setParentTask(parentTask);
-                client.admin()
-                    .indices()
-                    .refresh(
-                        refreshRequest,
-                        new RefreshDownsampleIndexActionListener(listener, parentTask, downsampleIndexName, request.getWaitTimeout())
-                    );
-                return;
-            }
+        if (canShortCircuit(downsampleIndexName, parentTask, request.getWaitTimeout(), state.metadata(), listener)) {
+            logger.info("Skipping downsampling, because a previous execution already completed downsampling");
+            return;
         }
         try {
             MetadataCreateIndexService.validateIndexName(downsampleIndexName, state);
@@ -356,6 +334,11 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
                     }
                 }, e -> {
                     if (e instanceof ResourceAlreadyExistsException) {
+                        var metadata = clusterService.state().metadata();
+                        if (canShortCircuit(request.getTargetIndex(), parentTask, request.getWaitTimeout(), metadata, listener)) {
+                            logger.info("Downsample tasks are not created, because a previous execution already completed downsampling");
+                            return;
+                        }
                         performShardDownsampling(
                             request,
                             delegate,
@@ -374,6 +357,47 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
         }));
     }
 
+    /**
+     * Shortcircuit when another downsample api invocation already completed successfully.
+     */
+    private boolean canShortCircuit(
+        String targetIndexName,
+        TaskId parentTask,
+        TimeValue waitTimeout,
+        Metadata metadata,
+        ActionListener<AcknowledgedResponse> listener
+    ) {
+        IndexMetadata targetIndexMetadata = metadata.index(targetIndexName);
+        if (targetIndexMetadata == null) {
+            return false;
+        }
+
+        var downsampleStatus = IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(targetIndexMetadata.getSettings());
+        if (downsampleStatus == DownsampleTaskStatus.UNKNOWN) {
+            // This isn't a downsample index, so fail:
+            listener.onFailure(new ResourceAlreadyExistsException(targetIndexMetadata.getIndex()));
+            return true;
+        } else if (downsampleStatus == DownsampleTaskStatus.SUCCESS) {
+            listener.onResponse(AcknowledgedResponse.TRUE);
+            return true;
+        }
+        // In case the write block has been set on the target index means that the shard level downsampling itself was successful,
+        // but the previous invocation failed later performing settings update, refresh or force merge.
+        // The write block is used a signal to resume from the refresh part of the downsample api invocation.
+        if (targetIndexMetadata.getSettings().get(IndexMetadata.SETTING_BLOCKS_WRITE) != null) {
+            var refreshRequest = new RefreshRequest(targetIndexMetadata.getIndex().getName());
+            refreshRequest.setParentTask(parentTask);
+            client.admin()
+                .indices()
+                .refresh(
+                    refreshRequest,
+                    new RefreshDownsampleIndexActionListener(listener, parentTask, targetIndexMetadata.getIndex().getName(), waitTimeout)
+                );
+            return true;
+        }
+        return false;
+    }
+
     // 3. downsample index created or already exist (in case of retry). Run downsample indexer persistent task on each shard.
     private void performShardDownsampling(
         DownsampleAction.Request request,