Explorar el Código

Add counters for downsampling (#107389)

This PR adds counters for downsampling: success, failure and failure due
to invalid configuration. They will be used in TSDB dashboard to assess
health of downsampling functionality.
Oleksandr Kolomiiets hace 1 año
padre
commit
635824e186

+ 8 - 2
x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleMetrics.java

@@ -31,6 +31,8 @@ public class DownsampleMetrics extends AbstractLifecycleComponent {
 
     public static final String LATENCY_SHARD = "es.tsdb.downsample.latency.shard.histogram";
     public static final String LATENCY_TOTAL = "es.tsdb.downsample.latency.total.histogram";
+    public static final String ACTIONS_SHARD = "es.tsdb.downsample.actions.shard.total";
+    public static final String ACTIONS = "es.tsdb.downsample.actions.total";
 
     private final MeterRegistry meterRegistry;
 
@@ -43,6 +45,8 @@ public class DownsampleMetrics extends AbstractLifecycleComponent {
         // Register all metrics to track.
         meterRegistry.registerLongHistogram(LATENCY_SHARD, "Downsampling action latency per shard", "ms");
         meterRegistry.registerLongHistogram(LATENCY_TOTAL, "Downsampling latency end-to-end", "ms");
+        meterRegistry.registerLongCounter(ACTIONS_SHARD, "Number of shard-level downsampling actions", "count");
+        meterRegistry.registerLongCounter(ACTIONS, "Number of downsampling operations", "count");
     }
 
     @Override
@@ -71,11 +75,13 @@ public class DownsampleMetrics extends AbstractLifecycleComponent {
         }
     }
 
-    void recordLatencyShard(long durationInMilliSeconds, ActionStatus status) {
+    void recordShardOperation(long durationInMilliSeconds, ActionStatus status) {
         meterRegistry.getLongHistogram(LATENCY_SHARD).record(durationInMilliSeconds, Map.of(ActionStatus.NAME, status.getMessage()));
+        meterRegistry.getLongCounter(ACTIONS_SHARD).incrementBy(1L, Map.of(ActionStatus.NAME, status.getMessage()));
     }
 
-    void recordLatencyTotal(long durationInMilliSeconds, ActionStatus status) {
+    void recordOperation(long durationInMilliSeconds, ActionStatus status) {
         meterRegistry.getLongHistogram(LATENCY_TOTAL).record(durationInMilliSeconds, Map.of(ActionStatus.NAME, status.getMessage()));
+        meterRegistry.getLongCounter(ACTIONS).incrementBy(1L, Map.of(ActionStatus.NAME, status.getMessage()));
     }
 }

+ 3 - 3
x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java

@@ -191,7 +191,7 @@ class DownsampleShardIndexer {
                 + task.getNumSent()
                 + "]";
             logger.info(error);
-            downsampleMetrics.recordLatencyShard(duration.millis(), DownsampleMetrics.ActionStatus.MISSING_DOCS);
+            downsampleMetrics.recordShardOperation(duration.millis(), DownsampleMetrics.ActionStatus.MISSING_DOCS);
             throw new DownsampleShardIndexerException(error, false);
         }
 
@@ -204,7 +204,7 @@ class DownsampleShardIndexer {
                 + task.getNumFailed()
                 + "]";
             logger.info(error);
-            downsampleMetrics.recordLatencyShard(duration.millis(), DownsampleMetrics.ActionStatus.FAILED);
+            downsampleMetrics.recordShardOperation(duration.millis(), DownsampleMetrics.ActionStatus.FAILED);
             throw new DownsampleShardIndexerException(error, false);
         }
 
@@ -214,7 +214,7 @@ class DownsampleShardIndexer {
             ActionListener.noop()
         );
         logger.info("Downsampling task [" + task.getPersistentTaskId() + " on shard " + indexShard.shardId() + " completed");
-        downsampleMetrics.recordLatencyShard(duration.millis(), DownsampleMetrics.ActionStatus.SUCCESS);
+        downsampleMetrics.recordShardOperation(duration.millis(), DownsampleMetrics.ActionStatus.SUCCESS);
         return new DownsampleIndexerAction.ShardDownsampleResponse(indexShard.shardId(), task.getNumIndexed());
     }
 

+ 22 - 22
x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java

@@ -179,20 +179,20 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
         this.downsampleMetrics = downsampleMetrics;
     }
 
