Browse Source

Log bulk errors when indexing deprecation logs (#62159)

The bulk processor used to index deprecation logs was doing nothing to
output any errors encountered when writing documents to ES. These are
now detected.

Also expand the deprecation REST tests to check the shape of the indexed
documents.
Rory Hunter 5 years ago
parent
commit
607b7c3659

+ 41 - 4
x-pack/plugin/deprecation/qa/rest/src/javaRestTest/java/org/elasticsearch/xpack/deprecation/DeprecationHttpIT.java

@@ -13,6 +13,7 @@ import org.apache.http.HttpHost;
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.StringEntity;
 import org.apache.http.entity.StringEntity;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.client.RestClientBuilder;
@@ -35,12 +36,14 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
 
 
 import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
 import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
+import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.hasSize;
 
 
 /**
 /**
@@ -224,8 +227,12 @@ public class DeprecationHttpIT extends ESRestTestCase {
         try {
         try {
             configureWriteDeprecationLogsToIndex(true);
             configureWriteDeprecationLogsToIndex(true);
 
 
-            Request request = new Request("GET", "/_test_cluster/deprecated_settings");
-            request.setEntity(buildSettingsRequest(List.of(TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE1), true));
+            final Request request = new Request("GET", "/_test_cluster/deprecated_settings");
+            final RequestOptions options = request.getOptions().toBuilder().addHeader("X-Opaque-Id", "some xid").build();
+            request.setOptions(options);
+            request.setEntity(
+                buildSettingsRequest(Collections.singletonList(TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE1), true)
+            );
             assertOK(client().performRequest(request));
             assertOK(client().performRequest(request));
 
 
             assertBusy(() -> {
             assertBusy(() -> {
@@ -263,8 +270,38 @@ public class DeprecationHttpIT extends ESRestTestCase {
                 assertThat(
                 assertThat(
                     documents,
                     documents,
                     hasItems(
                     hasItems(
-                        hasEntry("message", "[deprecated_settings] usage is deprecated. use [settings] instead"),
-                        hasEntry("message", "[/_test_cluster/deprecated_settings] exists for deprecated tests")
+                        allOf(
+                            hasKey("@timestamp"),
+                            hasKey("cluster.name"),
+                            hasKey("cluster.uuid"),
+                            hasKey("log.logger"),
+                            hasEntry("data_stream.datatype", "deprecation"),
+                            hasEntry("data_stream.namespace", "elasticsearch"),
+                            hasEntry("data_stream.type", "logs"),
+                            hasEntry("ecs.version", "1.6"),
+                            hasEntry("key", "deprecated_settings"),
+                            hasEntry("log.level", "DEPRECATION"),
+                            hasEntry("message", "[deprecated_settings] usage is deprecated. use [settings] instead"),
+                            hasKey("node.id"),
+                            hasKey("node.name"),
+                            hasEntry("x-opaque-id", "some xid")
+                        ),
+                        allOf(
+                            hasKey("@timestamp"),
+                            hasKey("cluster.name"),
+                            hasKey("cluster.uuid"),
+                            hasKey("log.logger"),
+                            hasEntry("data_stream.datatype", "deprecation"),
+                            hasEntry("data_stream.namespace", "elasticsearch"),
+                            hasEntry("data_stream.type", "logs"),
+                            hasEntry("ecs.version", "1.6"),
+                            hasEntry("key", "deprecated_route"),
+                            hasEntry("log.level", "DEPRECATION"),
+                            hasEntry("message", "[/_test_cluster/deprecated_settings] exists for deprecated tests"),
+                            hasKey("node.id"),
+                            hasKey("node.name"),
+                            hasEntry("x-opaque-id", "some xid")
+                        )
                     )
                     )
                 );
                 );
             });
             });

+ 24 - 3
x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/logging/DeprecationIndexingComponent.java

@@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.core.LoggerContext;
 import org.apache.logging.log4j.core.LoggerContext;
 import org.apache.logging.log4j.core.config.Configuration;
 import org.apache.logging.log4j.core.config.Configuration;
 import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.bulk.BulkResponse;
@@ -33,7 +34,10 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.ClientHelper;
 
 
+import java.util.Arrays;
+import java.util.Map;
 import java.util.function.Consumer;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 
 /**
 /**
  * This component manages the construction and lifecycle of the {@link DeprecationIndexingAppender}.
  * This component manages the construction and lifecycle of the {@link DeprecationIndexingAppender}.
@@ -106,7 +110,8 @@ public class DeprecationIndexingComponent extends AbstractLifecycleComponent imp
 
 
     /**
     /**
      * Constructs a bulk processor for writing documents
      * Constructs a bulk processor for writing documents
-     * @param client the client to use
+     *
+     * @param client   the client to use
      * @param settings the settings to use
      * @param settings the settings to use
      * @return an initialised bulk processor
      * @return an initialised bulk processor
      */
      */
@@ -131,11 +136,27 @@ public class DeprecationIndexingComponent extends AbstractLifecycleComponent imp
         public void beforeBulk(long executionId, BulkRequest request) {}
         public void beforeBulk(long executionId, BulkRequest request) {}
 
 
         @Override
         @Override
-        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {}
+        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+            long numberOfActions = request.numberOfActions();
+            if (logger.isTraceEnabled()) {
+                logger.trace(
+                    "indexed [{}] deprecation documents into [{}]",
+                    numberOfActions,
+                    Arrays.stream(response.getItems()).map(BulkItemResponse::getIndex).distinct().collect(Collectors.joining(","))
+                );
+            }
+
+            if (response.hasFailures()) {
+                Map<String, String> failures = Arrays.stream(response.getItems())
+                    .filter(BulkItemResponse::isFailed)
+                    .collect(Collectors.toMap(BulkItemResponse::getId, BulkItemResponse::getFailureMessage));
+                logger.error("Bulk write of deprecation logs encountered some failures: [{}]", failures);
+            }
+        }
 
 
         @Override
         @Override
         public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
         public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-            logger.error("Bulk write of deprecation logs failed: " + failure.getMessage(), failure);
+            logger.error("Bulk write of " + request.numberOfActions() + " deprecation logs failed: " + failure.getMessage(), failure);
         }
         }
     }
     }
 }
 }