Browse Source

Avoid early release of local forking requests (#77641)

Today we protect against releasing a request from a remote node until
its handler has completed, but we do not have the same protection for
requests from the local node. This commit adds the missing refcounting.

Relates #77407
Closes #77634
David Turner 4 years ago
parent
commit
89dd3c4eef
1 changed files with 35 additions and 21 deletions
  1. 35 21
      server/src/main/java/org/elasticsearch/transport/TransportService.java

+ 35 - 21
server/src/main/java/org/elasticsearch/transport/TransportService.java

@@ -761,33 +761,47 @@ public class TransportService extends AbstractLifecycleComponent
             if (ThreadPool.Names.SAME.equals(executor)) {
                 reg.processMessageReceived(request, channel);
             } else {
-                threadPool.executor(executor).execute(new AbstractRunnable() {
-                    @Override
-                    protected void doRun() throws Exception {
-                        reg.processMessageReceived(request, channel);
-                    }
+                boolean success = false;
+                request.incRef();
+                try {
+                    threadPool.executor(executor).execute(new AbstractRunnable() {
+                        @Override
+                        protected void doRun() throws Exception {
+                            reg.processMessageReceived(request, channel);
+                        }
 
-                    @Override
-                    public boolean isForceExecution() {
-                        return reg.isForceExecution();
-                    }
+                        @Override
+                        public boolean isForceExecution() {
+                            return reg.isForceExecution();
+                        }
 
-                    @Override
-                    public void onFailure(Exception e) {
-                        try {
-                            channel.sendResponse(e);
-                        } catch (Exception inner) {
-                            inner.addSuppressed(e);
-                            logger.warn(() -> new ParameterizedMessage(
+                        @Override
+                        public void onFailure(Exception e) {
+                            try {
+                                channel.sendResponse(e);
+                            } catch (Exception inner) {
+                                inner.addSuppressed(e);
+                                logger.warn(() -> new ParameterizedMessage(
                                     "failed to notify channel of error message for action [{}]", action), inner);
+                            }
                         }
-                    }
 
-                    @Override
-                    public String toString() {
-                        return "processing of [" + requestId + "][" + action + "]: " + request;
+                        @Override
+                        public String toString() {
+                            return "processing of [" + requestId + "][" + action + "]: " + request;
+                        }
+
+                        @Override
+                        public void onAfter() {
+                            request.decRef();
+                        }
+                    });
+                    success = true;
+                } finally {
+                    if (success == false) {
+                        request.decRef();
                     }
-                });
+                }
             }
 
         } catch (Exception e) {