-    private void recordLatencyOnSuccess(long startTime) {
-        recordLatency(startTime, DownsampleMetrics.ActionStatus.SUCCESS);
+    private void recordSuccessMetrics(long startTime) {
+        recordOperation(startTime, DownsampleMetrics.ActionStatus.SUCCESS);
     }
 
-    private void recordLatencyOnFailure(long startTime) {
-        recordLatency(startTime, DownsampleMetrics.ActionStatus.FAILED);
+    private void recordFailureMetrics(long startTime) {
+        recordOperation(startTime, DownsampleMetrics.ActionStatus.FAILED);
     }
 
-    private void recordLatencyOnInvalidConfiguration(long startTime) {
-        recordLatency(startTime, DownsampleMetrics.ActionStatus.INVALID_CONFIGURATION);
+    private void recordInvalidConfigurationMetrics(long startTime) {
+        recordOperation(startTime, DownsampleMetrics.ActionStatus.INVALID_CONFIGURATION);
     }
 
-    private void recordLatency(long startTime, DownsampleMetrics.ActionStatus status) {
-        downsampleMetrics.recordLatencyTotal(
+    private void recordOperation(long startTime, DownsampleMetrics.ActionStatus status) {
+        downsampleMetrics.recordOperation(
             TimeValue.timeValueMillis(client.threadPool().relativeTimeInMillis() - startTime).getMillis(),
             status
         );
@@ -215,7 +215,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
                 boolean hasDocumentLevelPermissions = indexPermissions.getDocumentPermissions().hasDocumentLevelPermissions();
                 boolean hasFieldLevelSecurity = indexPermissions.getFieldPermissions().hasFieldLevelSecurity();
                 if (hasDocumentLevelPermissions || hasFieldLevelSecurity) {
-                    recordLatencyOnInvalidConfiguration(startTime);
+                    recordInvalidConfigurationMetrics(startTime);
                     listener.onFailure(
                         new ElasticsearchException(
                             "Rollup forbidden for index [" + sourceIndexName + "] with document level or field level security settings."
@@ -228,14 +228,14 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
         // Assert source index exists
         IndexMetadata sourceIndexMetadata = state.getMetadata().index(sourceIndexName);
         if (sourceIndexMetadata == null) {
-            recordLatencyOnInvalidConfiguration(startTime);
+            recordInvalidConfigurationMetrics(startTime);
             listener.onFailure(new IndexNotFoundException(sourceIndexName));
             return;
         }
 
         // Assert source index is a time_series index
         if (IndexSettings.MODE.get(sourceIndexMetadata.getSettings()) != IndexMode.TIME_SERIES) {
-            recordLatencyOnInvalidConfiguration(startTime);
+            recordInvalidConfigurationMetrics(startTime);
             listener.onFailure(
                 new ElasticsearchException(
                     "Rollup requires setting ["
@@ -252,7 +252,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
 
         // Assert source index is read-only
         if (state.blocks().indexBlocked(ClusterBlockLevel.WRITE, sourceIndexName) == false) {
-            recordLatencyOnInvalidConfiguration(startTime);
+            recordInvalidConfigurationMetrics(startTime);
             listener.onFailure(
                 new ElasticsearchException(
                     "Downsample requires setting [" + IndexMetadata.SETTING_BLOCKS_WRITE + " = true] for index [" + sourceIndexName + "]"
@@ -327,7 +327,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
             }
 
             if (validationException.validationErrors().isEmpty() == false) {
-                recordLatencyOnInvalidConfiguration(startTime);
+                recordInvalidConfigurationMetrics(startTime);
                 delegate.onFailure(validationException);
                 return;
             }
@@ -336,7 +336,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
             try {
                 mapping = createDownsampleIndexMapping(helper, request.getDownsampleConfig(), mapperService, sourceIndexMappings);
             } catch (IOException e) {
-                recordLatencyOnFailure(startTime);
+                recordFailureMetrics(startTime);
                 delegate.onFailure(e);
                 return;
             }
@@ -361,7 +361,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
                             dimensionFields
                         );
                     } else {
-                        recordLatencyOnFailure(startTime);
+                        recordFailureMetrics(startTime);
                         delegate.onFailure(new ElasticsearchException("Failed to create downsample index [" + downsampleIndexName + "]"));
                     }
                 }, e -> {
@@ -390,7 +390,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
                             dimensionFields
                         );
                     } else {
-                        recordLatencyOnFailure(startTime);
+                        recordFailureMetrics(startTime);
                         delegate.onFailure(e);
                     }
                 })
@@ -516,7 +516,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
                 public void onFailure(Exception e) {
                     logger.error("error while waiting for downsampling persistent task", e);
                     if (errorReported.getAndSet(true) == false) {
-                        recordLatencyOnFailure(startTime);
+                        recordFailureMetrics(startTime);
                     }
                     listener.onFailure(e);
                 }
@@ -958,7 +958,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
 
         @Override
         public void onFailure(Exception e) {
-            recordLatencyOnSuccess(startTime);  // Downsampling has already completed in all shards.
+            recordSuccessMetrics(startTime);  // Downsampling has already completed in all shards.
             listener.onFailure(e);
         }
 
@@ -1026,7 +1026,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
 
         @Override
         public void onFailure(Exception e) {
-            recordLatencyOnSuccess(startTime);  // Downsampling has already completed in all shards.
+            recordSuccessMetrics(startTime);  // Downsampling has already completed in all shards.
             actionListener.onFailure(e);
         }
 
@@ -1061,7 +1061,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
             request.setParentTask(parentTask);
             client.admin().indices().forceMerge(request, ActionListener.wrap(mergeIndexResp -> {
                 actionListener.onResponse(AcknowledgedResponse.TRUE);
-                recordLatencyOnSuccess(startTime);
+                recordSuccessMetrics(startTime);
             }, t -> {
                 /*
                  * At this point downsample index has been created
@@ -1070,13 +1070,13 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
                  */
                 logger.error("Failed to force-merge downsample index [" + downsampleIndexName + "]", t);
                 actionListener.onResponse(AcknowledgedResponse.TRUE);
-                recordLatencyOnSuccess(startTime);
+                recordSuccessMetrics(startTime);
             }));
         }
 
         @Override
         public void onFailure(Exception e) {
-            recordLatencyOnSuccess(startTime);
+            recordSuccessMetrics(startTime);
             this.actionListener.onFailure(e);
         }
 

+ 14 - 1
x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java

@@ -125,6 +125,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitC
 import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.mockito.Mockito.mock;
 
 public class DownsampleActionSingleNodeTests extends ESSingleNodeTestCase {
@@ -1178,8 +1179,12 @@ public class DownsampleActionSingleNodeTests extends ESSingleNodeTestCase {
             assertEquals(1, measurement.attributes().size());
             assertThat(measurement.attributes().get("status"), Matchers.in(List.of("success", "failed", "missing_docs")));
         }
+        List<Measurement> shardActionMeasurements = plugin.getLongCounterMeasurement(DownsampleMetrics.ACTIONS_SHARD);
+        assertThat(shardActionMeasurements.size(), greaterThanOrEqualTo(1));
+        assertThat(shardActionMeasurements.get(0).getLong(), equalTo(1L));
+        assertThat(shardActionMeasurements.get(0).attributes().get("status"), Matchers.in(List.of("success", "failed", "missing_docs")));
 
-        // Total latency gets recorded after reindex and force-merge complete.
+        // Total latency and counters are recorded after reindex and force-merge complete.
         assertBusy(() -> {
             final List<Measurement> latencyTotalMetrics = plugin.getLongHistogramMeasurement(DownsampleMetrics.LATENCY_TOTAL);
             assertFalse(latencyTotalMetrics.isEmpty());
@@ -1191,6 +1196,14 @@ public class DownsampleActionSingleNodeTests extends ESSingleNodeTestCase {
                 assertEquals(1, measurement.attributes().size());
                 assertThat(measurement.attributes().get("status"), Matchers.in(List.of("success", "invalid_configuration", "failed")));
             }
+
+            List<Measurement> actionMeasurements = plugin.getLongCounterMeasurement(DownsampleMetrics.ACTIONS);
+            assertThat(actionMeasurements.size(), greaterThanOrEqualTo(1));
+            assertThat(actionMeasurements.get(0).getLong(), equalTo(1L));
+            assertThat(
+                actionMeasurements.get(0).attributes().get("status"),
+                Matchers.in(List.of("success", "invalid_configuration", "failed"))
+            );
         }, 10, TimeUnit.SECONDS);
     }