Pārlūkot izejas kodu

Expose basic x-pack telemetry for failure store (#108899)

Mary Gouseti 1 gadu atpakaļ
vecāks
revīzija
af45653e00

+ 2 - 0
docs/reference/rest-api/usage.asciidoc

@@ -504,6 +504,7 @@ GET /_xpack/usage
 // TESTRESPONSE[s/"policy_stats" : \[[^\]]*\]/"policy_stats" : $body.$_path/]
 // TESTRESPONSE[s/"slm" : \{[^\}]*\},/"slm" : $body.$_path,/]
 // TESTRESPONSE[s/"health_api" : \{[^\}]*\}\s*\}/"health_api" : $body.$_path/]
+// TESTRESPONSE[s/"data_streams" : \{[^\}]*\},/"data_streams" : $body.$_path,/]
 // TESTRESPONSE[s/ : true/ : $body.$_path/]
 // TESTRESPONSE[s/ : false/ : $body.$_path/]
 // TESTRESPONSE[s/ : (\-)?[0-9]+/ : $body.$_path/]
@@ -519,3 +520,4 @@ GET /_xpack/usage
 // 5. All of the numbers and strings on the right hand side of *every* field in
 //    the response are ignored. So we're really only asserting things about the
 //    the shape of this response, not the values in it.
+// 6. Ignore the contents of data streams until the failure store is tech preview.

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -176,6 +176,7 @@ public class TransportVersions {
     public static final TransportVersion SPARSE_VECTOR_QUERY_ADDED = def(8_667_00_0);
     public static final TransportVersion ESQL_ADD_INDEX_MODE_TO_SOURCE = def(8_668_00_0);
     public static final TransportVersion GET_SHUTDOWN_STATUS_TIMEOUT = def(8_669_00_0);
+    public static final TransportVersion FAILURE_STORE_TELEMETRY = def(8_670_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 2 - 0
x-pack/plugin/core/build.gradle

@@ -1,5 +1,6 @@
 import org.apache.tools.ant.filters.ReplaceTokens
 import org.elasticsearch.gradle.internal.info.BuildParams
+import org.elasticsearch.gradle.Version
 
 import java.nio.file.Paths
 
@@ -172,6 +173,7 @@ testClusters.configureEach {
   setting 'indices.lifecycle.history_index_enabled', 'false'
   keystore 'bootstrap.password', 'x-pack-test-password'
   user username: "x_pack_rest_user", password: "x-pack-test-password"
+  requiresFeature 'es.failure_store_feature_flag_enabled', Version.fromString("8.15.0")
 }
 
 if (BuildParams.inFipsJvm) {

+ 31 - 7
x-pack/plugin/core/src/javaRestTest/java/org/elasticsearch/xpack/core/DataStreamRestIT.java

@@ -20,8 +20,8 @@ import org.elasticsearch.xcontent.json.JsonXContent;
 import java.util.List;
 import java.util.Map;
 
-import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.notNullValue;
 
 public class DataStreamRestIT extends ESRestTestCase {
@@ -42,19 +42,24 @@ public class DataStreamRestIT extends ESRestTestCase {
         assertTrue((boolean) dataStreams.get("enabled"));
     }
 
+    @SuppressWarnings("unchecked")
     public void testDSXpackUsage() throws Exception {
         Map<?, ?> dataStreams = (Map<?, ?>) getLocation("/_xpack/usage").get("data_streams");
         assertNotNull(dataStreams);
         assertTrue((boolean) dataStreams.get("available"));
         assertTrue((boolean) dataStreams.get("enabled"));
-        assertThat(dataStreams.get("data_streams"), anyOf(equalTo(null), equalTo(0)));
-
+        assertThat(dataStreams.get("data_streams"), equalTo(0));
+        assertThat(dataStreams, hasKey("failure_store"));
+        Map<String, Integer> failureStoreStats = (Map<String, Integer>) dataStreams.get("failure_store");
+        assertThat(failureStoreStats.get("enabled_count"), equalTo(0));
+        assertThat(failureStoreStats.get("failure_indices_count"), equalTo(0));
         assertBusy(() -> {
             Map<?, ?> logsTemplate = (Map<?, ?>) ((List<?>) getLocation("/_index_template/logs").get("index_templates")).get(0);
             assertThat(logsTemplate, notNullValue());
             assertThat(logsTemplate.get("name"), equalTo("logs"));
             assertThat(((Map<?, ?>) logsTemplate.get("index_template")).get("data_stream"), notNullValue());
         });
+        putFailureStoreTemplate();
 
         // Create a data stream
         Request indexRequest = new Request("POST", "/logs-mysql-default/_doc");
@@ -65,21 +70,29 @@ public class DataStreamRestIT extends ESRestTestCase {
         Request rollover = new Request("POST", "/logs-mysql-default/_rollover");
         client().performRequest(rollover);
 
+        // Create failure store data stream
+        indexRequest = new Request("POST", "/fs/_doc");
+        indexRequest.setJsonEntity("{\"@timestamp\": \"2020-01-01\"}");
+        client().performRequest(indexRequest);
+
         dataStreams = (Map<?, ?>) getLocation("/_xpack/usage").get("data_streams");
         assertNotNull(dataStreams);
         assertTrue((boolean) dataStreams.get("available"));
         assertTrue((boolean) dataStreams.get("enabled"));
-        assertThat("got: " + dataStreams, dataStreams.get("data_streams"), equalTo(1));
-        assertThat("got: " + dataStreams, dataStreams.get("indices_count"), equalTo(2));
+        assertThat("got: " + dataStreams, dataStreams.get("data_streams"), equalTo(2));
+        assertThat("got: " + dataStreams, dataStreams.get("indices_count"), equalTo(3));
+        failureStoreStats = (Map<String, Integer>) dataStreams.get("failure_store");
+        assertThat(failureStoreStats.get("enabled_count"), equalTo(1));
+        assertThat(failureStoreStats.get("failure_indices_count"), equalTo(1));
     }
 
     Map<String, Object> getLocation(String path) {
         try {
-            Response executeRepsonse = client().performRequest(new Request("GET", path));
+            Response executeResponse = client().performRequest(new Request("GET", path));
             try (
                 XContentParser parser = JsonXContent.jsonXContent.createParser(
                     XContentParserConfiguration.EMPTY,
-                    EntityUtils.toByteArray(executeRepsonse.getEntity())
+                    EntityUtils.toByteArray(executeResponse.getEntity())
                 )
             ) {
                 return parser.map();
@@ -89,4 +102,15 @@ public class DataStreamRestIT extends ESRestTestCase {
             throw new RuntimeException(e);
         }
     }
+
+    private void putFailureStoreTemplate() {
+        try {
+            Request request = new Request("PUT", "/_index_template/fs-template");
+            request.setJsonEntity("{\"index_patterns\": [\"fs*\"], \"data_stream\": {\"failure_store\": true}}");
+            assertAcknowledged(client().performRequest(request));
+        } catch (Exception e) {
+            fail("failed to insert index template with failure store enabled - got: " + e);
+            throw new RuntimeException(e);
+        }
+    }
 }

+ 17 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/DataStreamUsageTransportAction.java

@@ -50,9 +50,25 @@ public class DataStreamUsageTransportAction extends XPackUsageFeatureTransportAc
         ActionListener<XPackUsageFeatureResponse> listener
     ) {
         final Map<String, DataStream> dataStreams = state.metadata().dataStreams();
+        long backingIndicesCounter = 0;
+        long failureStoreEnabledCounter = 0;
+        long failureIndicesCounter = 0;
+        for (DataStream ds : dataStreams.values()) {
+            backingIndicesCounter += ds.getIndices().size();
+            if (DataStream.isFailureStoreFeatureFlagEnabled()) {
+                if (ds.isFailureStoreEnabled()) {
+                    failureStoreEnabledCounter++;
+                }
+                if (ds.getFailureIndices().getIndices().isEmpty() == false) {
+                    failureIndicesCounter += ds.getFailureIndices().getIndices().size();
+                }
+            }
+        }
         final DataStreamFeatureSetUsage.DataStreamStats stats = new DataStreamFeatureSetUsage.DataStreamStats(
             dataStreams.size(),
-            dataStreams.values().stream().map(ds -> ds.getIndices().size()).reduce(Integer::sum).orElse(0)
+            backingIndicesCounter,
+            failureStoreEnabledCounter,
+            failureIndicesCounter
         );
         final DataStreamFeatureSetUsage usage = new DataStreamFeatureSetUsage(stats);
         listener.onResponse(new XPackUsageFeatureResponse(usage));

+ 22 - 24
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datastreams/DataStreamFeatureSetUsage.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.core.datastreams;
 
 import org.elasticsearch.TransportVersion;
 import org.elasticsearch.TransportVersions;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -49,6 +50,12 @@ public class DataStreamFeatureSetUsage extends XPackFeatureSet.Usage {
         super.innerXContent(builder, params);
         builder.field("data_streams", streamStats.totalDataStreamCount);
         builder.field("indices_count", streamStats.indicesBehindDataStream);
+        if (DataStream.isFailureStoreFeatureFlagEnabled()) {
+            builder.startObject("failure_store");
+            builder.field("enabled_count", streamStats.failureStoreEnabledDataStreamCount);
+            builder.field("failure_indices_count", streamStats.failureStoreIndicesCount);
+            builder.endObject();
+        }
     }
 
     @Override
@@ -73,39 +80,30 @@ public class DataStreamFeatureSetUsage extends XPackFeatureSet.Usage {
         return Objects.equals(streamStats, other.streamStats);
     }
 
-    public static class DataStreamStats implements Writeable {
-
-        private final long totalDataStreamCount;
-        private final long indicesBehindDataStream;
-
-        public DataStreamStats(long totalDataStreamCount, long indicesBehindDataStream) {
-            this.totalDataStreamCount = totalDataStreamCount;
-            this.indicesBehindDataStream = indicesBehindDataStream;
-        }
+    public record DataStreamStats(
+        long totalDataStreamCount,
+        long indicesBehindDataStream,
+        long failureStoreEnabledDataStreamCount,
+        long failureStoreIndicesCount
+    ) implements Writeable {
 
         public DataStreamStats(StreamInput in) throws IOException {
-            this.totalDataStreamCount = in.readVLong();
-            this.indicesBehindDataStream = in.readVLong();
+            this(
+                in.readVLong(),
+                in.readVLong(),
+                in.getTransportVersion().onOrAfter(TransportVersions.FAILURE_STORE_TELEMETRY) ? in.readVLong() : 0,
+                in.getTransportVersion().onOrAfter(TransportVersions.FAILURE_STORE_TELEMETRY) ? in.readVLong() : 0
+            );
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeVLong(this.totalDataStreamCount);
             out.writeVLong(this.indicesBehindDataStream);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(totalDataStreamCount, indicesBehindDataStream);
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (obj.getClass() != getClass()) {
-                return false;
+            if (out.getTransportVersion().onOrAfter(TransportVersions.FAILURE_STORE_TELEMETRY)) {
+                out.writeVLong(this.failureStoreEnabledDataStreamCount);
+                out.writeVLong(this.failureStoreIndicesCount);
             }
-            DataStreamStats other = (DataStreamStats) obj;
-            return totalDataStreamCount == other.totalDataStreamCount && indicesBehindDataStream == other.indicesBehindDataStream;
         }
     }
 }

+ 6 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/searchablesnapshots/DataStreamFeatureSetUsageTests.java

@@ -16,7 +16,12 @@ public class DataStreamFeatureSetUsageTests extends AbstractWireSerializingTestC
     @Override
     protected DataStreamFeatureSetUsage createTestInstance() {
         return new DataStreamFeatureSetUsage(
-            new DataStreamFeatureSetUsage.DataStreamStats(randomNonNegativeLong(), randomNonNegativeLong())
+            new DataStreamFeatureSetUsage.DataStreamStats(
+                randomNonNegativeLong(),
+                randomNonNegativeLong(),
+                randomNonNegativeLong(),
+                randomNonNegativeLong()
+            )
         );
     }