Browse Source

[Transform] Allow transforms to use PIT with remote clusters again (#105192)

Przemysław Witek 1 year ago
parent
commit
9b584aa1f2

+ 6 - 0
docs/changelog/105192.yaml

@@ -0,0 +1,6 @@
+pr: 105192
+summary: Allow transforms to use PIT with remote clusters again
+area: Transform
+type: enhancement
+issues:
+ - 104518

+ 1 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java

@@ -449,8 +449,7 @@ class ClientTransformIndexer extends TransformIndexer {
         ActionListener<Tuple<String, SearchRequest>> listener
     ) {
         SearchRequest searchRequest = namedSearchRequest.v2();
-        // We explicitly disable PIT in the presence of remote clusters in the source due to huge PIT handles causing performance problems.
-        if (disablePit || searchRequest.indices().length == 0 || transformConfig.getSource().requiresRemoteCluster()) {
+        if (disablePit || searchRequest.indices().length == 0) {
             listener.onResponse(namedSearchRequest);
             return;
         }

+ 8 - 62
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java

@@ -293,8 +293,14 @@ public class ClientTransformIndexerTests extends ESTestCase {
     }
 
     public void testDisablePit() throws InterruptedException {
-        // TransformConfigTests.randomTransformConfig never produces remote indices in the source, hence we are safe here. */
-        TransformConfig config = TransformConfigTests.randomTransformConfig();
+        TransformConfig.Builder configBuilder = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig());
+        if (randomBoolean()) {
+            // TransformConfigTests.randomTransformConfig never produces remote indices in the source.
+            // We need to explicitly set the remote index here for coverage.
+            configBuilder.setSource(new SourceConfig("remote-cluster:remote-index"));
+        }
+        TransformConfig config = configBuilder.build();
+
         boolean pitEnabled = config.getSettings().getUsePit() == null || config.getSettings().getUsePit();
 
         try (var threadPool = createThreadPool()) {
@@ -354,66 +360,6 @@ public class ClientTransformIndexerTests extends ESTestCase {
         }
     }
 
-    public void testDisablePitWhenThereIsRemoteIndexInSource() throws InterruptedException {
-        TransformConfig config = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig())
-            // Remote index is configured within source
-            .setSource(new SourceConfig("remote-cluster:remote-index"))
-            .build();
-        boolean pitEnabled = config.getSettings().getUsePit() == null || config.getSettings().getUsePit();
-
-        try (var threadPool = createThreadPool()) {
-            final var client = new PitMockClient(threadPool, true);
-            MockClientTransformIndexer indexer = new MockClientTransformIndexer(
-                mock(ThreadPool.class),
-                new TransformServices(
-                    mock(IndexBasedTransformConfigManager.class),
-                    mock(TransformCheckpointService.class),
-                    mock(TransformAuditor.class),
-                    new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO)
-                ),
-                mock(CheckpointProvider.class),
-                new AtomicReference<>(IndexerState.STOPPED),
-                null,
-                new ParentTaskAssigningClient(client, new TaskId("dummy-node:123456")),
-                mock(TransformIndexerStats.class),
-                config,
-                null,
-                new TransformCheckpoint(
-                    "transform",
-                    Instant.now().toEpochMilli(),
-                    0L,
-                    Collections.emptyMap(),
-                    Instant.now().toEpochMilli()
-                ),
-                new TransformCheckpoint(
-                    "transform",
-                    Instant.now().toEpochMilli(),
-                    2L,
-                    Collections.emptyMap(),
-                    Instant.now().toEpochMilli()
-                ),
-                new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
-                mock(TransformContext.class),
-                false
-            );
-
-            // Because remote index is configured within source, we expect PIT *not* being used regardless the transform settings
-            this.<SearchResponse>assertAsync(
-                listener -> indexer.doNextSearch(0, listener),
-                response -> assertNull(response.pointInTimeId())
-            );
-
-            // reverse the setting
-            indexer.applyNewSettings(new SettingsConfig.Builder().setUsePit(pitEnabled == false).build());
-
-            // Because remote index is configured within source, we expect PIT *not* being used regardless the transform settings
-            this.<SearchResponse>assertAsync(
-                listener -> indexer.doNextSearch(0, listener),
-                response -> assertNull(response.pointInTimeId())
-            );
-        }
-    }
-
     public void testHandlePitIndexNotFound() throws InterruptedException {
         // simulate a deleted index due to ILM
         try (var threadPool = createThreadPool()) {