Browse Source

Mock Transport: Allow to simulate network failures
An infrastructure that allows to simulate different network topologies failures, including 2 basic ones in failure to send requests, and unresponsive nodes
closes #5631

Shay Banon 11 years ago
parent
commit
cc4cae3ba0

+ 7 - 1
src/main/java/org/elasticsearch/transport/TransportModule.java

@@ -37,6 +37,7 @@ public class TransportModule extends AbstractModule implements SpawnModules {
     private final Settings settings;
     
     public static final String TRANSPORT_TYPE_KEY = "transport.type";
+    public static final String TRANSPORT_SERVICE_TYPE_KEY = "transport.service.type";
 
     public TransportModule(Settings settings) {
         this.settings = settings;
@@ -55,6 +56,11 @@ public class TransportModule extends AbstractModule implements SpawnModules {
 
     @Override
     protected void configure() {
-        bind(TransportService.class).asEagerSingleton();
+        Class<? extends TransportService> transportService = settings.getAsClass(TRANSPORT_SERVICE_TYPE_KEY, TransportService.class, "org.elasticsearch.transport.", "TransportService");
+        if (!TransportService.class.equals(transportService)) {
+            bind(TransportService.class).to(transportService).asEagerSingleton();
+        } else {
+            bind(TransportService.class).asEagerSingleton();
+        }
     }
 }

+ 2 - 3
src/main/java/org/elasticsearch/transport/TransportService.java

@@ -49,9 +49,8 @@ import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_
  */
 public class TransportService extends AbstractLifecycleComponent<TransportService> {
 
-    private final Transport transport;
-
-    private final ThreadPool threadPool;
+    protected final Transport transport;
+    protected final ThreadPool threadPool;
 
     volatile ImmutableMap<String, TransportRequestHandler> serverHandlers = ImmutableMap.of();
     final Object serverHandlersMutex = new Object();

+ 9 - 0
src/test/java/org/elasticsearch/test/TestCluster.java

@@ -59,6 +59,7 @@ import org.elasticsearch.test.cache.recycler.MockPageCacheRecyclerModule;
 import org.elasticsearch.test.engine.MockEngineModule;
 import org.elasticsearch.test.store.MockFSIndexStoreModule;
 import org.elasticsearch.test.transport.AssertingLocalTransportModule;
+import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportModule;
@@ -233,6 +234,7 @@ public final class TestCluster extends ImmutableTestCluster {
             builder.put(IndexEngineModule.EngineSettings.ENGINE_TYPE, MockEngineModule.class.getName());
             builder.put(PageCacheRecyclerModule.CACHE_IMPL, MockPageCacheRecyclerModule.class.getName());
             builder.put(BigArraysModule.IMPL, MockBigArraysModule.class.getName());
+            builder.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName());
         }
         if (isLocalTransportConfigured()) {
             builder.put(TransportModule.TRANSPORT_TYPE_KEY, AssertingLocalTransportModule.class.getName());
@@ -667,6 +669,13 @@ public final class TestCluster extends ImmutableTestCluster {
         if (wipeData) {
             wipeDataDirectories();
         }
+        // clear all rules for mock transport services
+        for (NodeAndClient nodeAndClient : nodes.values()) {
+            TransportService transportService = nodeAndClient.node.injector().getInstance(TransportService.class);
+            if (transportService instanceof MockTransportService) {
+                ((MockTransportService) transportService).clearAllRules();
+            }
+        }
         if (nextNodeId.get() == sharedNodesSeeds.length && nodes.size() == sharedNodesSeeds.length) {
             logger.debug("Cluster hasn't changed - moving out - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), sharedNodesSeeds.length);
             return;

+ 247 - 0
src/test/java/org/elasticsearch/test/transport/MockTransportService.java

@@ -0,0 +1,247 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.test.transport;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.component.Lifecycle;
+import org.elasticsearch.common.component.LifecycleListener;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.BoundTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.*;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A mock transport service that allows to simulate different network topology failures.
+ */
+public class MockTransportService extends TransportService {
+
+    @Inject
+    public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
+        super(settings, new LookupTestTransport(transport), threadPool);
+    }
+
+    /**
+     * Clears all the registered rules.
+     */
+    public void clearAllRules() {
+        ((LookupTestTransport) transport).transports.clear();
+    }
+
+    /**
+     * Clears the rule associated with the provided node.
+     */
+    public void clearRule(DiscoveryNode node) {
+        ((LookupTestTransport) transport).transports.remove(node);
+    }
+
+    /**
+     * Adds a rule that will cause every send request to fail, and each new connect since the rule
+     * is added to fail as well.
+     */
+    public void addFailToSendNoConnectRule(DiscoveryNode node) {
+        ((LookupTestTransport) transport).transports.put(node, new DelegateTransport(transport) {
+            @Override
+            public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
+                throw new ConnectTransportException(node, "DISCONNECT: simulated");
+            }
+
+            @Override
+            public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException {
+                throw new ConnectTransportException(node, "DISCONNECT: simulated");
+            }
+
+            @Override
+            public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
+                throw new ConnectTransportException(node, "DISCONNECT: simulated");
+            }
+        });
+    }
+
+    /**
+     * Adds a rule that will cause ignores each send request, simulating an unresponsive node
+     * and failing to connect once the rule was added.
+     */
+    public void addUnresponsiveRule(DiscoveryNode node) {
+        // TODO add a parameter to delay the connect timeout?
+        ((LookupTestTransport) transport).transports.put(node, new DelegateTransport(transport) {
+            @Override
+            public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
+                throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
+            }
+
+            @Override
+            public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException {
+                throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
+            }
+
+            @Override
+            public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
+                // don't send anything, the receiving node is unresponsive
+            }
+        });
+    }
+
+    /**
+     * A lookup transport that has a list of potential Transport implementations to delegate to for node operations,
+     * if none is registered, then the default one is used.
+     */
+    private static class LookupTestTransport extends DelegateTransport {
+
+        final ConcurrentMap<DiscoveryNode, Transport> transports = ConcurrentCollections.newConcurrentMap();
+
+        LookupTestTransport(Transport transport) {
+            super(transport);
+        }
+
+        private Transport getTransport(DiscoveryNode node) {
+            Transport transport = transports.get(node);
+            if (transport != null) {
+                return transport;
+            }
+            // TODO, if we miss on node by UID, we should have an option to lookup based on address?
+            return this.transport;
+        }
+
+        @Override
+        public boolean nodeConnected(DiscoveryNode node) {
+            return getTransport(node).nodeConnected(node);
+        }
+
+        @Override
+        public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
+            getTransport(node).connectToNode(node);
+        }
+
+        @Override
+        public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException {
+            getTransport(node).connectToNodeLight(node);
+        }
+
+        @Override
+        public void disconnectFromNode(DiscoveryNode node) {
+            getTransport(node).disconnectFromNode(node);
+        }
+
+        @Override
+        public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
+            getTransport(node).sendRequest(node, requestId, action, request, options);
+        }
+    }
+
+    /**
+     * A pure delegate transport.
+     * Can be extracted to a common class if needed in other places in the codebase.
+     */
+    private static class DelegateTransport implements Transport {
+
+        protected final Transport transport;
+
+        DelegateTransport(Transport transport) {
+            this.transport = transport;
+        }
+
+        @Override
+        public void transportServiceAdapter(TransportServiceAdapter service) {
+            transport.transportServiceAdapter(service);
+        }
+
+        @Override
+        public BoundTransportAddress boundAddress() {
+            return transport.boundAddress();
+        }
+
+        @Override
+        public TransportAddress[] addressesFromString(String address) throws Exception {
+            return transport.addressesFromString(address);
+        }
+
+        @Override
+        public boolean addressSupported(Class<? extends TransportAddress> address) {
+            return transport.addressSupported(address);
+        }
+
+        @Override
+        public boolean nodeConnected(DiscoveryNode node) {
+            return transport.nodeConnected(node);
+        }
+
+        @Override
+        public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
+            transport.connectToNode(node);
+        }
+
+        @Override
+        public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException {
+            transport.connectToNodeLight(node);
+        }
+
+        @Override
+        public void disconnectFromNode(DiscoveryNode node) {
+            transport.disconnectFromNode(node);
+        }
+
+        @Override
+        public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
+            transport.sendRequest(node, requestId, action, request, options);
+        }
+
+        @Override
+        public long serverOpen() {
+            return transport.serverOpen();
+        }
+
+        @Override
+        public Lifecycle.State lifecycleState() {
+            return transport.lifecycleState();
+        }
+
+        @Override
+        public void addLifecycleListener(LifecycleListener listener) {
+            transport.addLifecycleListener(listener);
+        }
+
+        @Override
+        public void removeLifecycleListener(LifecycleListener listener) {
+            transport.removeLifecycleListener(listener);
+        }
+
+        @Override
+        public Transport start() throws ElasticsearchException {
+            return transport.start();
+        }
+
+        @Override
+        public Transport stop() throws ElasticsearchException {
+            return transport.stop();
+        }
+
+        @Override
+        public void close() throws ElasticsearchException {
+            transport.close();
+        }
+    }
+}

