Browse Source

[Transform] Report version conflict on concurrent updates (#96293)

Przemysław Witek 2 years ago
parent
commit
7e34514465

+ 6 - 0
docs/changelog/96293.yaml

@@ -0,0 +1,6 @@
+pr: 96293
+summary: Report version conflict on concurrent updates
+area: Transform
+type: bug
+issues:
+ - 96311

+ 1 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java

@@ -16,6 +16,7 @@ public class TransformMessages {
         "Timed out after [{0}] while waiting for transform [{1}] to stop";
     public static final String REST_STOP_TRANSFORM_WAIT_FOR_COMPLETION_INTERRUPT = "Interrupted while waiting for transform [{0}] to stop";
     public static final String REST_PUT_TRANSFORM_EXISTS = "Transform with id [{0}] already exists";
+    public static final String REST_UPDATE_TRANSFORM_CONFLICT = "Transform with id [{0}] got updated in the meantime. Please try again";
     public static final String REST_UNKNOWN_TRANSFORM = "Transform with id [{0}] could not be found";
     public static final String REST_STOP_TRANSFORM_WITHOUT_CONFIG =
         "Detected transforms with no config [{0}]. Use force to stop/delete them.";

+ 67 - 0
x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUpdateIT.java

@@ -9,21 +9,33 @@ package org.elasticsearch.xpack.transform.integration;
 
 import org.apache.http.HttpHost;
 import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.elasticsearch.core.Strings;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.junit.After;
 import org.junit.Before;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
 
 public class TransformUpdateIT extends TransformRestTestCase {
 
@@ -47,6 +59,8 @@ public class TransformUpdateIT extends TransformRestTestCase {
     private static final String DATA_ACCESS_ROLE = "test_data_access";
     private static final String DATA_ACCESS_ROLE_2 = "test_data_access_2";
 
+    private TestThreadPool threadPool;
+
     // preserve indices in order to reuse source indices in several test cases
     @Override
     protected boolean preserveIndicesUponCompletion() {
@@ -76,6 +90,15 @@ public class TransformUpdateIT extends TransformRestTestCase {
         setupUser(TEST_ADMIN_USER_NAME_2, List.of("transform_admin", DATA_ACCESS_ROLE_2));
         setupUser(TEST_ADMIN_USER_NAME_NO_DATA, List.of("transform_admin"));
         createReviewsIndex();
+
+        threadPool = new TestThreadPool(getTestName());
+    }
+
+    @After
+    public void shutdownThreadPool() {
+        if (threadPool != null) {
+            threadPool.shutdown();
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -167,6 +190,50 @@ public class TransformUpdateIT extends TransformRestTestCase {
         assertThat(updatedConfig.get("settings"), is(equalTo(Map.of("max_page_search_size", 123))));
     }
 
+    public void testConcurrentUpdates() throws Exception {
+        String transformId = "test_concurrent_updates";
+        String destIndex = transformId + "-dest";
+
+        // Create the transform
+        createPivotReviewsTransform(transformId, destIndex, null, null, null);
+
+        // Create a number of concurrent threads competing to update the transform with different settings.
+        int minMaxPageSearchSize = 10;
+        int maxMaxPageSearchSize = 20;
+        List<Callable<Response>> concurrentUpdates = new ArrayList<>(10);
+        for (int maxPageSearchSize = minMaxPageSearchSize; maxPageSearchSize < maxMaxPageSearchSize; ++maxPageSearchSize) {
+            Request updateTransformRequest = createRequestWithAuth("POST", getTransformEndpoint() + transformId + "/_update", null);
+            updateTransformRequest.setJsonEntity(Strings.format("""
+                { "settings": { "max_page_search_size": %s } }""", maxPageSearchSize));
+
+            // Schedule a thread to update the transform's settings
+            concurrentUpdates.add(() -> client().performRequest(updateTransformRequest));
+        }
+
+        // Gather the results.
+        List<Future<Response>> futures = threadPool.generic().invokeAll(concurrentUpdates);
+        for (Future<Response> future : futures) {
+            try {  // The update may succeed...
+                future.get();
+            } catch (ExecutionException e) {  // ... but if it fails, it's due to conflict
+                assertThat(e.getCause(), instanceOf(ResponseException.class));
+                ResponseException re = (ResponseException) e.getCause();
+                assertThat(re.getResponse().getStatusLine().getStatusCode(), is(equalTo(409)));
+                assertThat(
+                    re.getMessage(),
+                    containsString("Transform with id [" + transformId + "] got updated in the meantime. Please try again")
+                );
+            }
+        }
+
+        // Verify that the settings got updated. Any of the concurrent threads could have won the competition.
+        Map<String, Object> finalConfig = getTransformConfig(transformId, null);
+        assertThat(
+            (int) XContentMapValues.extractValue(finalConfig, "settings", "max_page_search_size"),
+            is(both(greaterThanOrEqualTo(minMaxPageSearchSize)).and(lessThan(maxMaxPageSearchSize)))
+        );
+    }
+
     private void updateTransferRightsTester(boolean useSecondaryAuthHeaders) throws Exception {
         String transformId = "transform1";
         // Note: Due to a bug the transform does not fail to start after deleting the user and role, therefore invalidating

+ 18 - 8
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java

@@ -319,14 +319,16 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
 
     private void putTransformConfiguration(
         TransformConfig transformConfig,
-        DocWriteRequest.OpType optType,
+        DocWriteRequest.OpType opType,
         SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
         ActionListener<Boolean> listener
     ) {
+        assert DocWriteRequest.OpType.CREATE.equals(opType) || DocWriteRequest.OpType.INDEX.equals(opType);
+
         try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
             XContentBuilder source = transformConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
 
-            IndexRequest indexRequest = new IndexRequest(TransformInternalIndexConstants.LATEST_INDEX_NAME).opType(optType)
+            IndexRequest indexRequest = new IndexRequest(TransformInternalIndexConstants.LATEST_INDEX_NAME).opType(opType)
                 .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
                 .id(TransformConfig.documentId(transformConfig.getId()))
                 .source(source);
@@ -340,12 +342,20 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
                 indexRequest,
                 ActionListener.wrap(r -> { listener.onResponse(true); }, e -> {
                     if (e instanceof VersionConflictEngineException) {
-                        // the transform already exists
-                        listener.onFailure(
-                            new ResourceAlreadyExistsException(
-                                TransformMessages.getMessage(TransformMessages.REST_PUT_TRANSFORM_EXISTS, transformConfig.getId())
-                            )
-                        );
+                        if (DocWriteRequest.OpType.CREATE.equals(opType)) {  // we want to create the transform but it already exists
+                            listener.onFailure(
+                                new ResourceAlreadyExistsException(
+                                    TransformMessages.getMessage(TransformMessages.REST_PUT_TRANSFORM_EXISTS, transformConfig.getId())
+                                )
+                            );
+                        } else {  // we want to update the transform but it got updated in the meantime, report version conflict
+                            listener.onFailure(
+                                new ElasticsearchStatusException(
+                                    TransformMessages.getMessage(TransformMessages.REST_UPDATE_TRANSFORM_CONFLICT, transformConfig.getId()),
+                                    RestStatus.CONFLICT
+                                )
+                            );
+                        }
                     } else {
                         listener.onFailure(new RuntimeException(TransformMessages.REST_PUT_FAILED_PERSIST_TRANSFORM_CONFIGURATION, e));
                     }