Browse Source

Add additional low-level logging handler ()

* Add additional low-level logging handler

We have the trace handler which is useful for recording sent messages
but there are times where it would be useful to have more low-level
logging about the events occurring on a channel. This commit adds a
logging handler that can be enabled by setting a certain log level
(org.elasticsearch.transport.netty4.ESLoggingHandler) to trace that
provides trace logging on low-level channel events and includes some
information about the request/response read/write events on the channel
as well.

* Remove imports

* License header

* Remove redundant

* Add test

* More assertions
Jason Tedor 8 years ago
parent
commit
470e5e7cfc

+ 1 - 1
core/src/main/java/org/elasticsearch/transport/TcpTransport.java

@@ -183,7 +183,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
         key -> intSetting(key, -1, -1, Setting.Property.NodeScope));
 
     private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9);
-    private static final int PING_DATA_SIZE = -1;
+    public static final int PING_DATA_SIZE = -1;
     private final CircuitBreakerService circuitBreakerService;
     // package visibility for tests
     protected final ScheduledPing scheduledPing;

+ 1 - 1
core/src/main/java/org/elasticsearch/transport/TransportStatus.java

@@ -19,7 +19,7 @@
 
 package org.elasticsearch.transport;
 
-final class TransportStatus {
+public final class TransportStatus {
 
     private static final byte STATUS_REQRES = 1 << 0;
     private static final byte STATUS_ERROR = 1 << 1;

+ 131 - 0
modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java

@@ -0,0 +1,131 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.transport.netty4;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.util.internal.StringUtil;
+import org.elasticsearch.Version;
+import org.elasticsearch.common.compress.Compressor;
+import org.elasticsearch.common.compress.CompressorFactory;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.transport.TcpHeader;
+import org.elasticsearch.transport.TcpTransport;
+import org.elasticsearch.transport.TransportStatus;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+final class ESLoggingHandler extends LoggingHandler {
+
+    ESLoggingHandler() {
+        super(LogLevel.TRACE);
+    }
+
+    @Override
+    protected String format(final ChannelHandlerContext ctx, final String eventName, final Object arg) {
+        if (arg instanceof ByteBuf) {
+            try {
+                return format(ctx, eventName, (ByteBuf) arg);
+            } catch (final Exception e) {
+                // we really do not want to allow a bug in the formatting handling to escape
+                logger.trace("an exception occurred formatting a trace message", e);
+                // we are going to let this be formatted via the default formatting
+                return super.format(ctx, eventName, arg);
+            }
+        } else {
+            return super.format(ctx, eventName, arg);
+        }
+    }
+
+    private static final int MESSAGE_LENGTH_OFFSET = TcpHeader.MARKER_BYTES_SIZE;
+    private static final int REQUEST_ID_OFFSET = MESSAGE_LENGTH_OFFSET + TcpHeader.MESSAGE_LENGTH_SIZE;
+    private static final int STATUS_OFFSET = REQUEST_ID_OFFSET + TcpHeader.REQUEST_ID_SIZE;
+    private static final int VERSION_ID_OFFSET = STATUS_OFFSET + TcpHeader.STATUS_SIZE;
+    private static final int ACTION_OFFSET = VERSION_ID_OFFSET + TcpHeader.VERSION_ID_SIZE;
+
+    private String format(final ChannelHandlerContext ctx, final String eventName, final ByteBuf arg) throws IOException {
+        final int readableBytes = arg.readableBytes();
+        if (readableBytes == 0) {
+            return super.format(ctx, eventName, arg);
+        } else if (readableBytes >= 2) {
+            final StringBuilder sb = new StringBuilder();
+            sb.append(ctx.channel().toString());
+            final int offset = arg.readerIndex();
+            // this might be an ES message, check the header
+            if (arg.getByte(offset) == (byte) 'E' && arg.getByte(offset + 1) == (byte) 'S') {
+                if (readableBytes == TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE) {
+                    final int length = arg.getInt(offset + MESSAGE_LENGTH_OFFSET);
+                    if (length == TcpTransport.PING_DATA_SIZE) {
+                        sb.append(" [ping]").append(' ').append(eventName).append(": ").append(readableBytes).append('B');
+                        return sb.toString();
+                    }
+                }
+                else if (readableBytes >= TcpHeader.HEADER_SIZE) {
+                    // we are going to try to decode this as an ES message
+                    final int length = arg.getInt(offset + MESSAGE_LENGTH_OFFSET);
+                    final long requestId = arg.getLong(offset + REQUEST_ID_OFFSET);
+                    final byte status = arg.getByte(offset + STATUS_OFFSET);
+                    final boolean isRequest = TransportStatus.isRequest(status);
+                    final String type = isRequest ? "request" : "response";
+                    final String version = Version.fromId(arg.getInt(offset + VERSION_ID_OFFSET)).toString();
+                    sb.append(" [length: ").append(length);
+                    sb.append(", request id: ").append(requestId);
+                    sb.append(", type: ").append(type);
+                    sb.append(", version: ").append(version);
+                    if (isRequest) {
+                        // it looks like an ES request, try to decode the action
+                        final int remaining = readableBytes - ACTION_OFFSET;
+                        final ByteBuf slice = arg.slice(offset + ACTION_OFFSET, remaining);
+                        // the stream might be compressed
+                        try (StreamInput in = in(status, slice, remaining)) {
+                            // the first bytes in the message is the context headers
+                            try (ThreadContext context = new ThreadContext(Settings.EMPTY)) {
+                                context.readHeaders(in);
+                            }
+                            // now we can decode the action name
+                            sb.append(", action: ").append(in.readString());
+                        }
+                    }
+                    sb.append(']');
+                    sb.append(' ').append(eventName).append(": ").append(readableBytes).append('B');
+                    return sb.toString();
+                }
+            }
+        }
+        // we could not decode this as an ES message, use the default formatting
+        return super.format(ctx, eventName, arg);
+    }
+
+    private StreamInput in(final Byte status, final ByteBuf slice, final int remaining) throws IOException {
+        final ByteBufStreamInput in = new ByteBufStreamInput(slice, remaining);
+        if (TransportStatus.isCompress(status)) {
+            final Compressor compressor = CompressorFactory.compressor(Netty4Utils.toBytesReference(slice));
+            return compressor.streamInput(in);
+        } else {
+            return in;
+        }
+    }
+
+}

+ 2 - 0
modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

@@ -408,6 +408,7 @@ public class Netty4Transport extends TcpTransport<Channel> {
 
         @Override
         protected void initChannel(Channel ch) throws Exception {
+            ch.pipeline().addLast("logging", new ESLoggingHandler());
             ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
             // using a dot as a prefix means this cannot come from any settings parsed
             ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client"));
@@ -431,6 +432,7 @@ public class Netty4Transport extends TcpTransport<Channel> {
 
         @Override
         protected void initChannel(Channel ch) throws Exception {
+            ch.pipeline().addLast("logging", new ESLoggingHandler());
             ch.pipeline().addLast("open_channels", Netty4Transport.this.serverOpenChannels);
             ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
             ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name));

+ 83 - 0
modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java

@@ -0,0 +1,83 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.transport.netty4;
+
+import org.apache.logging.log4j.Level;
+import org.elasticsearch.ESNetty4IntegTestCase;
+import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequest;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.MockLogAppender;
+import org.elasticsearch.test.junit.annotations.TestLogging;
+
+@ESIntegTestCase.ClusterScope(numDataNodes = 2)
+@TestLogging(value = "org.elasticsearch.transport.netty4.ESLoggingHandler:trace")
+public class ESLoggingHandlerIT extends ESNetty4IntegTestCase {
+
+    private MockLogAppender appender;
+
+    public void setUp() throws Exception {
+        super.setUp();
+        appender = new MockLogAppender();
+        Loggers.addAppender(Loggers.getLogger(ESLoggingHandler.class), appender);
+        appender.start();
+    }
+
+    public void tearDown() throws Exception {
+        Loggers.removeAppender(Loggers.getLogger(ESLoggingHandler.class), appender);
+        appender.stop();
+        super.tearDown();
+    }
+
+    public void testLoggingHandler() throws IllegalAccessException {
+        final String writePattern =
+                ".*\\[length: \\d+" +
+                        ", request id: \\d+" +
+                        ", type: request" +
+                        ", version: .*" +
+                        ", action: cluster:monitor/nodes/hot_threads\\[n\\]\\]" +
+                        " WRITE: \\d+B";
+        final MockLogAppender.LoggingExpectation writeExpectation =
+                new MockLogAppender.PatternSeenEventExcpectation(
+                        "hot threads request", ESLoggingHandler.class.getCanonicalName(), Level.TRACE, writePattern);
+
+        final MockLogAppender.LoggingExpectation flushExpectation =
+                new MockLogAppender.SeenEventExpectation("flush", ESLoggingHandler.class.getCanonicalName(), Level.TRACE, "*FLUSH*");
+
+        final String readPattern =
+                ".*\\[length: \\d+" +
+                        ", request id: \\d+" +
+                        ", type: request" +
+                        ", version: .*" +
+                        ", action: cluster:monitor/nodes/hot_threads\\[n\\]\\]" +
+                        " READ: \\d+B";
+
+        final MockLogAppender.LoggingExpectation readExpectation =
+                new MockLogAppender.PatternSeenEventExcpectation(
+                        "hot threads request", ESLoggingHandler.class.getCanonicalName(), Level.TRACE, readPattern);
+
+        appender.addExpectation(writeExpectation);
+        appender.addExpectation(flushExpectation);
+        appender.addExpectation(readExpectation);
+        client().admin().cluster().nodesHotThreads(new NodesHotThreadsRequest()).actionGet();
+        appender.assertAllExpectationsMatched();
+    }
+
+}

+ 33 - 0
test/framework/src/main/java/org/elasticsearch/test/MockLogAppender.java

@@ -26,9 +26,11 @@ import org.elasticsearch.common.regex.Regex;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.regex.Pattern;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test appender that can be used to verify that certain events were logged correctly
@@ -122,6 +124,37 @@ public class MockLogAppender extends AbstractAppender {
         }
     }
 
+    public static class PatternSeenEventExcpectation implements LoggingExpectation {
+
+        protected final String name;
+        protected final String logger;
+        protected final Level level;
+        protected final String pattern;
+        volatile boolean saw;
+
+        public PatternSeenEventExcpectation(String name, String logger, Level level, String pattern) {
+            this.name = name;
+            this.logger = logger;
+            this.level = level;
+            this.pattern = pattern;
+        }
+
+        @Override
+        public void match(LogEvent event) {
+            if (event.getLevel().equals(level) && event.getLoggerName().equals(logger)) {
+                if (Pattern.matches(pattern, event.getMessage().getFormattedMessage())) {
+                    saw = true;
+                }
+            }
+        }
+
+        @Override
+        public void assertMatched() {
+            assertThat(name, saw, equalTo(true));
+        }
+
+    }
+
     private static String getLoggerName(String name) {
         if (name.startsWith("org.elasticsearch.")) {
             name = name.substring("org.elasticsearch.".length());