+ 142 - 3
src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java

@@ -29,6 +29,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.ElasticsearchTestCase;
+import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.After;
 import org.junit.Before;
@@ -50,13 +51,13 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
 
     protected static final Version version0 = Version.fromId(/*0*/99);
     protected DiscoveryNode nodeA;
-    protected TransportService serviceA;
+    protected MockTransportService serviceA;
 
     protected static final Version version1 = Version.fromId(199);
     protected DiscoveryNode nodeB;
-    protected TransportService serviceB;
+    protected MockTransportService serviceB;
 
-    protected abstract TransportService build(Settings settings, Version version);
+    protected abstract MockTransportService build(Settings settings, Version version);
 
     @Before
     public void setUp() throws Exception {
@@ -872,4 +873,142 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
 
         assertThat(version0Response.value1, equalTo(1));
     }
+
+    @Test
+    public void testMockFailToSendNoConnectRule() {
+        serviceA.registerHandler("sayHello", new BaseTransportRequestHandler<StringMessageRequest>() {
+            @Override
+            public StringMessageRequest newInstance() {
+                return new StringMessageRequest();
+            }
+
+            @Override
+            public String executor() {
+                return ThreadPool.Names.GENERIC;
+            }
+
+            @Override
+            public void messageReceived(StringMessageRequest request, TransportChannel channel) throws Exception {
+                assertThat("moshe", equalTo(request.message));
+                throw new RuntimeException("bad message !!!");
+            }
+        });
+
+        serviceB.addFailToSendNoConnectRule(nodeA);
+
+        TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHello",
+                new StringMessageRequest("moshe"), new BaseTransportResponseHandler<StringMessageResponse>() {
+                    @Override
+                    public StringMessageResponse newInstance() {
+                        return new StringMessageResponse();
+                    }
+
+                    @Override
+                    public String executor() {
+                        return ThreadPool.Names.GENERIC;
+                    }
+
+                    @Override
+                    public void handleResponse(StringMessageResponse response) {
+                        assertThat("got response instead of exception", false, equalTo(true));
+                    }
+
+                    @Override
+                    public void handleException(TransportException exp) {
+                        assertThat(exp.getCause().getMessage(), endsWith("DISCONNECT: simulated"));
+                    }
+                });
+
+        try {
+            res.txGet();
+            assertThat("exception should be thrown", false, equalTo(true));
+        } catch (Exception e) {
+            assertThat(e.getCause().getMessage(), endsWith("DISCONNECT: simulated"));
+        }
+
+        try {
+            serviceB.connectToNode(nodeA);
+            assertThat("exception should be thrown", false, equalTo(true));
+        } catch (ConnectTransportException e) {
+            // all is well
+        }
+
+        try {
+            serviceB.connectToNodeLight(nodeA);
+            assertThat("exception should be thrown", false, equalTo(true));
+        } catch (ConnectTransportException e) {
+            // all is well
+        }
+
+        serviceA.removeHandler("sayHello");
+    }
+
+    @Test
+    public void testMockUnresponsiveRule() {
+        serviceA.registerHandler("sayHello", new BaseTransportRequestHandler<StringMessageRequest>() {
+            @Override
+            public StringMessageRequest newInstance() {
+                return new StringMessageRequest();
+            }
+
+            @Override
+            public String executor() {
+                return ThreadPool.Names.GENERIC;
+            }
+
+            @Override
+            public void messageReceived(StringMessageRequest request, TransportChannel channel) throws Exception {
+                assertThat("moshe", equalTo(request.message));
+                throw new RuntimeException("bad message !!!");
+            }
+        });
+
+        serviceB.addUnresponsiveRule(nodeA);
+
+        TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHello",
+                new StringMessageRequest("moshe"), TransportRequestOptions.options().withTimeout(100), new BaseTransportResponseHandler<StringMessageResponse>() {
+                    @Override
+                    public StringMessageResponse newInstance() {
+                        return new StringMessageResponse();
+                    }
+
+                    @Override
+                    public String executor() {
+                        return ThreadPool.Names.GENERIC;
+                    }
+
+                    @Override
+                    public void handleResponse(StringMessageResponse response) {
+                        assertThat("got response instead of exception", false, equalTo(true));
+                    }
+
+                    @Override
+                    public void handleException(TransportException exp) {
+                        assertThat(exp, instanceOf(ReceiveTimeoutTransportException.class));
+                    }
+                });
+
+        try {
+            res.txGet();
+            assertThat("exception should be thrown", false, equalTo(true));
+        } catch (Exception e) {
+            assertThat(e, instanceOf(ReceiveTimeoutTransportException.class));
+        }
+
+        try {
+            serviceB.connectToNode(nodeA);
+            assertThat("exception should be thrown", false, equalTo(true));
+        } catch (ConnectTransportException e) {
+            // all is well
+        }
+
+        try {
+            serviceB.connectToNodeLight(nodeA);
+            assertThat("exception should be thrown", false, equalTo(true));
+        } catch (ConnectTransportException e) {
+            // all is well
+        }
+
+        serviceA.removeHandler("sayHello");
+    }
 }

