Browse Source

Setting to disable x-opaque-id in logs throttling (#78911)

Introduces a setting cluster.deprecation_indexing.x_opaque_id_used.enabled to disable use of
x-opaque-id in RateLimitingFilter. This will be used for deprecation
logs indexing and will not affect logging to files (it uses different
instance of RateLimitingFilter with this flag enabled by default)

Changes the indices backing a deprecation log data stream to be hidden.

Refactors DeprecationHttpIT to be more reliable

relates #76292
closes #77936
Przemyslaw Gomulka 4 years ago
parent
commit
f5e4228bb3

+ 11 - 0
docs/reference/setup/logging-config.asciidoc

@@ -234,6 +234,17 @@ The user ID is included in the `X-Opaque-ID` field in deprecation JSON logs.
 ---------------------------
 // NOTCONSOLE
 
+Deprecation logs can be indexed into `.logs-deprecation.elasticsearch-default` data stream
+`cluster.deprecation_indexing.enabled` setting is set to true.
+
+==== Deprecation logs throttling
+Deprecation logs are deduplicated based on a deprecated feature key
+and x-opaque-id so that if a feature is repeatedly used, it will not overload the deprecation logs.
+This applies to both indexed deprecation logs and logs emitted to log files.
+You can disable the use of `x-opaque-id` in throttling by changing
+`cluster.deprecation_indexing.x_opaque_id_used.enabled` to false
+See link:./server/src/main/java/org/elasticsearch/common/logging/RateLimitingFilter.java[RateLimitingFilter]
+
 [discrete]
 [[json-logging]]
 === JSON log format

+ 27 - 4
server/src/main/java/org/elasticsearch/common/logging/RateLimitingFilter.java

@@ -28,9 +28,21 @@ import java.util.Set;
 import static org.elasticsearch.common.logging.DeprecatedMessage.KEY_FIELD_NAME;
 import static org.elasticsearch.common.logging.DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME;
 
+/**
+ * A filter used for throttling deprecation logs.
+ * A throttling is based on a combined key which consists of `key` from the logged ESMessage and `x-opaque-id`
+ * passed by a user on a HTTP header.
+ * This filter works by using a lruKeyCache - a set of keys which prevents a second message with the same key to be logged.
+ * The lruKeyCache has a size limited to 128, which when breached will remove the oldest entries.
+ *
+ * It is possible to disable use of `x-opaque-id` as a key with {@link RateLimitingFilter#setUseXOpaqueId(boolean) }
+ * @see <a href="https://logging.apache.org/log4j/2.x/manual/filters.htmlf">Log4j2 Filters</a>
+ */
 @Plugin(name = "RateLimitingFilter", category = Node.CATEGORY, elementType = Filter.ELEMENT_TYPE)
 public class RateLimitingFilter extends AbstractFilter {
 
+    private volatile boolean useXOpaqueId = true;
+
     private final Set<String> lruKeyCache = Collections.newSetFromMap(Collections.synchronizedMap(new LinkedHashMap<>() {
         @Override
         protected boolean removeEldestEntry(final Map.Entry<String, Boolean> eldest) {
@@ -57,16 +69,23 @@ public class RateLimitingFilter extends AbstractFilter {
         if (message instanceof ESLogMessage) {
             final ESLogMessage esLogMessage = (ESLogMessage) message;
 
-            String xOpaqueId = esLogMessage.get(X_OPAQUE_ID_FIELD_NAME);
-            final String key = esLogMessage.get(KEY_FIELD_NAME);
-
-            return lruKeyCache.add(xOpaqueId + key) ? Result.ACCEPT : Result.DENY;
+            final String key = getKey(esLogMessage);
+            return lruKeyCache.add(key) ? Result.ACCEPT : Result.DENY;
 
         } else {
             return Result.NEUTRAL;
         }
     }
 
+    private String getKey(ESLogMessage esLogMessage) {
+        final String key = esLogMessage.get(KEY_FIELD_NAME);
+        if (useXOpaqueId) {
+            String xOpaqueId = esLogMessage.get(X_OPAQUE_ID_FIELD_NAME);
+            return xOpaqueId + key;
+        }
+        return key;
+    }
+
     @Override
     public Result filter(LogEvent event) {
         return filter(event.getMessage());
@@ -84,4 +103,8 @@ public class RateLimitingFilter extends AbstractFilter {
     ) {
         return new RateLimitingFilter(match, mismatch);
     }
+
+    public void setUseXOpaqueId(boolean useXOpaqueId) {
+        this.useXOpaqueId = useXOpaqueId;
+    }
 }

+ 22 - 0
server/src/test/java/org/elasticsearch/common/logging/RateLimitingFilterTests.java

@@ -147,4 +147,26 @@ public class RateLimitingFilterTests extends ESTestCase {
         // Third time, it is allowed again
         assertThat(filter.filter(message), equalTo(Result.ACCEPT));
     }
+
+    public void testMessagesXOpaqueIsIgnoredWhenDisabled() {
+        RateLimitingFilter filter = new RateLimitingFilter();
+        filter.setUseXOpaqueId(false);
+        filter.start();
+
+        // Should NOT be rate-limited because it's not in the cache
+        Message message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", "msg 0");
+        assertThat(filter.filter(message), equalTo(Result.ACCEPT));
+
+        // Should  be rate-limited because it was just added to the cache
+        message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", "msg 0");
+        assertThat(filter.filter(message), equalTo(Result.DENY));
+
+        // Should be rate-limited because X-Opaque-Id is not used
+        message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 1", "msg 0");
+        assertThat(filter.filter(message), equalTo(Result.DENY));
+
+        // Should NOT be rate-limited because "key 1" it not in the cache
+        message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 1", "opaque-id 1", "msg 0");
+        assertThat(filter.filter(message), equalTo(Result.ACCEPT));
+    }
 }

+ 1 - 0
x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/deprecation/deprecation-indexing-settings.json

@@ -2,6 +2,7 @@
   "template": {
     "settings": {
       "index": {
+        "hidden" : true,
         "lifecycle": {
           "name": ".deprecation-indexing-ilm-policy"
         },

+ 425 - 446
x-pack/plugin/deprecation/qa/rest/src/javaRestTest/java/org/elasticsearch/xpack/deprecation/DeprecationHttpIT.java

@@ -17,6 +17,7 @@ import org.apache.http.entity.StringEntity;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.common.Strings;
@@ -29,6 +30,8 @@ import org.elasticsearch.test.rest.ESRestTestCase;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.json.JsonXContent;
 import org.hamcrest.Matcher;
+import org.junit.After;
+import org.junit.Before;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -62,66 +65,139 @@ public class DeprecationHttpIT extends ESRestTestCase {
      */
     private static final String DATA_STREAM_NAME = ".logs-deprecation.elasticsearch-default";
 
+    @Before
+    public void assertIndexingIsEnabled() throws Exception {
+        configureWriteDeprecationLogsToIndex(true);
+
+        // make sure the deprecation logs indexing is enabled
+        Response response = client().performRequest(new Request("GET", "/_cluster/settings?include_defaults=true&flat_settings=true"));
+        assertOK(response);
+        ObjectMapper mapper = new ObjectMapper();
+        final JsonNode jsonNode = mapper.readTree(response.getEntity().getContent());
+
+        final boolean transientValue = jsonNode.at("/transient/cluster.deprecation_indexing.enabled").asBoolean();
+        assertTrue(transientValue);
+
+        // assert index does not exist, which will prevent previous tests to interfere
+        assertBusy(() -> {
+            try {
+                client().performRequest(new Request("GET", "/_data_stream/" + DATA_STREAM_NAME));
+            } catch (ResponseException e) {
+                if (e.getResponse().getStatusLine().getStatusCode() == 404) {
+                    return;
+                }
+            }
+            List<Map<String, Object>> documents = getIndexedDeprecations();
+            logger.warn(documents);
+            fail("Index should be removed on startup");
+        }, 30, TimeUnit.SECONDS);
+    }
+
+    @After
+    public void cleanUp() throws Exception {
+        // making sure the deprecation indexing cache is reset and index is deleted
+        assertBusy(() -> {
+            try {
+                client().performRequest(new Request("DELETE", "_logging/deprecation_cache"));
+                client().performRequest(new Request("DELETE", "/_data_stream/" + DATA_STREAM_NAME));
+            } catch (Exception e) {
+                throw new AssertionError(e);
+            }
+
+        }, 30, TimeUnit.SECONDS);
+
+        // switch logging setting to default
+        configureWriteDeprecationLogsToIndex(null);
+    }
+
     /**
      * Check that configuring deprecation settings causes a warning to be added to the
      * response headers.
      */
-    public void testDeprecatedSettingsReturnWarnings() throws IOException {
+    public void testDeprecatedSettingsReturnWarnings() throws Exception {
+        try {
+            XContentBuilder builder = JsonXContent.contentBuilder()
+                .startObject()
+                .startObject("persistent")
+                .field(
+                    TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE1.getKey(),
+                    TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE1.getDefault(Settings.EMPTY) == false
+                )
+                .field(
+                    TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE2.getKey(),
+                    TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE2.getDefault(Settings.EMPTY) == false
+                )
+                // There should be no warning for this field
+                .field(
+                    TestDeprecationHeaderRestAction.TEST_NOT_DEPRECATED_SETTING.getKey(),
+                    TestDeprecationHeaderRestAction.TEST_NOT_DEPRECATED_SETTING.getDefault(Settings.EMPTY) == false
+                )
+                .endObject()
+                .endObject();
+
+            final Request request = new Request("PUT", "_cluster/settings");
+            ///
+            request.setJsonEntity(Strings.toString(builder));
+            final Response response = client().performRequest(request);
+
+            final List<String> deprecatedWarnings = getWarningHeaders(response.getHeaders());
+            final List<Matcher<String>> headerMatchers = new ArrayList<>(2);
+
+            for (Setting<Boolean> setting : List.of(
+                TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE1,
+                TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE2
+            )) {
+                headerMatchers.add(
+                    equalTo(
+                        "["
+                            + setting.getKey()
+                            + "] setting was deprecated in Elasticsearch and will be removed in a future release! "
+                            + "See the breaking changes documentation for the next major version."
+                    )
+                );
+            }
+
+            assertThat(deprecatedWarnings, hasSize(headerMatchers.size()));
+            for (final String deprecatedWarning : deprecatedWarnings) {
+                assertThat(
+                    "Header does not conform to expected pattern",
+                    deprecatedWarning,
+                    matches(HeaderWarning.WARNING_HEADER_PATTERN.pattern())
+                );
+            }
+
+            final List<String> actualWarningValues = deprecatedWarnings.stream()
+                .map(s -> HeaderWarning.extractWarningValueFromWarningHeader(s, true))
+                .collect(Collectors.toList());
+            for (Matcher<String> headerMatcher : headerMatchers) {
+                assertThat(actualWarningValues, hasItem(headerMatcher));
+            }
+
+            assertBusy(() -> {
+                List<Map<String, Object>> documents = getIndexedDeprecations();
+                logger.warn(documents);
+                assertThat(documents, hasSize(2));
+            });
+
+        } finally {
+            cleanupSettings();
+        }
+    }
+
+    private void cleanupSettings() throws IOException {
         XContentBuilder builder = JsonXContent.contentBuilder()
             .startObject()
-            .startObject("transient")
-            .field(
-                TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE1.getKey(),
-                TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE1.getDefault(Settings.EMPTY) == false
-            )
-            .field(
-                TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE2.getKey(),
-                TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE2.getDefault(Settings.EMPTY) == false
-            )
+            .startObject("persistent")
+            .field(TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE1.getKey(), (Boolean) null)
+            .field(TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE2.getKey(), (Boolean) null)
             // There should be no warning for this field
-            .field(
-                TestDeprecationHeaderRestAction.TEST_NOT_DEPRECATED_SETTING.getKey(),
-                TestDeprecationHeaderRestAction.TEST_NOT_DEPRECATED_SETTING.getDefault(Settings.EMPTY) == false
-            )
+            .field(TestDeprecationHeaderRestAction.TEST_NOT_DEPRECATED_SETTING.getKey(), (Boolean) null)
             .endObject()
             .endObject();
 
         final Request request = new Request("PUT", "_cluster/settings");
         request.setJsonEntity(Strings.toString(builder));
-        final Response response = client().performRequest(request);
-
-        final List<String> deprecatedWarnings = getWarningHeaders(response.getHeaders());
-        final List<Matcher<String>> headerMatchers = new ArrayList<>(2);
-
-        for (Setting<Boolean> setting : List.of(
-            TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE1,
-            TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE2
-        )) {
-            headerMatchers.add(
-                equalTo(
-                    "["
-                        + setting.getKey()
-                        + "] setting was deprecated in Elasticsearch and will be removed in a future release! "
-                        + "See the breaking changes documentation for the next major version."
-                )
-            );
-        }
-
-        assertThat(deprecatedWarnings, hasSize(headerMatchers.size()));
-        for (final String deprecatedWarning : deprecatedWarnings) {
-            assertThat(
-                "Header does not conform to expected pattern",
-                deprecatedWarning,
-                matches(HeaderWarning.WARNING_HEADER_PATTERN.pattern())
-            );
-        }
-
-        final List<String> actualWarningValues = deprecatedWarnings.stream()
-            .map(s -> HeaderWarning.extractWarningValueFromWarningHeader(s, true))
-            .collect(Collectors.toList());
-        for (Matcher<String> headerMatcher : headerMatchers) {
-            assertThat(actualWarningValues, hasItem(headerMatcher));
-        }
+        client().performRequest(request);
     }
 
     /**
@@ -208,6 +284,11 @@ public class DeprecationHttpIT extends ESRestTestCase {
         // trigger all deprecations
         Request request = new Request("GET", "/_test_cluster/deprecated_settings");
         request.setEntity(buildSettingsRequest(settings, useDeprecatedField ? "deprecated_settings" : "settings"));
+        final RequestOptions options = request.getOptions()
+            .toBuilder()
+            .addHeader("X-Opaque-Id", "XOpaqueId-doTestDeprecationWarningsAppearInHeaders")
+            .build();
+        request.setOptions(options);
         Response response = client().performRequest(request);
         assertOK(response);
 
@@ -232,77 +313,79 @@ public class DeprecationHttpIT extends ESRestTestCase {
     }
 
     public void testDeprecationRouteThrottling() throws Exception {
-        try {
-            configureWriteDeprecationLogsToIndex(true);
 
-            final Request getRequest = createTestRequest("GET");
-            assertOK(client().performRequest(getRequest));
+        final Request deprecatedRequest = deprecatedRequest("GET", "xOpaqueId-testDeprecationRouteThrottling");
+        assertOK(client().performRequest(deprecatedRequest));
 
-            assertOK(client().performRequest(getRequest));
+        assertOK(client().performRequest(deprecatedRequest));
 
-            final Request postRequest = createTestRequest("POST");
-            assertOK(client().performRequest(postRequest));
+        final Request postRequest = deprecatedRequest("POST", "xOpaqueId-testDeprecationRouteThrottling");
+        assertOK(client().performRequest(postRequest));
 
-            assertBusy(() -> {
-                Response response;
-                try {
-                    client().performRequest(new Request("POST", "/" + DATA_STREAM_NAME + "/_refresh?ignore_unavailable=true"));
-                    response = client().performRequest(new Request("GET", "/" + DATA_STREAM_NAME + "/_search"));
-                } catch (Exception e) {
-                    // It can take a moment for the index to be created. If it doesn't exist then the client
-                    // throws an exception. Translate it into an assertion error so that assertBusy() will
-                    // continue trying.
-                    throw new AssertionError(e);
-                }
-                assertOK(response);
+        assertBusy(() -> {
+            List<Map<String, Object>> documents = getIndexedDeprecations();
 
-                ObjectMapper mapper = new ObjectMapper();
-                final JsonNode jsonNode = mapper.readTree(response.getEntity().getContent());
+            logger.warn(documents);
+            assertThat(documents, hasSize(3));
 
-                final int hits = jsonNode.at("/hits/total/value").intValue();
-                assertThat(hits, greaterThan(0));
+            assertThat(
+                documents,
+                containsInAnyOrder(
+                    allOf(
+                        hasEntry(KEY_FIELD_NAME, "deprecated_route_POST_/_test_cluster/deprecated_settings"),
+                        hasEntry("message", "[/_test_cluster/deprecated_settings] exists for deprecated tests")
+                    ),
+                    allOf(
+                        hasEntry(KEY_FIELD_NAME, "deprecated_route_GET_/_test_cluster/deprecated_settings"),
+                        hasEntry("message", "[/_test_cluster/deprecated_settings] exists for deprecated tests")
+                    ),
+                    allOf(
+                        hasEntry(KEY_FIELD_NAME, "deprecated_settings"),
+                        hasEntry("message", "[deprecated_settings] usage is deprecated. use [settings] instead")
+                    )
+                )
+            );
+        }, 30, TimeUnit.SECONDS);
 
-                List<Map<String, Object>> documents = new ArrayList<>();
+    }
 
-                for (int i = 0; i < hits; i++) {
-                    final JsonNode hit = jsonNode.at("/hits/hits/" + i + "/_source");
+    public void testDisableDeprecationLogIndexing() throws Exception {
 
-                    final Map<String, Object> document = new HashMap<>();
-                    hit.fields().forEachRemaining(entry -> document.put(entry.getKey(), entry.getValue().textValue()));
+        configureWriteDeprecationLogsToIndex(true);
+        final Request deprecatedRequest = deprecatedRequest("GET", "xOpaqueId-testDisableDeprecationLogIndexing");
+        assertOK(client().performRequest(deprecatedRequest));
+        configureWriteDeprecationLogsToIndex(false);
 
-                    documents.add(document);
-                }
+        final Request postRequest = deprecatedRequest("POST", "xOpaqueId-testDisableDeprecationLogIndexing");
+        assertOK(client().performRequest(postRequest));
 
-                logger.warn(documents);
-                assertThat(documents, hasSize(3));
+        assertBusy(() -> {
+            List<Map<String, Object>> documents = getIndexedDeprecations();
 
-                assertThat(
-                    documents,
-                    containsInAnyOrder(
-                        allOf(
-                            hasEntry(KEY_FIELD_NAME, "deprecated_route_POST_/_test_cluster/deprecated_settings"),
-                            hasEntry("message", "[/_test_cluster/deprecated_settings] exists for deprecated tests")
-                        ),
-                        allOf(
-                            hasEntry(KEY_FIELD_NAME, "deprecated_route_GET_/_test_cluster/deprecated_settings"),
-                            hasEntry("message", "[/_test_cluster/deprecated_settings] exists for deprecated tests")
-                        ),
-                        allOf(
-                            hasEntry(KEY_FIELD_NAME, "deprecated_settings"),
-                            hasEntry("message", "[deprecated_settings] usage is deprecated. use [settings] instead")
-                        )
+            logger.warn(documents);
+            assertThat(documents, hasSize(2));
+
+            assertThat(
+                documents,
+                containsInAnyOrder(
+                    allOf(
+                        hasEntry(KEY_FIELD_NAME, "deprecated_route_GET_/_test_cluster/deprecated_settings"),
+                        hasEntry("message", "[/_test_cluster/deprecated_settings] exists for deprecated tests")
+                    ),
+                    allOf(
+                        hasEntry(KEY_FIELD_NAME, "deprecated_settings"),
+                        hasEntry("message", "[deprecated_settings] usage is deprecated. use [settings] instead")
                     )
-                );
-            }, 30, TimeUnit.SECONDS);
-        } finally {
-            configureWriteDeprecationLogsToIndex(null);
-            client().performRequest(new Request("DELETE", "_data_stream/" + DATA_STREAM_NAME));
-        }
+                )
+            );
+        }, 30, TimeUnit.SECONDS);
+
     }
 
-    private Request createTestRequest(String method) throws IOException {
+    // triggers two deprecations - endpoint and setting
+    private Request deprecatedRequest(String method, String xOpaqueId) throws IOException {
         final Request getRequest = new Request(method, "/_test_cluster/deprecated_settings");
-        final RequestOptions options = getRequest.getOptions().toBuilder().addHeader("X-Opaque-Id", "some xid").build();
+        final RequestOptions options = getRequest.getOptions().toBuilder().addHeader("X-Opaque-Id", xOpaqueId).build();
         getRequest.setOptions(options);
         getRequest.setEntity(
             buildSettingsRequest(
@@ -317,390 +400,254 @@ public class DeprecationHttpIT extends ESRestTestCase {
      * Check that deprecation messages can be recorded to an index
      */
     public void testDeprecationMessagesCanBeIndexed() throws Exception {
-        try {
-            configureWriteDeprecationLogsToIndex(true);
-
-            final Request request = new Request("GET", "/_test_cluster/deprecated_settings");
-            final RequestOptions options = request.getOptions().toBuilder().addHeader("X-Opaque-Id", "some xid").build();
-            request.setOptions(options);
-            request.setEntity(
-                buildSettingsRequest(
-                    Collections.singletonList(TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE1),
-                    "deprecated_settings"
-                )
-            );
-            assertOK(client().performRequest(request));
-
-            assertBusy(() -> {
-                Response response;
-                try {
-                    client().performRequest(new Request("POST", "/" + DATA_STREAM_NAME + "/_refresh?ignore_unavailable=true"));
-                    response = client().performRequest(new Request("GET", "/" + DATA_STREAM_NAME + "/_search"));
-                } catch (Exception e) {
-                    // It can take a moment for the index to be created. If it doesn't exist then the client
-                    // throws an exception. Translate it into an assertion error so that assertBusy() will
-                    // continue trying.
-                    throw new AssertionError(e);
-                }
-                assertOK(response);
-
-                ObjectMapper mapper = new ObjectMapper();
-                final JsonNode jsonNode = mapper.readTree(response.getEntity().getContent());
-
-                final int hits = jsonNode.at("/hits/total/value").intValue();
-                assertThat(hits, greaterThan(0));
 
-                List<Map<String, Object>> documents = new ArrayList<>();
+        final Request request = deprecatedRequest("GET", "xOpaqueId-testDeprecationMessagesCanBeIndexed");
+        assertOK(client().performRequest(request));
 
-                for (int i = 0; i < hits; i++) {
-                    final JsonNode hit = jsonNode.at("/hits/hits/" + i + "/_source");
+        assertBusy(() -> {
+            List<Map<String, Object>> documents = getIndexedDeprecations();
 
-                    final Map<String, Object> document = new HashMap<>();
-                    hit.fields().forEachRemaining(entry -> document.put(entry.getKey(), entry.getValue().textValue()));
+            logger.warn(documents);
+            assertThat(documents, hasSize(2));
 
-                    documents.add(document);
-                }
-
-                logger.warn(documents);
-                assertThat(documents, hasSize(2));
-
-                assertThat(
-                    documents,
-                    containsInAnyOrder(
-                        allOf(
-                            hasKey("@timestamp"),
-                            hasKey("elasticsearch.cluster.name"),
-                            hasKey("elasticsearch.cluster.uuid"),
-                            hasEntry(X_OPAQUE_ID_FIELD_NAME, "some xid"),
-                            hasEntry("elasticsearch.event.category", "settings"),
-                            hasKey("elasticsearch.node.id"),
-                            hasKey("elasticsearch.node.name"),
-                            hasEntry("data_stream.dataset", "deprecation.elasticsearch"),
-                            hasEntry("data_stream.namespace", "default"),
-                            hasEntry("data_stream.type", "logs"),
-                            hasEntry("ecs.version", "1.7"),
-                            hasEntry(KEY_FIELD_NAME, "deprecated_settings"),
-                            hasEntry("event.dataset", "deprecation.elasticsearch"),
-                            hasEntry("log.level", "CRITICAL"),
-                            hasKey("log.logger"),
-                            hasEntry("message", "[deprecated_settings] usage is deprecated. use [settings] instead")
-                        ),
-                        allOf(
-                            hasKey("@timestamp"),
-                            hasKey("elasticsearch.cluster.name"),
-                            hasKey("elasticsearch.cluster.uuid"),
-                            hasEntry(X_OPAQUE_ID_FIELD_NAME, "some xid"),
-                            hasEntry("elasticsearch.event.category", "api"),
-                            hasKey("elasticsearch.node.id"),
-                            hasKey("elasticsearch.node.name"),
-                            hasEntry("data_stream.dataset", "deprecation.elasticsearch"),
-                            hasEntry("data_stream.namespace", "default"),
-                            hasEntry("data_stream.type", "logs"),
-                            hasEntry("ecs.version", "1.7"),
-                            hasEntry(KEY_FIELD_NAME, "deprecated_route_GET_/_test_cluster/deprecated_settings"),
-                            hasEntry("event.dataset", "deprecation.elasticsearch"),
-                            hasEntry("log.level", "CRITICAL"),
-                            hasKey("log.logger"),
-                            hasEntry("message", "[/_test_cluster/deprecated_settings] exists for deprecated tests")
-                        )
+            assertThat(
+                documents,
+                containsInAnyOrder(
+                    allOf(
+                        hasKey("@timestamp"),
+                        hasKey("elasticsearch.cluster.name"),
+                        hasKey("elasticsearch.cluster.uuid"),
+                        hasEntry(X_OPAQUE_ID_FIELD_NAME, "xOpaqueId-testDeprecationMessagesCanBeIndexed"),
+                        hasEntry("elasticsearch.event.category", "settings"),
+                        hasKey("elasticsearch.node.id"),
+                        hasKey("elasticsearch.node.name"),
+                        hasEntry("data_stream.dataset", "deprecation.elasticsearch"),
+                        hasEntry("data_stream.namespace", "default"),
+                        hasEntry("data_stream.type", "logs"),
+                        hasEntry("ecs.version", "1.7"),
+                        hasEntry(KEY_FIELD_NAME, "deprecated_settings"),
+                        hasEntry("event.dataset", "deprecation.elasticsearch"),
+                        hasEntry("log.level", "CRITICAL"),
+                        hasKey("log.logger"),
+                        hasEntry("message", "[deprecated_settings] usage is deprecated. use [settings] instead")
+                    ),
+                    allOf(
+                        hasKey("@timestamp"),
+                        hasKey("elasticsearch.cluster.name"),
+                        hasKey("elasticsearch.cluster.uuid"),
+                        hasEntry(X_OPAQUE_ID_FIELD_NAME, "xOpaqueId-testDeprecationMessagesCanBeIndexed"),
+                        hasEntry("elasticsearch.event.category", "api"),
+                        hasKey("elasticsearch.node.id"),
+                        hasKey("elasticsearch.node.name"),
+                        hasEntry("data_stream.dataset", "deprecation.elasticsearch"),
+                        hasEntry("data_stream.namespace", "default"),
+                        hasEntry("data_stream.type", "logs"),
+                        hasEntry("ecs.version", "1.7"),
+                        hasEntry(KEY_FIELD_NAME, "deprecated_route_GET_/_test_cluster/deprecated_settings"),
+                        hasEntry("event.dataset", "deprecation.elasticsearch"),
+                        hasEntry("log.level", "CRITICAL"),
+                        hasKey("log.logger"),
+                        hasEntry("message", "[/_test_cluster/deprecated_settings] exists for deprecated tests")
                     )
-                );
-            }, 30, TimeUnit.SECONDS);
-        } finally {
-            configureWriteDeprecationLogsToIndex(null);
-            client().performRequest(new Request("DELETE", "_data_stream/" + DATA_STREAM_NAME));
-        }
+                )
+            );
+        }, 30, TimeUnit.SECONDS);
+
     }
 
     /**
      * Check that deprecation messages with WARN level can be recorded to an index
      */
     public void testDeprecationWarnMessagesCanBeIndexed() throws Exception {
-        try {
-            configureWriteDeprecationLogsToIndex(true);
-
-            final Request request = new Request("GET", "/_test_cluster/deprecated_settings");
-            final RequestOptions options = request.getOptions().toBuilder().addHeader("X-Opaque-Id", "some xid").build();
-            request.setOptions(options);
-            request.setEntity(
-                buildSettingsRequest(
-                    Collections.singletonList(TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE1),
-                    "deprecation_warning"
-                )
-            );
-            assertOK(client().performRequest(request));
 
-            assertBusy(() -> {
-                Response response;
-                try {
-                    client().performRequest(new Request("POST", "/" + DATA_STREAM_NAME + "/_refresh?ignore_unavailable=true"));
-                    response = client().performRequest(new Request("GET", "/" + DATA_STREAM_NAME + "/_search"));
-                } catch (Exception e) {
-                    // It can take a moment for the index to be created. If it doesn't exist then the client
-                    // throws an exception. Translate it into an assertion error so that assertBusy() will
-                    // continue trying.
-                    throw new AssertionError(e);
-                }
-                assertOK(response);
-
-                ObjectMapper mapper = new ObjectMapper();
-                final JsonNode jsonNode = mapper.readTree(response.getEntity().getContent());
-
-                final int hits = jsonNode.at("/hits/total/value").intValue();
-                assertThat(hits, greaterThan(0));
-
-                List<Map<String, Object>> documents = new ArrayList<>();
-
-                for (int i = 0; i < hits; i++) {
-                    final JsonNode hit = jsonNode.at("/hits/hits/" + i + "/_source");
-
-                    final Map<String, Object> document = new HashMap<>();
-                    hit.fields().forEachRemaining(entry -> document.put(entry.getKey(), entry.getValue().textValue()));
+        final Request request = new Request("GET", "/_test_cluster/deprecated_settings");
+        final RequestOptions options = request.getOptions()
+            .toBuilder()
+            .addHeader("X-Opaque-Id", "xOpaqueId-testDeprecationWarnMessagesCanBeIndexed")
+            .build();
+        request.setOptions(options);
+        request.setEntity(
+            buildSettingsRequest(
+                Collections.singletonList(TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE1),
+                "deprecation_warning"
+            )
+        );
+        assertOK(client().performRequest(request));
 
-                    documents.add(document);
-                }
+        assertBusy(() -> {
+            List<Map<String, Object>> documents = getIndexedDeprecations();
 
-                logger.warn(documents);
-                assertThat(documents, hasSize(2));
+            logger.warn(documents);
+            assertThat(documents, hasSize(2));
 
-                assertThat(
-                    documents,
-                    containsInAnyOrder(
-                        allOf(
-                            hasKey("@timestamp"),
-                            hasKey("elasticsearch.cluster.name"),
-                            hasKey("elasticsearch.cluster.uuid"),
-                            hasEntry(X_OPAQUE_ID_FIELD_NAME, "some xid"),
-                            hasEntry("elasticsearch.event.category", "settings"),
-                            hasKey("elasticsearch.node.id"),
-                            hasKey("elasticsearch.node.name"),
-                            hasEntry("data_stream.dataset", "deprecation.elasticsearch"),
-                            hasEntry("data_stream.namespace", "default"),
-                            hasEntry("data_stream.type", "logs"),
-                            hasEntry("ecs.version", "1.7"),
-                            hasEntry(KEY_FIELD_NAME, "deprecated_warn_settings"),
-                            hasEntry("event.dataset", "deprecation.elasticsearch"),
-                            hasEntry("log.level", "WARN"),
-                            hasKey("log.logger"),
-                            hasEntry("message", "[deprecated_warn_settings] usage is deprecated but won't be breaking in next version")
-                        ),
-                        allOf(
-                            hasKey("@timestamp"),
-                            hasKey("elasticsearch.cluster.name"),
-                            hasKey("elasticsearch.cluster.uuid"),
-                            hasEntry(X_OPAQUE_ID_FIELD_NAME, "some xid"),
-                            hasEntry("elasticsearch.event.category", "api"),
-                            hasKey("elasticsearch.node.id"),
-                            hasKey("elasticsearch.node.name"),
-                            hasEntry("data_stream.dataset", "deprecation.elasticsearch"),
-                            hasEntry("data_stream.namespace", "default"),
-                            hasEntry("data_stream.type", "logs"),
-                            hasEntry("ecs.version", "1.7"),
-                            hasEntry(KEY_FIELD_NAME, "deprecated_route_GET_/_test_cluster/deprecated_settings"),
-                            hasEntry("event.dataset", "deprecation.elasticsearch"),
-                            hasEntry("log.level", "CRITICAL"),
-                            hasKey("log.logger"),
-                            hasEntry("message", "[/_test_cluster/deprecated_settings] exists for deprecated tests")
-                        )
+            assertThat(
+                documents,
+                containsInAnyOrder(
+                    allOf(
+                        hasKey("@timestamp"),
+                        hasKey("elasticsearch.cluster.name"),
+                        hasKey("elasticsearch.cluster.uuid"),
+                        hasEntry(X_OPAQUE_ID_FIELD_NAME, "xOpaqueId-testDeprecationWarnMessagesCanBeIndexed"),
+                        hasEntry("elasticsearch.event.category", "settings"),
+                        hasKey("elasticsearch.node.id"),
+                        hasKey("elasticsearch.node.name"),
+                        hasEntry("data_stream.dataset", "deprecation.elasticsearch"),
+                        hasEntry("data_stream.namespace", "default"),
+                        hasEntry("data_stream.type", "logs"),
+                        hasEntry("ecs.version", "1.7"),
+                        hasEntry(KEY_FIELD_NAME, "deprecated_warn_settings"),
+                        hasEntry("event.dataset", "deprecation.elasticsearch"),
+                        hasEntry("log.level", "WARN"),
+                        hasKey("log.logger"),
+                        hasEntry("message", "[deprecated_warn_settings] usage is deprecated but won't be breaking in next version")
+                    ),
+                    allOf(
+                        hasKey("@timestamp"),
+                        hasKey("elasticsearch.cluster.name"),
+                        hasKey("elasticsearch.cluster.uuid"),
+                        hasEntry(X_OPAQUE_ID_FIELD_NAME, "xOpaqueId-testDeprecationWarnMessagesCanBeIndexed"),
+                        hasEntry("elasticsearch.event.category", "api"),
+                        hasKey("elasticsearch.node.id"),
+                        hasKey("elasticsearch.node.name"),
+                        hasEntry("data_stream.dataset", "deprecation.elasticsearch"),
+                        hasEntry("data_stream.namespace", "default"),
+                        hasEntry("data_stream.type", "logs"),
+                        hasEntry("ecs.version", "1.7"),
+                        hasEntry(KEY_FIELD_NAME, "deprecated_route_GET_/_test_cluster/deprecated_settings"),
+                        hasEntry("event.dataset", "deprecation.elasticsearch"),
+                        hasEntry("log.level", "CRITICAL"),
+                        hasKey("log.logger"),
+                        hasEntry("message", "[/_test_cluster/deprecated_settings] exists for deprecated tests")
                     )
-                );
-            }, 30, TimeUnit.SECONDS);
-        } finally {
-            configureWriteDeprecationLogsToIndex(null);
-            client().performRequest(new Request("DELETE", "_data_stream/" + DATA_STREAM_NAME));
-        }
+                )
+            );
+        }, 30, TimeUnit.SECONDS);
+
     }
 
     /**
      * Check that log messages about REST API compatibility are recorded to an index
      */
     public void testCompatibleMessagesCanBeIndexed() throws Exception {
-        try {
-            configureWriteDeprecationLogsToIndex(true);
-
-            final Request compatibleRequest = new Request("GET", "/_test_cluster/deprecated_settings");
-            final RequestOptions compatibleOptions = compatibleRequest.getOptions()
-                .toBuilder()
-                .addHeader("X-Opaque-Id", "some xid")
-                .addHeader("Accept", "application/vnd.elasticsearch+json;compatible-with=" + RestApiVersion.minimumSupported().major)
-                .addHeader("Content-Type", "application/vnd.elasticsearch+json;compatible-with=" + RestApiVersion.minimumSupported().major)
-                .build();
-            compatibleRequest.setOptions(compatibleOptions);
-            compatibleRequest.setEntity(
-                buildSettingsRequest(
-                    Collections.singletonList(TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE1),
-                    "deprecated_settings"
-                )
-            );
-            Response deprecatedApiResponse = client().performRequest(compatibleRequest);
-            assertOK(deprecatedApiResponse);
-
-            final List<String> deprecatedWarnings = getWarningHeaders(deprecatedApiResponse.getHeaders());
-            final List<String> actualWarningValues = deprecatedWarnings.stream()
-                .map(s -> HeaderWarning.extractWarningValueFromWarningHeader(s, true))
-                .collect(Collectors.toList());
-            assertThat(
-                actualWarningValues,
-                containsInAnyOrder(
-                    TestDeprecationHeaderRestAction.DEPRECATED_ENDPOINT,
-                    TestDeprecationHeaderRestAction.COMPATIBLE_API_USAGE
-                )
-            );
-
-            assertBusy(() -> {
-                Response response;
-                try {
-                    client().performRequest(new Request("POST", "/" + DATA_STREAM_NAME + "/_refresh?ignore_unavailable=true"));
-                    response = client().performRequest(new Request("GET", "/" + DATA_STREAM_NAME + "/_search"));
-                } catch (Exception e) {
-                    // It can take a moment for the index to be created. If it doesn't exist then the client
-                    // throws an exception. Translate it into an assertion error so that assertBusy() will
-                    // continue trying.
-                    throw new AssertionError(e);
-                }
-                assertOK(response);
-
-                ObjectMapper mapper = new ObjectMapper();
-                final JsonNode jsonNode = mapper.readTree(response.getEntity().getContent());
 
-                final int hits = jsonNode.at("/hits/total/value").intValue();
-                assertThat(hits, greaterThan(0));
-
-                List<Map<String, Object>> documents = new ArrayList<>();
-
-                for (int i = 0; i < hits; i++) {
-                    final JsonNode hit = jsonNode.at("/hits/hits/" + i + "/_source");
+        final Request compatibleRequest = new Request("GET", "/_test_cluster/deprecated_settings");
+        final RequestOptions compatibleOptions = compatibleRequest.getOptions()
+            .toBuilder()
+            .addHeader("X-Opaque-Id", "xOpaqueId-testCompatibleMessagesCanBeIndexed")
+            .addHeader("Accept", "application/vnd.elasticsearch+json;compatible-with=" + RestApiVersion.minimumSupported().major)
+            .addHeader("Content-Type", "application/vnd.elasticsearch+json;compatible-with=" + RestApiVersion.minimumSupported().major)
+            .build();
+        compatibleRequest.setOptions(compatibleOptions);
+        compatibleRequest.setEntity(
+            buildSettingsRequest(
+                Collections.singletonList(TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE1),
+                "deprecated_settings"
+            )
+        );
+        Response deprecatedApiResponse = client().performRequest(compatibleRequest);
+        assertOK(deprecatedApiResponse);
 
-                    final Map<String, Object> document = new HashMap<>();
-                    hit.fields().forEachRemaining(entry -> document.put(entry.getKey(), entry.getValue().textValue()));
+        final List<String> deprecatedWarnings = getWarningHeaders(deprecatedApiResponse.getHeaders());
+        final List<String> actualWarningValues = deprecatedWarnings.stream()
+            .map(s -> HeaderWarning.extractWarningValueFromWarningHeader(s, true))
+            .collect(Collectors.toList());
+        assertThat(
+            actualWarningValues,
+            containsInAnyOrder(TestDeprecationHeaderRestAction.DEPRECATED_ENDPOINT, TestDeprecationHeaderRestAction.COMPATIBLE_API_USAGE)
+        );
 
-                    documents.add(document);
-                }
+        assertBusy(() -> {
+            List<Map<String, Object>> documents = getIndexedDeprecations();
 
-                logger.warn(documents);
-                assertThat(documents, hasSize(2));
+            logger.warn(documents);
+            assertThat(documents, hasSize(2));
 
-                assertThat(
-                    documents,
-                    containsInAnyOrder(
-                        allOf(
-                            hasKey("@timestamp"),
-                            hasKey("elasticsearch.cluster.name"),
-                            hasKey("elasticsearch.cluster.uuid"),
-                            hasEntry(X_OPAQUE_ID_FIELD_NAME, "some xid"),
-                            hasEntry("elasticsearch.event.category", "compatible_api"),
-                            hasKey("elasticsearch.node.id"),
-                            hasKey("elasticsearch.node.name"),
-                            hasEntry("data_stream.dataset", "deprecation.elasticsearch"),
-                            hasEntry("data_stream.namespace", "default"),
-                            hasEntry("data_stream.type", "logs"),
-                            hasEntry("ecs.version", "1.7"),
-                            hasEntry(KEY_FIELD_NAME, "compatible_key"),
-                            hasEntry("event.dataset", "deprecation.elasticsearch"),
-                            hasEntry("log.level", "CRITICAL"),
-                            hasKey("log.logger"),
-                            hasEntry("message", "You are using a compatible API for this request")
-                        ),
-                        allOf(
-                            hasKey("@timestamp"),
-                            hasKey("elasticsearch.cluster.name"),
-                            hasKey("elasticsearch.cluster.uuid"),
-                            hasEntry(X_OPAQUE_ID_FIELD_NAME, "some xid"),
-                            hasEntry("elasticsearch.event.category", "api"),
-                            hasKey("elasticsearch.node.id"),
-                            hasKey("elasticsearch.node.name"),
-                            hasEntry("data_stream.dataset", "deprecation.elasticsearch"),
-                            hasEntry("data_stream.namespace", "default"),
-                            hasEntry("data_stream.type", "logs"),
-                            hasEntry("ecs.version", "1.7"),
-                            hasEntry(KEY_FIELD_NAME, "deprecated_route_GET_/_test_cluster/deprecated_settings"),
-                            hasEntry("event.dataset", "deprecation.elasticsearch"),
-                            hasEntry("log.level", "CRITICAL"),
-                            hasKey("log.logger"),
-                            hasEntry("message", "[/_test_cluster/deprecated_settings] exists for deprecated tests")
-                        )
+            assertThat(
+                documents,
+                containsInAnyOrder(
+                    allOf(
+                        hasKey("@timestamp"),
+                        hasKey("elasticsearch.cluster.name"),
+                        hasKey("elasticsearch.cluster.uuid"),
+                        hasEntry(X_OPAQUE_ID_FIELD_NAME, "xOpaqueId-testCompatibleMessagesCanBeIndexed"),
+                        hasEntry("elasticsearch.event.category", "compatible_api"),
+                        hasKey("elasticsearch.node.id"),
+                        hasKey("elasticsearch.node.name"),
+                        hasEntry("data_stream.dataset", "deprecation.elasticsearch"),
+                        hasEntry("data_stream.namespace", "default"),
+                        hasEntry("data_stream.type", "logs"),
+                        hasEntry("ecs.version", "1.7"),
+                        hasEntry(KEY_FIELD_NAME, "compatible_key"),
+                        hasEntry("event.dataset", "deprecation.elasticsearch"),
+                        hasEntry("log.level", "CRITICAL"),
+                        hasKey("log.logger"),
+                        hasEntry("message", "You are using a compatible API for this request")
+                    ),
+                    allOf(
+                        hasKey("@timestamp"),
+                        hasKey("elasticsearch.cluster.name"),
+                        hasKey("elasticsearch.cluster.uuid"),
+                        hasEntry(X_OPAQUE_ID_FIELD_NAME, "xOpaqueId-testCompatibleMessagesCanBeIndexed"),
+                        hasEntry("elasticsearch.event.category", "api"),
+                        hasKey("elasticsearch.node.id"),
+                        hasKey("elasticsearch.node.name"),
+                        hasEntry("data_stream.dataset", "deprecation.elasticsearch"),
+                        hasEntry("data_stream.namespace", "default"),
+                        hasEntry("data_stream.type", "logs"),
+                        hasEntry("ecs.version", "1.7"),
+                        hasEntry(KEY_FIELD_NAME, "deprecated_route_GET_/_test_cluster/deprecated_settings"),
+                        hasEntry("event.dataset", "deprecation.elasticsearch"),
+                        hasEntry("log.level", "CRITICAL"),
+                        hasKey("log.logger"),
+                        hasEntry("message", "[/_test_cluster/deprecated_settings] exists for deprecated tests")
                     )
-                );
-            }, 30, TimeUnit.SECONDS);
-        } finally {
-            configureWriteDeprecationLogsToIndex(null);
-            client().performRequest(new Request("DELETE", "_data_stream/" + DATA_STREAM_NAME));
-        }
+                )
+            );
+        }, 30, TimeUnit.SECONDS);
+
     }
 
     /**
      * Check that deprecation messages can be recorded to an index
      */
     public void testDeprecationIndexingCacheReset() throws Exception {
-        try {
-            configureWriteDeprecationLogsToIndex(true);
-
-            final Request getRequest = createTestRequest("GET");
-            assertOK(client().performRequest(getRequest));
 
-            client().performRequest(new Request("DELETE", "/_logging/deprecation_cache"));
+        final Request deprecatedRequest = deprecatedRequest("GET", "xOpaqueId-testDeprecationIndexingCacheReset");
+        assertOK(client().performRequest(deprecatedRequest));
 
-            assertOK(client().performRequest(getRequest));
+        client().performRequest(new Request("DELETE", "/_logging/deprecation_cache"));
 
-            assertBusy(() -> {
-                Response response;
-                try {
-                    client().performRequest(new Request("POST", "/" + DATA_STREAM_NAME + "/_refresh?ignore_unavailable=true"));
-                    response = client().performRequest(new Request("GET", "/" + DATA_STREAM_NAME + "/_search"));
-                } catch (Exception e) {
-                    // It can take a moment for the index to be created. If it doesn't exist then the client
-                    // throws an exception. Translate it into an assertion error so that assertBusy() will
-                    // continue trying.
-                    throw new AssertionError(e);
-                }
-                assertOK(response);
-
-                ObjectMapper mapper = new ObjectMapper();
-                final JsonNode jsonNode = mapper.readTree(response.getEntity().getContent());
+        assertOK(client().performRequest(deprecatedRequest));
 
-                final int hits = jsonNode.at("/hits/total/value").intValue();
-                assertThat(hits, greaterThan(0));
+        assertBusy(() -> {
+            List<Map<String, Object>> documents = getIndexedDeprecations();
 
-                List<Map<String, Object>> documents = new ArrayList<>();
+            logger.warn(documents);
+            assertThat(documents, hasSize(4));
 
-                for (int i = 0; i < hits; i++) {
-                    final JsonNode hit = jsonNode.at("/hits/hits/" + i + "/_source");
-
-                    final Map<String, Object> document = new HashMap<>();
-                    hit.fields().forEachRemaining(entry -> document.put(entry.getKey(), entry.getValue().textValue()));
-
-                    documents.add(document);
-                }
-
-                logger.warn(documents);
-                assertThat(documents, hasSize(4));
-
-                assertThat(
-                    documents,
-                    containsInAnyOrder(
-                        allOf(
-                            hasEntry(KEY_FIELD_NAME, "deprecated_route_GET_/_test_cluster/deprecated_settings"),
-                            hasEntry("message", "[/_test_cluster/deprecated_settings] exists for deprecated tests")
-                        ),
-                        allOf(
-                            hasEntry(KEY_FIELD_NAME, "deprecated_route_GET_/_test_cluster/deprecated_settings"),
-                            hasEntry("message", "[/_test_cluster/deprecated_settings] exists for deprecated tests")
-                        ),
-                        allOf(
-                            hasEntry(KEY_FIELD_NAME, "deprecated_settings"),
-                            hasEntry("message", "[deprecated_settings] usage is deprecated. use [settings] instead")
-                        ),
-                        allOf(
-                            hasEntry(KEY_FIELD_NAME, "deprecated_settings"),
-                            hasEntry("message", "[deprecated_settings] usage is deprecated. use [settings] instead")
-                        )
+            assertThat(
+                documents,
+                containsInAnyOrder(
+                    allOf(
+                        hasEntry(KEY_FIELD_NAME, "deprecated_route_GET_/_test_cluster/deprecated_settings"),
+                        hasEntry("message", "[/_test_cluster/deprecated_settings] exists for deprecated tests")
+                    ),
+                    allOf(
+                        hasEntry(KEY_FIELD_NAME, "deprecated_route_GET_/_test_cluster/deprecated_settings"),
+                        hasEntry("message", "[/_test_cluster/deprecated_settings] exists for deprecated tests")
+                    ),
+                    allOf(
+                        hasEntry(KEY_FIELD_NAME, "deprecated_settings"),
+                        hasEntry("message", "[deprecated_settings] usage is deprecated. use [settings] instead")
+                    ),
+                    allOf(
+                        hasEntry(KEY_FIELD_NAME, "deprecated_settings"),
+                        hasEntry("message", "[deprecated_settings] usage is deprecated. use [settings] instead")
                     )
-                );
-            }, 30, TimeUnit.SECONDS);
-        } finally {
-            configureWriteDeprecationLogsToIndex(null);
-            client().performRequest(new Request("DELETE", "_data_stream/" + DATA_STREAM_NAME));
-        }
+                )
+            );
+        }, 30, TimeUnit.SECONDS);
+
     }
 
     private void configureWriteDeprecationLogsToIndex(Boolean value) throws IOException {
@@ -710,6 +657,38 @@ public class DeprecationHttpIT extends ESRestTestCase {
         assertOK(response);
     }
 
+    private List<Map<String, Object>> getIndexedDeprecations() throws IOException {
+        Response response;
+        try {
+            client().performRequest(new Request("POST", "/" + DATA_STREAM_NAME + "/_refresh?ignore_unavailable=true"));
+            response = client().performRequest(new Request("GET", "/" + DATA_STREAM_NAME + "/_search"));
+        } catch (Exception e) {
+            // It can take a moment for the index to be created. If it doesn't exist then the client
+            // throws an exception. Translate it into an assertion error so that assertBusy() will
+            // continue trying.
+            throw new AssertionError(e);
+        }
+        assertOK(response);
+
+        ObjectMapper mapper = new ObjectMapper();
+        final JsonNode jsonNode = mapper.readTree(response.getEntity().getContent());
+
+        final int hits = jsonNode.at("/hits/total/value").intValue();
+        assertThat(hits, greaterThan(0));
+
+        List<Map<String, Object>> documents = new ArrayList<>();
+
+        for (int i = 0; i < hits; i++) {
+            final JsonNode hit = jsonNode.at("/hits/hits/" + i + "/_source");
+
+            final Map<String, Object> document = new HashMap<>();
+            hit.fields().forEachRemaining(entry -> document.put(entry.getKey(), entry.getValue().textValue()));
+
+            documents.add(document);
+        }
+        return documents;
+    }
+
     private List<String> getWarningHeaders(Header[] headers) {
         List<String> warnings = new ArrayList<>();
 

+ 27 - 5
x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/Deprecation.java

@@ -40,13 +40,26 @@ import java.util.Collection;
 import java.util.List;
 import java.util.function.Supplier;
 
-import static org.elasticsearch.xpack.deprecation.logging.DeprecationIndexingComponent.WRITE_DEPRECATION_LOGS_TO_INDEX;
 
 /**
  * The plugin class for the Deprecation API
  */
 public class Deprecation extends Plugin implements ActionPlugin {
 
+    public static final Setting<Boolean> WRITE_DEPRECATION_LOGS_TO_INDEX = Setting.boolSetting(
+        "cluster.deprecation_indexing.enabled",
+        false,
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+
+    public static final Setting<Boolean> USE_X_OPAQUE_ID_IN_FILTERING = Setting.boolSetting(
+        "cluster.deprecation_indexing.x_opaque_id_used.enabled",
+        true,
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+
     @Override
     public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
         return List.of(
@@ -84,16 +97,25 @@ public class Deprecation extends Plugin implements ActionPlugin {
         templateRegistry.initialize();
 
         final RateLimitingFilter rateLimitingFilterForIndexing = new RateLimitingFilter();
+        // enable on start.
+        rateLimitingFilterForIndexing.setUseXOpaqueId(USE_X_OPAQUE_ID_IN_FILTERING.getDefault(environment.settings()));
+
+        final DeprecationIndexingComponent component = new DeprecationIndexingComponent(client,
+            environment.settings(),
+            rateLimitingFilterForIndexing,
+            WRITE_DEPRECATION_LOGS_TO_INDEX.getDefault(environment.settings()) //pass the default on startup
+        );
 
-        final DeprecationIndexingComponent component = new DeprecationIndexingComponent(client, environment.settings(),
-            rateLimitingFilterForIndexing);
-        clusterService.addListener(component);
+        clusterService.getClusterSettings().addSettingsUpdateConsumer(USE_X_OPAQUE_ID_IN_FILTERING,
+            rateLimitingFilterForIndexing::setUseXOpaqueId);
+        clusterService.getClusterSettings().addSettingsUpdateConsumer(WRITE_DEPRECATION_LOGS_TO_INDEX,
+            component::enableDeprecationLogIndexing);
 
         return List.of(component, rateLimitingFilterForIndexing);
     }
 
     @Override
     public List<Setting<?>> getSettings() {
-        return List.of(WRITE_DEPRECATION_LOGS_TO_INDEX);
+        return List.of(USE_X_OPAQUE_ID_IN_FILTERING, WRITE_DEPRECATION_LOGS_TO_INDEX);
     }
 }

+ 13 - 26
x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/logging/DeprecationIndexingComponent.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.xpack.deprecation.logging;
 
 import co.elastic.logging.log4j2.EcsLayout;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.core.LoggerContext;
@@ -20,19 +21,15 @@ import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.OriginSettingClient;
-import org.elasticsearch.cluster.ClusterChangedEvent;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.logging.ECSJsonLayout;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.logging.RateLimitingFilter;
-import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.xpack.core.ClientHelper;
 
 import java.util.Arrays;
@@ -44,21 +41,15 @@ import java.util.stream.Collectors;
  * This component manages the construction and lifecycle of the {@link DeprecationIndexingAppender}.
  * It also starts and stops the appender
  */
-public class DeprecationIndexingComponent extends AbstractLifecycleComponent implements ClusterStateListener {
+public class DeprecationIndexingComponent extends AbstractLifecycleComponent {
     private static final Logger logger = LogManager.getLogger(DeprecationIndexingComponent.class);
 
-    public static final Setting<Boolean> WRITE_DEPRECATION_LOGS_TO_INDEX = Setting.boolSetting(
-        "cluster.deprecation_indexing.enabled",
-        false,
-        Setting.Property.NodeScope,
-        Setting.Property.Dynamic
-    );
-
     private final DeprecationIndexingAppender appender;
     private final BulkProcessor processor;
     private final RateLimitingFilter rateLimitingFilterForIndexing;
 
-    public DeprecationIndexingComponent(Client client, Settings settings, RateLimitingFilter rateLimitingFilterForIndexing) {
+    public DeprecationIndexingComponent(Client client, Settings settings, RateLimitingFilter rateLimitingFilterForIndexing,
+                                        boolean enableDeprecationLogIndexingDefault) {
         this.rateLimitingFilterForIndexing = rateLimitingFilterForIndexing;
 
         this.processor = getBulkProcessor(new OriginSettingClient(client, ClientHelper.DEPRECATION_ORIGIN), settings);
@@ -74,6 +65,7 @@ public class DeprecationIndexingComponent extends AbstractLifecycleComponent imp
 
         this.appender = new DeprecationIndexingAppender("deprecation_indexing_appender",
             rateLimitingFilterForIndexing, ecsLayout, consumer);
+        enableDeprecationLogIndexing(enableDeprecationLogIndexingDefault);
     }
 
     @Override
@@ -93,24 +85,19 @@ public class DeprecationIndexingComponent extends AbstractLifecycleComponent imp
         this.processor.close();
     }
 
-    /**
-     * Listens for changes to the cluster state, in order to know whether to toggle indexing
-     * and to set the cluster UUID and node ID. These can't be set in the constructor because
-     * the initial cluster state won't be set yet.
-     *
-     * @param event the cluster state event to process
-     */
-    @Override
-    public void clusterChanged(ClusterChangedEvent event) {
-        final ClusterState state = event.state();
-        final boolean newEnabled = WRITE_DEPRECATION_LOGS_TO_INDEX.get(state.getMetadata().settings());
+
+    public void enableDeprecationLogIndexing(boolean newEnabled) {
         if (appender.isEnabled() != newEnabled) {
+            appender.setEnabled(newEnabled);
+
             // We've flipped from disabled to enabled. Make sure we start with a clean cache of
             // previously-seen keys, otherwise we won't index anything.
             if (newEnabled) {
                 this.rateLimitingFilterForIndexing.reset();
+            } else {
+                // we have flipped from enabled to disabled. A processor could have accumulated some requests, so we have to flush it
+                this.processor.flush();
             }
-            appender.setEnabled(newEnabled);
         }
     }