Browse Source

[Monitoring] LocalExporter should catch synchronous exceptions (#36606)

In the unlikely scenario that `LocalExporter::resolveBulk` throws
an exception, then we should mark the `listener` as having failed.
Chris Earle 6 years ago
parent
commit
42d76a7e86

+ 6 - 3
x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporter.java

@@ -71,7 +71,6 @@ import java.util.stream.Collectors;
 import static org.elasticsearch.common.Strings.collectionToCommaDelimitedString;
 import static org.elasticsearch.xpack.core.ClientHelper.MONITORING_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
-import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
 import static org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils.LAST_UPDATED_VERSION;
 import static org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils.PIPELINE_IDS;
 import static org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils.TEMPLATE_VERSION;
@@ -145,7 +144,11 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
         if (state.get() != State.RUNNING) {
             listener.onResponse(null);
         } else {
-            listener.onResponse(resolveBulk(clusterService.state(), false));
+            try {
+                listener.onResponse(resolveBulk(clusterService.state(), false));
+            } catch (Exception e) {
+                listener.onFailure(e);
+            }
         }
     }
 
@@ -314,7 +317,7 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
         if (asyncActions.size() > 0) {
             if (installingSomething.compareAndSet(false, true)) {
                 pendingResponses.set(asyncActions.size());
-                try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), MONITORING_ORIGIN)) {
+                try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(MONITORING_ORIGIN)) {
                     asyncActions.forEach(Runnable::run);
                 }
             } else {