+ 6 - 2
src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java

@@ -20,14 +20,18 @@
 package org.elasticsearch.transport.local;
 
 import org.elasticsearch.Version;
+import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.transport.AbstractSimpleTransportTests;
 import org.elasticsearch.transport.TransportService;
 
 public class SimpleLocalTransportTests extends AbstractSimpleTransportTests {
 
     @Override
-    protected TransportService build(Settings settings, Version version) {
-        return new TransportService(new LocalTransport(settings, threadPool, version), threadPool).start();
+    protected MockTransportService build(Settings settings, Version version) {
+        MockTransportService transportService = new MockTransportService(ImmutableSettings.EMPTY, new LocalTransport(settings, threadPool, version), threadPool);
+        transportService.start();
+        return transportService;
     }
 }

+ 5 - 3
src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java

@@ -25,19 +25,21 @@ import org.elasticsearch.common.network.NetworkService;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.transport.AbstractSimpleTransportTests;
 import org.elasticsearch.transport.ConnectTransportException;
-import org.elasticsearch.transport.TransportService;
 import org.junit.Test;
 
 public class SimpleNettyTransportTests extends AbstractSimpleTransportTests {
 
     @Override
-    protected TransportService build(Settings settings, Version version) {
+    protected MockTransportService build(Settings settings, Version version) {
         int startPort = 11000 + randomIntBetween(0, 255);
         int endPort = startPort + 10;
         settings = ImmutableSettings.builder().put(settings).put("transport.tcp.port", startPort + "-" + endPort).build();
-        return new TransportService(settings, new NettyTransport(settings, threadPool, new NetworkService(settings), version), threadPool).start();
+        MockTransportService transportService = new MockTransportService(settings, new NettyTransport(settings, threadPool, new NetworkService(settings), version), threadPool);
+        transportService.start();
+        return transportService;
     }
 
     @Test