浏览代码

[Transform] Fix possible audit logging disappearance after rolling upgrade (#49731)

ensure audit index template is available during a rolling upgrade before a
transform task can write to it.

fixes #49730
Hendrik Muhs 5 年之前
父节点
当前提交
ace25a9389

+ 2 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java

@@ -32,7 +32,8 @@ public final class TransformInternalIndexConstants {
     public static final String INDEX_NAME_PATTERN_DEPRECATED = ".data-frame-internal-*";
 
     // audit index
-    public static final String AUDIT_TEMPLATE_VERSION = "000001";
+    // gh #49730: upped version of audit index to 000002
+    public static final String AUDIT_TEMPLATE_VERSION = "000002";
     public static final String AUDIT_INDEX_PREFIX = ".transform-notifications-";
     public static final String AUDIT_INDEX_PATTERN = AUDIT_INDEX_PREFIX + "*";
     public static final String AUDIT_INDEX_DEPRECATED = ".data-frame-notifications-1";

+ 67 - 11
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java

@@ -315,22 +315,39 @@ public final class TransformInternalIndex {
                 .endObject();
     }
 
-    public static boolean haveLatestVersionedIndexTemplate(ClusterState state) {
-        return state.getMetaData().getTemplates().containsKey(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME);
-    }
-
     /**
      * This method should be called before any document is indexed that relies on the
-     * existence of the latest index template to create the internal index.  The
-     * reason is that the standard template upgrader only runs when the master node
+     * existence of the latest index templates to create the internal and audit index.
+     * The reason is that the standard template upgrader only runs when the master node
      * is upgraded to the newer version.  If data nodes are upgraded before master
      * nodes and transforms get assigned to those data nodes then without this check
      * the data nodes will index documents into the internal index before the necessary
      * index template is present and this will result in an index with completely
      * dynamic mappings being created (which is very bad).
      */
-    public static void installLatestVersionedIndexTemplateIfRequired(ClusterService clusterService, Client client,
-                                                                     ActionListener<Void> listener) {
+    public static void installLatestIndexTemplatesIfRequired(ClusterService clusterService, Client client, ActionListener<Void> listener) {
+
+        installLatestVersionedIndexTemplateIfRequired(
+            clusterService,
+            client,
+            ActionListener.wrap(r -> { installLatestAuditIndexTemplateIfRequired(clusterService, client, listener); }, listener::onFailure)
+        );
+
+    }
+
+    protected static boolean haveLatestVersionedIndexTemplate(ClusterState state) {
+        return state.getMetaData().getTemplates().containsKey(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME);
+    }
+
+    protected static boolean haveLatestAuditIndexTemplate(ClusterState state) {
+        return state.getMetaData().getTemplates().containsKey(TransformInternalIndexConstants.AUDIT_INDEX);
+    }
+
+    protected static void installLatestVersionedIndexTemplateIfRequired(
+        ClusterService clusterService,
+        Client client,
+        ActionListener<Void> listener
+    ) {
 
         // The check for existence of the template is against local cluster state, so very cheap
         if (haveLatestVersionedIndexTemplate(clusterService.state())) {
@@ -348,13 +365,52 @@ public final class TransformInternalIndex {
                 .settings(indexTemplateMetaData.settings())
                 .mapping(SINGLE_MAPPING_NAME, XContentHelper.convertToMap(jsonMappings, true, XContentType.JSON).v2());
             ActionListener<AcknowledgedResponse> innerListener = ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure);
-            executeAsyncWithOrigin(client.threadPool().getThreadContext(), TRANSFORM_ORIGIN, request,
-                innerListener, client.admin().indices()::putTemplate);
+            executeAsyncWithOrigin(
+                client.threadPool().getThreadContext(),
+                TRANSFORM_ORIGIN,
+                request,
+                innerListener,
+                client.admin().indices()::putTemplate
+            );
         } catch (IOException e) {
             listener.onFailure(e);
         }
     }
 
-    private TransformInternalIndex() {
+    protected static void installLatestAuditIndexTemplateIfRequired(
+        ClusterService clusterService,
+        Client client,
+        ActionListener<Void> listener
+    ) {
+
+        // The check for existence of the template is against local cluster state, so very cheap
+        if (haveLatestAuditIndexTemplate(clusterService.state())) {
+            listener.onResponse(null);
+            return;
+        }
+
+        // Installing the template involves communication with the master node, so it's more expensive but much rarer
+        try {
+            IndexTemplateMetaData indexTemplateMetaData = getAuditIndexTemplateMetaData();
+            BytesReference jsonMappings = new BytesArray(indexTemplateMetaData.mappings().get(SINGLE_MAPPING_NAME).uncompressed());
+            PutIndexTemplateRequest request = new PutIndexTemplateRequest(TransformInternalIndexConstants.AUDIT_INDEX).patterns(
+                indexTemplateMetaData.patterns()
+            )
+                .version(indexTemplateMetaData.version())
+                .settings(indexTemplateMetaData.settings())
+                .mapping(SINGLE_MAPPING_NAME, XContentHelper.convertToMap(jsonMappings, true, XContentType.JSON).v2());
+            ActionListener<AcknowledgedResponse> innerListener = ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure);
+            executeAsyncWithOrigin(
+                client.threadPool().getThreadContext(),
+                TRANSFORM_ORIGIN,
+                request,
+                innerListener,
+                client.admin().indices()::putTemplate
+            );
+        } catch (IOException e) {
+            listener.onFailure(e);
+        }
     }
+
+    private TransformInternalIndex() {}
 }

+ 2 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java

@@ -271,8 +271,8 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
             }
         );
 
