Browse Source

better handling of non supported commands / opcodes

kimchy 15 years ago
parent
commit
8223418b04

+ 1 - 1
.idea/modules/plugin-transport-memcached.iml

@@ -16,7 +16,7 @@
     <orderEntry type="module-library" scope="TEST">
       <library name="spymemcached">
         <CLASSES>
-          <root url="jar://$GRADLE_REPOSITORY$/spy/memcached/jars/memcached-2.4.2.jar!/" />
+          <root url="jar://$GRADLE_REPOSITORY$/spy/memcached/jars/memcached-2.5.jar!/" />
         </CLASSES>
         <JAVADOC />
         <SOURCES />

+ 1 - 1
plugins/transport/memcached/build.gradle

@@ -39,7 +39,7 @@ dependencies {
     testCompile project(':test-testng')
     testCompile('org.testng:testng:5.10:jdk15') { transitive = false }
     testCompile 'org.hamcrest:hamcrest-all:1.1'
-    testCompile 'spy:memcached:2.4.2'
+    testCompile 'spy:memcached:2.5'
 }
 
 test {

+ 10 - 0
plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/MemcachedRestRequest.java

@@ -54,6 +54,8 @@ public class MemcachedRestRequest extends AbstractRestRequest {
 
     private int opaque;
 
+    private boolean quiet;
+
     public MemcachedRestRequest(Method method, String uri, byte[] uriBytes, int dataSize, boolean binary) {
         this.method = method;
         this.uri = uri;
@@ -98,6 +100,14 @@ public class MemcachedRestRequest extends AbstractRestRequest {
         this.opaque = opaque;
     }
 
+    public boolean isQuiet() {
+        return quiet;
+    }
+
+    public void setQuiet(boolean quiet) {
+        this.quiet = quiet;
+    }
+
     public int getDataSize() {
         return dataSize;
     }

+ 52 - 20
plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/MemcachedDecoder.java

@@ -20,7 +20,9 @@
 package org.elasticsearch.memcached.netty;
 
 import org.elasticsearch.common.Unicode;
+import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.netty.buffer.ChannelBuffer;
+import org.elasticsearch.common.netty.buffer.ChannelBuffers;
 import org.elasticsearch.common.netty.channel.Channel;
 import org.elasticsearch.common.netty.channel.ChannelHandlerContext;
 import org.elasticsearch.common.netty.channel.ExceptionEvent;
@@ -36,6 +38,8 @@ import java.util.regex.Pattern;
  */
 public class MemcachedDecoder extends FrameDecoder {
 
+    private final ESLogger logger;
+
     private final Pattern lineSplit = Pattern.compile(" +");
 
     public static final byte CR = 13;
@@ -46,8 +50,9 @@ public class MemcachedDecoder extends FrameDecoder {
 
     private volatile MemcachedRestRequest request;
 
-    public MemcachedDecoder() {
+    public MemcachedDecoder(ESLogger logger) {
         super(false);
+        this.logger = logger;
     }
 
     @Override protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
@@ -78,30 +83,53 @@ public class MemcachedDecoder extends FrameDecoder {
 
                 buffer.skipBytes(extraLength); // get extras, can be empty
 
-                if (keyLength != 0) {
+                if (opcode == 0x00) { // GET
                     byte[] key = new byte[keyLength];
                     buffer.readBytes(key);
                     String uri = Unicode.fromBytes(key);
-                    if (opcode == 0x00) { // GET
-                        request = new MemcachedRestRequest(RestRequest.Method.GET, uri, key, -1, true);
-                        request.setOpaque(opaque);
-                        return request;
-                    } else if (opcode == 0x04) { // DELETE
-                        request = new MemcachedRestRequest(RestRequest.Method.DELETE, uri, key, -1, true);
-                        request.setOpaque(opaque);
-                        return request;
-                    } else if (opcode == 0x01) { // SET
-                        // the remainder of the message -- that is, totalLength - (keyLength + extraLength) should be the payload
-                        int size = totalBodyLength - keyLength - extraLength;
-                        request = new MemcachedRestRequest(RestRequest.Method.POST, uri, key, size, true);
-                        request.setOpaque(opaque);
-                        byte[] data = new byte[size];
-                        buffer.readBytes(data, 0, size);
-                        request.setData(data);
-                        return request;
-                    }
+                    request = new MemcachedRestRequest(RestRequest.Method.GET, uri, key, -1, true);
+                    request.setOpaque(opaque);
+                    return request;
+                } else if (opcode == 0x04) { // DELETE
+                    byte[] key = new byte[keyLength];
+                    buffer.readBytes(key);
+                    String uri = Unicode.fromBytes(key);
+                    request = new MemcachedRestRequest(RestRequest.Method.DELETE, uri, key, -1, true);
+                    request.setOpaque(opaque);
+                    return request;
+                } else if (opcode == 0x01/* || opcode == 0x11*/) { // SET
+                    byte[] key = new byte[keyLength];
+                    buffer.readBytes(key);
+                    String uri = Unicode.fromBytes(key);
+                    // the remainder of the message -- that is, totalLength - (keyLength + extraLength) should be the payload
+                    int size = totalBodyLength - keyLength - extraLength;
+                    request = new MemcachedRestRequest(RestRequest.Method.POST, uri, key, size, true);
+                    request.setOpaque(opaque);
+                    byte[] data = new byte[size];
+                    buffer.readBytes(data, 0, size);
+                    request.setData(data);
+                    request.setQuiet(opcode == 0x11);
+                    return request;
+                } else if (opcode == 0x0A || opcode == 0x10) { // NOOP or STATS
+                    // TODO once we support setQ we need to wait for them to flush
+                    ChannelBuffer writeBuffer = ChannelBuffers.dynamicBuffer(24);
+                    writeBuffer.writeByte(0x81);  // magic
+                    writeBuffer.writeByte(opcode); // opcode
+                    writeBuffer.writeShort(0); // key length
+                    writeBuffer.writeByte(0); // extra length = flags + expiry
+                    writeBuffer.writeByte(0); // data type unused
+                    writeBuffer.writeShort(0x0000); // OK
+                    writeBuffer.writeInt(0); // data length
+                    writeBuffer.writeInt(opaque); // opaque
+                    writeBuffer.writeLong(0); // cas
+                    channel.write(writeBuffer);
+                    return MemcachedDispatcher.IGNORE_REQUEST;
                 } else if (opcode == 0x07) { // QUIT
                     channel.disconnect();
+                } else {
+                    logger.error("Unsupported opcode [0x{}], ignoring and closing connection", Integer.toHexString(opcode));
+                    channel.disconnect();
+                    return null;
                 }
             } else {
                 buffer.resetReaderIndex(); // reset to get to the first byte
@@ -152,6 +180,10 @@ public class MemcachedDecoder extends FrameDecoder {
                     buffer.markReaderIndex();
                 } else if ("quit".equals(cmd)) {
                     channel.disconnect();
+                } else {
+                    logger.error("Unsupported command [{}], ignoring and closing connection", cmd);
+                    channel.disconnect();
+                    return null;
                 }
             }
         } else {

+ 6 - 0
plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/MemcachedDispatcher.java

@@ -33,6 +33,8 @@ import static org.elasticsearch.rest.RestResponse.Status.*;
  */
 public class MemcachedDispatcher extends SimpleChannelUpstreamHandler {
 
+    public static final Object IGNORE_REQUEST = new Object();
+
     private final RestController restController;
 
     public MemcachedDispatcher(RestController restController) {
@@ -40,6 +42,10 @@ public class MemcachedDispatcher extends SimpleChannelUpstreamHandler {
     }
 
     @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+        if (e.getMessage() == IGNORE_REQUEST) {
+            super.messageReceived(ctx, e);
+            return;
+        }
         MemcachedRestRequest request = (MemcachedRestRequest) e.getMessage();
         MemcachedRestChannel channel = new MemcachedRestChannel(ctx.getChannel(), request);
 

+ 25 - 17
plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/MemcachedRestChannel.java

@@ -60,27 +60,35 @@ public class MemcachedRestChannel implements RestChannel {
 
     @Override public void sendResponse(RestResponse response) {
         if (request.isBinary()) {
+            if (request.isQuiet() && response.status().getStatus() < 500) {
+                // nothing to send and all is well
+                return;
+            }
             try {
                 ChannelBuffer writeBuffer = ChannelBuffers.dynamicBuffer(24 + request.getUriBytes().length + response.contentLength() + 12);
-                writeBuffer.writeByte((byte) 0x81);  // magic
+                writeBuffer.writeByte(0x81);  // magic
                 if (request.method() == RestRequest.Method.GET) {
-                    writeBuffer.writeByte((byte) 0x00); // opcode
+                    writeBuffer.writeByte(0x00); // opcode
                 } else if (request.method() == RestRequest.Method.POST) {
-                    writeBuffer.writeByte((byte) 0x01); // opcode
+                    if (request.isQuiet()) {
+                        writeBuffer.writeByte(0x11); // opcode
+                    } else {
+                        writeBuffer.writeByte(0x01); // opcode
+                    }
                 } else if (request.method() == RestRequest.Method.DELETE) {
-                    writeBuffer.writeByte((byte) 0x04); // opcode
+                    writeBuffer.writeByte(0x04); // opcode
                 }
                 short keyLength = request.method() == RestRequest.Method.GET ? (short) request.getUriBytes().length : 0;
                 writeBuffer.writeShort(keyLength);
                 int extrasLength = request.method() == RestRequest.Method.GET ? 4 : 0;
-                writeBuffer.writeByte((byte) extrasLength); // extra length = flags + expiry
-                writeBuffer.writeByte((byte) 0); // data type unused
+                writeBuffer.writeByte(extrasLength); // extra length = flags + expiry
+                writeBuffer.writeByte(0); // data type unused
 
                 if (response.status().getStatus() >= 500) {
                     // TODO should we use this?
-                    writeBuffer.writeShort((short) 0x0A); // status code
+                    writeBuffer.writeShort(0x0A); // status code
                 } else {
-                    writeBuffer.writeShort((short) 0x0000); // OK
+                    writeBuffer.writeShort(0x0000); // OK
                 }
 
                 int dataLength = request.method() == RestRequest.Method.GET ? response.contentLength() : 0;
@@ -89,8 +97,8 @@ public class MemcachedRestChannel implements RestChannel {
                 writeBuffer.writeLong(0); // cas
 
                 if (extrasLength > 0) {
-                    writeBuffer.writeShort((short) 0);
-                    writeBuffer.writeShort((short) 0);
+                    writeBuffer.writeShort(0);
+                    writeBuffer.writeShort(0);
                 }
                 if (keyLength > 0) {
                     writeBuffer.writeBytes(request.getUriBytes());
@@ -116,15 +124,15 @@ public class MemcachedRestChannel implements RestChannel {
                         ChannelBuffer writeBuffer = ChannelBuffers.dynamicBuffer(response.contentLength() + 512);
                         writeBuffer.writeBytes(VALUE.duplicate());
                         writeBuffer.writeBytes(Unicode.fromStringAsBytes(request.uri()));
-                        writeBuffer.writeByte((byte) ' ');
-                        writeBuffer.writeByte((byte) '0');
-                        writeBuffer.writeByte((byte) ' ');
+                        writeBuffer.writeByte(' ');
+                        writeBuffer.writeByte('0');
+                        writeBuffer.writeByte(' ');
                         writeBuffer.writeBytes(Bytes.itoa(response.contentLength()));
-                        writeBuffer.writeByte((byte) '\r');
-                        writeBuffer.writeByte((byte) '\n');
+                        writeBuffer.writeByte('\r');
+                        writeBuffer.writeByte('\n');
                         writeBuffer.writeBytes(response.content(), 0, response.contentLength());
-                        writeBuffer.writeByte((byte) '\r');
-                        writeBuffer.writeByte((byte) '\n');
+                        writeBuffer.writeByte('\r');
+                        writeBuffer.writeByte('\n');
                         writeBuffer.writeBytes(END.duplicate());
                         channel.write(writeBuffer);
                     } catch (Exception e) {

+ 1 - 1
plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransport.java

@@ -128,7 +128,7 @@ public class NettyMemcachedServerTransport extends AbstractLifecycleComponent<Me
             @Override public ChannelPipeline getPipeline() throws Exception {
                 ChannelPipeline pipeline = Channels.pipeline();
                 pipeline.addLast("openChannels", serverOpenChannels);
-                pipeline.addLast("decoder", new MemcachedDecoder());
+                pipeline.addLast("decoder", new MemcachedDecoder(logger));
                 pipeline.addLast("dispatcher", new MemcachedDispatcher(restController));
                 return pipeline;
             }

+ 11 - 0
plugins/transport/memcached/src/test/java/org/elasticsearch/memcached/test/AbstractMemcachedActionsTests.java

@@ -60,6 +60,17 @@ public abstract class AbstractMemcachedActionsTests {
     }
 
     @Test public void testSimpleOperations() throws Exception {
+        // TODO seems to use SetQ, which is not really supported yet
+//        List<Future<Boolean>> setResults = Lists.newArrayList();
+//
+//        for (int i = 0; i < 10; i++) {
+//            setResults.add(memcachedClient.set("/test/person/" + i, 0, jsonBuilder().startObject().field("test", "value").endObject().copiedBytes()));
+//        }
+//
+//        for (Future<Boolean> setResult : setResults) {
+//            assertThat(setResult.get(10, TimeUnit.SECONDS), equalTo(true));
+//        }
+
         Future<Boolean> setResult = memcachedClient.set("/test/person/1", 0, jsonBuilder().startObject().field("test", "value").endObject().copiedBytes());
         assertThat(setResult.get(10, TimeUnit.SECONDS), equalTo(true));