Przeglądaj źródła

[Transform] Redirect VersionConflict to reset code (#108070)

When the system is under load, it's possible for the
ClientTransformIndexer's reference of the Transform Config's sequence
number to fall out of sync with the system.  When this happens, the
ClientTransformIndexer is never able to save the state of the Transform,
and it floods the logs with warnings.

There is already code in ClientTransformIndexer to reset the sequence
number when it detects a VersionConflictEngineException.  The detection
code only works if it receives a VersionConflictEngineException
directly or if the exception is wrapped in an
ElasticsearchWrapperException.

This change creates a new ElasticsearchWrapperException to denote
TransformConfigException, which will fit in with the existing reset
code.
Pat Whelan 1 rok temu
rodzic
commit
35fcfc8670

+ 5 - 0
docs/changelog/108070.yaml

@@ -0,0 +1,5 @@
+pr: 108070
+summary: Redirect `VersionConflict` to reset code
+area: Transform
+type: bug
+issues: []

+ 13 - 4
x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManagerTests.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.transform.persistence;
 
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
@@ -28,6 +29,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexVersion;
+import org.elasticsearch.index.engine.VersionConflictEngineException;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.TestIndexNameExpressionResolver;
 import org.elasticsearch.xcontent.ToXContent;
@@ -495,10 +497,17 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
             listener -> transformConfigManager.putOrUpdateTransformStoredDoc(updated, firstIndex, listener),
             (SeqNoPrimaryTermAndIndex) null,
             r -> fail("did not fail with version conflict."),
-            e -> assertThat(
-                e.getMessage(),
-                equalTo("Failed to persist transform statistics for transform [transform_test_stored_doc_create_read_update]")
-            )
+            e -> {
+                assertThat(
+                    e.getMessage(),
+                    equalTo("Failed to persist transform statistics for transform [transform_test_stored_doc_create_read_update]")
+                );
+                assertThat(
+                    "Consumers utilize ExceptionsHelper to check if there was a Version Conflict",
+                    ExceptionsHelper.unwrapCause(e),
+                    instanceOf(VersionConflictEngineException.class)
+                );
+            }
         );
     }
 

+ 1 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java