-        // <1> Check the internal index template is installed
-        TransformInternalIndex.installLatestVersionedIndexTemplateIfRequired(clusterService, client, templateCheckListener);
+        // <1> Check the index templates are installed
+        TransformInternalIndex.installLatestIndexTemplatesIfRequired(clusterService, client, templateCheckListener);
     }
 
     private static IndexerState currentIndexerState(TransformState previousState) {

+ 110 - 2
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndexTests.java

@@ -38,6 +38,7 @@ import static org.mockito.Mockito.when;
 public class TransformInternalIndexTests extends ESTestCase {
 
     public static ClusterState STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE;
+    public static ClusterState STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE;
 
     static {
         ImmutableOpenMap.Builder<String, IndexTemplateMetaData> mapBuilder = ImmutableOpenMap.builder();
@@ -51,6 +52,18 @@ public class TransformInternalIndexTests extends ESTestCase {
         ClusterState.Builder csBuilder = ClusterState.builder(ClusterName.DEFAULT);
         csBuilder.metaData(metaBuilder.build());
         STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE = csBuilder.build();
+
+        mapBuilder = ImmutableOpenMap.builder();
+        try {
+            mapBuilder.put(TransformInternalIndexConstants.AUDIT_INDEX, TransformInternalIndex.getAuditIndexTemplateMetaData());
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+        metaBuilder = MetaData.builder();
+        metaBuilder.templates(mapBuilder.build());
+        csBuilder = ClusterState.builder(ClusterName.DEFAULT);
+        csBuilder.metaData(metaBuilder.build());
+        STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE = csBuilder.build();
     }
 
     public void testHaveLatestVersionedIndexTemplate() {
@@ -81,8 +94,7 @@ public class TransformInternalIndexTests extends ESTestCase {
         when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
 
         IndicesAdminClient indicesClient = mock(IndicesAdminClient.class);
-        doAnswer(
-            invocationOnMock -> {
+        doAnswer(invocationOnMock -> {
             @SuppressWarnings("unchecked")
             ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[1];
             listener.onResponse(new AcknowledgedResponse(true));
@@ -112,4 +124,100 @@ public class TransformInternalIndexTests extends ESTestCase {
         verify(indicesClient, times(1)).putTemplate(any(), any());
         verifyNoMoreInteractions(indicesClient);
     }
+
+    public void testHaveLatestAuditIndexTemplate() {
+
+        assertTrue(TransformInternalIndex.haveLatestAuditIndexTemplate(STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE));
+        assertFalse(TransformInternalIndex.haveLatestAuditIndexTemplate(ClusterState.EMPTY_STATE));
+    }
+
+    public void testInstallLatestAuditIndexTemplateIfRequired_GivenNotRequired() {
+
+        ClusterService clusterService = mock(ClusterService.class);
+        when(clusterService.state()).thenReturn(TransformInternalIndexTests.STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE);
+
+        Client client = mock(Client.class);
+
+        AtomicBoolean gotResponse = new AtomicBoolean(false);
+        ActionListener<Void> testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage()));
+
+        TransformInternalIndex.installLatestAuditIndexTemplateIfRequired(clusterService, client, testListener);
+
+        assertTrue(gotResponse.get());
+        verifyNoMoreInteractions(client);
+    }
+
+    public void testInstallLatestAuditIndexTemplateIfRequired_GivenRequired() {
+
+        ClusterService clusterService = mock(ClusterService.class);
+        when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
+
+        IndicesAdminClient indicesClient = mock(IndicesAdminClient.class);
+        doAnswer(invocationOnMock -> {
+            @SuppressWarnings("unchecked")
+            ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[1];
+            listener.onResponse(new AcknowledgedResponse(true));
+            return null;
+        }).when(indicesClient).putTemplate(any(), any());
+
+        AdminClient adminClient = mock(AdminClient.class);
+        when(adminClient.indices()).thenReturn(indicesClient);
+        Client client = mock(Client.class);
+        when(client.admin()).thenReturn(adminClient);
+
+        ThreadPool threadPool = mock(ThreadPool.class);
+        when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
+        when(client.threadPool()).thenReturn(threadPool);
+
+        AtomicBoolean gotResponse = new AtomicBoolean(false);
+        ActionListener<Void> testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage()));
+
+        TransformInternalIndex.installLatestAuditIndexTemplateIfRequired(clusterService, client, testListener);
+
+        assertTrue(gotResponse.get());
+        verify(client, times(1)).threadPool();
+        verify(client, times(1)).admin();
+        verifyNoMoreInteractions(client);
+        verify(adminClient, times(1)).indices();
+        verifyNoMoreInteractions(adminClient);
+        verify(indicesClient, times(1)).putTemplate(any(), any());
+        verifyNoMoreInteractions(indicesClient);
+    }
+
+    public void testInstallLatestIndexTemplateIfRequired_GivenRequired() {
+
+        ClusterService clusterService = mock(ClusterService.class);
+        when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
+
+        IndicesAdminClient indicesClient = mock(IndicesAdminClient.class);
+        doAnswer(invocationOnMock -> {
+            @SuppressWarnings("unchecked")
+            ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[1];
+            listener.onResponse(new AcknowledgedResponse(true));
+            return null;
+        }).when(indicesClient).putTemplate(any(), any());
+
+        AdminClient adminClient = mock(AdminClient.class);
+        when(adminClient.indices()).thenReturn(indicesClient);
+        Client client = mock(Client.class);
+        when(client.admin()).thenReturn(adminClient);
+
+        ThreadPool threadPool = mock(ThreadPool.class);
+        when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
+        when(client.threadPool()).thenReturn(threadPool);
+
+        AtomicBoolean gotResponse = new AtomicBoolean(false);
+        ActionListener<Void> testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage()));
+
+        TransformInternalIndex.installLatestIndexTemplatesIfRequired(clusterService, client, testListener);
+
+        assertTrue(gotResponse.get());
+        verify(client, times(2)).threadPool();
+        verify(client, times(2)).admin();
+        verifyNoMoreInteractions(client);
+        verify(adminClient, times(2)).indices();
+        verifyNoMoreInteractions(adminClient);
+        verify(indicesClient, times(2)).putTemplate(any(), any());
+        verifyNoMoreInteractions(indicesClient);
+    }
 }

+ 7 - 1
x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_transform_jobs_crud.yml

@@ -261,7 +261,7 @@ setup:
         transform_id: "mixed-simple-continuous-transform"
 
 ---
-"Test index mappings for latest internal index":
+"Test index mappings for latest internal index and audit index":
   - do:
       transform.put_transform:
         transform_id: "upgraded-simple-transform"
@@ -282,3 +282,9 @@ setup:
         index: .transform-internal-004
   - match: { \.transform-internal-004.mappings.dynamic: "false" }
   - match: { \.transform-internal-004.mappings.properties.id.type: "keyword" }
+  - do:
+      indices.get_mapping:
+        index: .transform-notifications-000002
+  - match: { \.transform-notifications-000002.mappings.dynamic: "false" }
+  - match: { \.transform-notifications-000002.mappings.properties.transform_id.type: "keyword" }
+  - match: { \.transform-notifications-000002.mappings.properties.timestamp.type: "date" }