Browse Source

Fix Watcher stats class cast exception (#39821)

The watcher stats implementation tries to look at all queued watches
before preparing the result. We want to cast these to a
WatchExecutionTask to extract the context to prepare the stats for
queued watches. The problem is that not all tasks on the watcher queue
were WatchExecutionTask. This is because a manually executed watch was
not even at all wrapped in a WatchExecutionTask. Moreover, we were using
ExecutorService#submit(Runnable) which would wrap the Runnable in a
FutureTask<?>. This commit addresses this by using a WatchExecutionTask,
and also using ExecutorService#execute(Runnable) so that no wrapping
occurs. This will let us continue with the assumption that all queued
tasks are WatchExecutionTasks.
Jason Tedor 6 years ago
parent
commit
1095e10b95

+ 2 - 2
x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java

@@ -534,12 +534,12 @@ public class ExecutionService {
     // the watch execution task takes another runnable as parameter
     // the best solution would be to move the whole execute() method, which is handed over as ctor parameter
     // over into this class, this is the quicker way though
-    static final class WatchExecutionTask implements Runnable {
+    public static final class WatchExecutionTask implements Runnable {
 
         private final WatchExecutionContext ctx;
         private final Runnable runnable;
 
-        WatchExecutionTask(WatchExecutionContext ctx, Runnable runnable) {
+        public WatchExecutionTask(WatchExecutionContext ctx, Runnable runnable) {
             this.ctx = ctx;
             this.runnable = runnable;
         }

+ 51 - 35
x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/execute/TransportExecuteWatchAction.java

@@ -25,6 +25,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.XPackField;
 import org.elasticsearch.xpack.core.watcher.execution.ActionExecutionMode;
+import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext;
 import org.elasticsearch.xpack.core.watcher.history.WatchRecord;
 import org.elasticsearch.xpack.core.watcher.support.xcontent.WatcherParams;
 import org.elasticsearch.xpack.core.watcher.transport.actions.execute.ExecuteWatchAction;
@@ -110,48 +111,63 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
         }
     }
 
-    private void executeWatch(ExecuteWatchRequest request, ActionListener<ExecuteWatchResponse> listener,
-                              Watch watch, boolean knownWatch) {
-
-        threadPool.executor(XPackField.WATCHER).submit(new AbstractRunnable() {
-            @Override
-            public void onFailure(Exception e) {
-                listener.onFailure(e);
+    private void executeWatch(
+            final ExecuteWatchRequest request,
+            final ActionListener<ExecuteWatchResponse> listener,
+            final Watch watch,
+            final boolean knownWatch) {
+        try {
+            /*
+             * Ensure that the headers from the incoming request are used instead those of the stored watch otherwise the watch would run
+             * as the user who stored the watch, but it needs to run as the user who executes this request.
+             */
+            final Map<String, String> headers = new HashMap<>(threadPool.getThreadContext().getHeaders());
+            watch.status().setHeaders(headers);
+
+            final String triggerType = watch.trigger().type();
+            final TriggerEvent triggerEvent = triggerService.simulateEvent(triggerType, watch.id(), request.getTriggerData());
+
+            final ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(
+                    watch,
+                    knownWatch,
+                    new ManualTriggerEvent(triggerEvent.jobName(), triggerEvent), executionService.defaultThrottlePeriod());
+
+            final ZonedDateTime executionTime = clock.instant().atZone(ZoneOffset.UTC);
+            ctxBuilder.executionTime(executionTime);
+            for (final Map.Entry<String, ActionExecutionMode> entry : request.getActionModes().entrySet()) {
+                ctxBuilder.actionMode(entry.getKey(), entry.getValue());
+            }
+            if (request.getAlternativeInput() != null) {
+                ctxBuilder.withInput(new SimpleInput.Result(new Payload.Simple(request.getAlternativeInput())));
             }
+            if (request.isIgnoreCondition()) {
+                ctxBuilder.withCondition(InternalAlwaysCondition.RESULT_INSTANCE);
+            }
+            ctxBuilder.recordExecution(request.isRecordExecution());
+            final WatchExecutionContext ctx = ctxBuilder.build();
 
-            @Override
-            protected void doRun() throws Exception {
-                // ensure that the headers from the incoming request are used instead those of the stored watch
-                // otherwise the watch would run as the user who stored the watch, but it needs to be run as the user who
-                // executes this request
-                Map<String, String> headers = new HashMap<>(threadPool.getThreadContext().getHeaders());
-                watch.status().setHeaders(headers);
+            // use execute so that the runnable is not wrapped in a RunnableFuture<?>
+            threadPool.executor(XPackField.WATCHER).execute(new ExecutionService.WatchExecutionTask(ctx, new AbstractRunnable() {
 
-                String triggerType = watch.trigger().type();
-                TriggerEvent triggerEvent = triggerService.simulateEvent(triggerType, watch.id(), request.getTriggerData());
+                @Override
+                public void onFailure(final Exception e) {
+                    listener.onFailure(e);
+                }
 
-                ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watch, knownWatch,
-                        new ManualTriggerEvent(triggerEvent.jobName(), triggerEvent), executionService.defaultThrottlePeriod());
+                @Override
+                protected void doRun() throws Exception {
+                    final WatchRecord record = executionService.execute(ctx);
+                    final XContentBuilder builder = XContentFactory.jsonBuilder();
 
-                ZonedDateTime executionTime = clock.instant().atZone(ZoneOffset.UTC);
-                ctxBuilder.executionTime(executionTime);
-                for (Map.Entry<String, ActionExecutionMode> entry : request.getActionModes().entrySet()) {
-                    ctxBuilder.actionMode(entry.getKey(), entry.getValue());
+                    record.toXContent(builder, WatcherParams.builder().hideSecrets(true).debug(request.isDebug()).build());
+                    listener.onResponse(new ExecuteWatchResponse(record.id().value(), BytesReference.bytes(builder), XContentType.JSON));
                 }
-                if (request.getAlternativeInput() != null) {
-                    ctxBuilder.withInput(new SimpleInput.Result(new Payload.Simple(request.getAlternativeInput())));
-                }
-                if (request.isIgnoreCondition()) {
-                    ctxBuilder.withCondition(InternalAlwaysCondition.RESULT_INSTANCE);
-                }
-                ctxBuilder.recordExecution(request.isRecordExecution());
 
-                WatchRecord record = executionService.execute(ctxBuilder.build());
-                XContentBuilder builder = XContentFactory.jsonBuilder();
+            }));
+        } catch (final Exception e) {
+            listener.onFailure(e);
+        }
+
 
-                record.toXContent(builder, WatcherParams.builder().hideSecrets(true).debug(request.isDebug()).build());
-                listener.onResponse(new ExecuteWatchResponse(record.id().value(), BytesReference.bytes(builder), XContentType.JSON));
-            }
-        });
     }
 }

+ 124 - 0
x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecuteWatchQueuedStatsTests.java

@@ -0,0 +1,124 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.watcher.execution;
+
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.xpack.core.watcher.client.WatchSourceBuilder;
+import org.elasticsearch.xpack.core.watcher.client.WatcherClient;
+import org.elasticsearch.xpack.core.watcher.execution.ActionExecutionMode;
+import org.elasticsearch.xpack.core.watcher.transport.actions.execute.ExecuteWatchRequest;
+import org.elasticsearch.xpack.core.watcher.transport.actions.execute.ExecuteWatchResponse;
+import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse;
+import org.elasticsearch.xpack.watcher.actions.index.IndexAction;
+import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
+import org.elasticsearch.xpack.watcher.trigger.manual.ManualTriggerEvent;
+import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
+
+import java.io.IOException;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+
+import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
+import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
+import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
+import static org.hamcrest.Matchers.empty;
+
+public class ExecuteWatchQueuedStatsTests extends AbstractWatcherIntegrationTestCase {
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal) {
+        // we use a small thread pool to force executions to be queued
+        return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put("xpack.watcher.thread_pool.size", 1).build();
+    }
+
+    @Override
+    protected boolean timeWarped() {
+        return false;
+    }
+
+    /*
+     * This test is effectively forcing a manually executed watch to end up queued while we simultaneously try to get stats, including
+     * queued watches. The reason that we do this is because previously a manually executed watch would be queued as a FutureTask<?> while
+     * we try to cast queued watches to WatchExecutionTask. This would previously result in a ClassCastException. This test fails when that
+     * happens yet succeeds with the production code change that accompanies this test.
+     */
+    public void testQueuedStats() throws ExecutionException, InterruptedException {
+        final WatcherClient client = new WatcherClient(client());
+        client.preparePutWatch("id")
+                .setActive(true)
+                .setSource(
+                        new WatchSourceBuilder()
+                                .input(simpleInput("payload", "yes"))
+                                .trigger(schedule(interval("1s")))
+                                .addAction(
+                                        "action",
+                                        TimeValue.timeValueSeconds(1),
+                                        IndexAction.builder("test_index", "acknowledgement").setDocId("id")))
+                .get();
+
+        final int numberOfIterations = 128 - scaledRandomIntBetween(0, 128);
+
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+
+        final List<ActionFuture<ExecuteWatchResponse>> futures = new ArrayList<>(numberOfIterations);
+        final Thread executeWatchThread = new Thread(() -> {
+            try {
+                barrier.await();
+            } catch (final BrokenBarrierException | InterruptedException e) {
+                fail(e.toString());
+            }
+            for (int i = 0; i < numberOfIterations; i++) {
+                final ExecuteWatchRequest request = new ExecuteWatchRequest("id");
+                try {
+                    request.setTriggerEvent(new ManualTriggerEvent(
+                            "id-" + i,
+                            new ScheduleTriggerEvent(ZonedDateTime.now(ZoneOffset.UTC), ZonedDateTime.now(ZoneOffset.UTC))));
+                } catch (final IOException e) {
+                    fail(e.toString());
+                }
+                request.setActionMode("_all", ActionExecutionMode.EXECUTE);
+                request.setRecordExecution(true);
+                futures.add(client.executeWatch(request));
+            }
+        });
+        executeWatchThread.start();
+
+        final List<FailedNodeException> failures = new ArrayList<>();
+        final Thread watcherStatsThread = new Thread(() -> {
+            try {
+                barrier.await();
+            } catch (final BrokenBarrierException | InterruptedException e) {
+                fail(e.toString());
+            }
+            for (int i = 0; i < numberOfIterations; i++) {
+                final WatcherStatsResponse response = client.prepareWatcherStats().setIncludeQueuedWatches(true).get();
+                failures.addAll(response.failures());
+            }
+        });
+        watcherStatsThread.start();
+
+        executeWatchThread.join();
+        watcherStatsThread.join();
+
+        for (final ActionFuture<ExecuteWatchResponse> future : futures) {
+            future.get();
+        }
+
+        assertThat(failures, empty());
+
+        client.prepareDeleteWatch("id").get();
+    }
+
+}