@@ -671,7 +671,7 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
                 ActionListener.wrap(
                     r -> listener.onResponse(SeqNoPrimaryTermAndIndex.fromIndexResponse(r)),
                     e -> listener.onFailure(
-                        new RuntimeException(
+                        new TransformStatePersistenceException(
                             TransformMessages.getMessage(TransformMessages.TRANSFORM_FAILED_TO_PERSIST_STATS, storedDoc.getId()),
                             e
                         )

+ 16 - 0
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformStatePersistenceException.java

@@ -0,0 +1,16 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.transform.persistence;
+
+import org.elasticsearch.ElasticsearchWrapperException;
+
+public class TransformStatePersistenceException extends RuntimeException implements ElasticsearchWrapperException {
+    public TransformStatePersistenceException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

+ 20 - 10
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.xpack.transform.transforms;
 
 import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.LatchedActionListener;
 import org.elasticsearch.client.internal.ParentTaskAssigningClient;
@@ -43,6 +44,7 @@ import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
 import org.elasticsearch.xpack.transform.persistence.InMemoryTransformConfigManager;
 import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
 import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
+import org.elasticsearch.xpack.transform.persistence.TransformStatePersistenceException;
 import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler;
 
 import java.time.Clock;
@@ -131,7 +133,12 @@ public class TransformIndexerFailureOnStatePersistenceTests extends ESTestCase {
             ActionListener<SeqNoPrimaryTermAndIndex> listener
         ) {
             if (failAt.contains(persistenceCallCount++)) {
-                listener.onFailure(exception);
+                listener.onFailure(
+                    new TransformStatePersistenceException(
+                        "FailingToPutStoredDocTransformConfigManager.putOrUpdateTransformStoredDoc is intentionally throwing an exception",
+                        exception
+                    )
+                );
             } else {
                 super.putOrUpdateTransformStoredDoc(storedDoc, seqNoPrimaryTermAndIndex, listener);
             }
@@ -154,7 +161,10 @@ public class TransformIndexerFailureOnStatePersistenceTests extends ESTestCase {
             if (seqNo != -1) {
                 if (seqNoPrimaryTermAndIndex.getSeqNo() != seqNo || seqNoPrimaryTermAndIndex.getPrimaryTerm() != primaryTerm) {
                     listener.onFailure(
-                        new VersionConflictEngineException(new ShardId("index", "indexUUID", 42), "some_id", 45L, 44L, 43L, 42L)
+                        new TransformStatePersistenceException(
+                            "SeqNoCheckingTransformConfigManager.putOrUpdateTransformStoredDoc is intentionally throwing an exception",
+                            new VersionConflictEngineException(new ShardId("index", "indexUUID", 42), "some_id", 45L, 44L, 43L, 42L)
+                        )
                     );
                     return;
                 }
@@ -266,7 +276,7 @@ public class TransformIndexerFailureOnStatePersistenceTests extends ESTestCase {
                         listener
                     ),
                     e -> {
-                        assertThat(e, isA(exceptionToThrow.getClass()));
+                        assertThat(ExceptionsHelper.unwrapCause(e), isA(exceptionToThrow.getClass()));
                         assertThat(state.get(), equalTo(TransformTaskState.STARTED));
                         assertThat(indexer.getStatePersistenceFailures(), equalTo(1));
                     }
@@ -278,7 +288,7 @@ public class TransformIndexerFailureOnStatePersistenceTests extends ESTestCase {
                         listener
                     ),
                     e -> {
-                        assertThat(e, isA(exceptionToThrow.getClass()));
+                        assertThat(ExceptionsHelper.unwrapCause(e), isA(exceptionToThrow.getClass()));
                         assertThat(state.get(), equalTo(TransformTaskState.STARTED));
                         assertThat(indexer.getStatePersistenceFailures(), equalTo(2));
                     }
@@ -290,7 +300,7 @@ public class TransformIndexerFailureOnStatePersistenceTests extends ESTestCase {
                         listener
                     ),
                     e -> {
-                        assertThat(e, isA(exceptionToThrow.getClass()));
+                        assertThat(ExceptionsHelper.unwrapCause(e), isA(exceptionToThrow.getClass()));
                         assertThat(state.get(), equalTo(TransformTaskState.FAILED));
                         assertThat(indexer.getStatePersistenceFailures(), equalTo(3));
                     }
@@ -352,7 +362,7 @@ public class TransformIndexerFailureOnStatePersistenceTests extends ESTestCase {
                         listener
                     ),
                     e -> {
-                        assertThat(e, isA(exceptionToThrow.getClass()));
+                        assertThat(ExceptionsHelper.unwrapCause(e), isA(exceptionToThrow.getClass()));
                         assertThat(state.get(), equalTo(TransformTaskState.STARTED));
                         assertThat(indexer.getStatePersistenceFailures(), equalTo(1));
                     }
@@ -377,7 +387,7 @@ public class TransformIndexerFailureOnStatePersistenceTests extends ESTestCase {
                         listener
                     ),
                     e -> {
-                        assertThat(e, isA(exceptionToThrow.getClass()));
+                        assertThat(ExceptionsHelper.unwrapCause(e), isA(exceptionToThrow.getClass()));
                         assertThat(state.get(), equalTo(TransformTaskState.STARTED));
                         assertThat(indexer.getStatePersistenceFailures(), equalTo(1));
                     }
@@ -389,7 +399,7 @@ public class TransformIndexerFailureOnStatePersistenceTests extends ESTestCase {
                         listener
                     ),
                     e -> {
-                        assertThat(e, isA(exceptionToThrow.getClass()));
+                        assertThat(ExceptionsHelper.unwrapCause(e), isA(exceptionToThrow.getClass()));
                         assertThat(state.get(), equalTo(TransformTaskState.STARTED));
                         assertThat(indexer.getStatePersistenceFailures(), equalTo(2));
                     }
@@ -401,7 +411,7 @@ public class TransformIndexerFailureOnStatePersistenceTests extends ESTestCase {
                         listener
                     ),
                     e -> {
-                        assertThat(e, isA(exceptionToThrow.getClass()));
+                        assertThat(ExceptionsHelper.unwrapCause(e), isA(exceptionToThrow.getClass()));
                         assertThat(state.get(), equalTo(TransformTaskState.FAILED));
                         assertThat(indexer.getStatePersistenceFailures(), equalTo(3));
                     }
@@ -517,7 +527,7 @@ public class TransformIndexerFailureOnStatePersistenceTests extends ESTestCase {
                     listener
                 ),
                 e -> {
-                    assertThat(e, isA(VersionConflictEngineException.class));
+                    assertThat(ExceptionsHelper.unwrapCause(e), isA(VersionConflictEngineException.class));
                     assertThat(state.get(), equalTo(TransformTaskState.STARTED));
                     assertThat(indexer.getStatePersistenceFailures(), equalTo(1));
                 }