Browse Source

ESIndexLevelReplicationTestCase: Make it easier to add new TRA-based actions (#20708)

Right now our unit tests in that area only simulate indexing single documents. As we go forward it should be easy
to add other actions, like delete & bulk indexing. This commit extracts the common parts of the current indexing
logic to a based class make it easier to extend.
Boaz Leskes 9 years ago
parent
commit
615928e8cd

+ 1 - 1
core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java

@@ -213,7 +213,7 @@ public abstract class TransportWriteAction<
      * callback used by {@link AsyncAfterWriteAction} to notify that all post
      * process actions have been executed
      */
-    private interface RespondingWriteResult {
+    interface RespondingWriteResult {
         /**
          * Called on successful processing of all post write actions
          * @param forcedRefresh <code>true</code> iff this write has caused a refresh

+ 55 - 0
core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTestHelper.java

@@ -0,0 +1,55 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action.support.replication;
+
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.translog.Translog;
+
+import java.util.concurrent.CountDownLatch;
+
+public abstract class TransportWriteActionTestHelper {
+
+
+    public static void performPostWriteActions(final IndexShard indexShard,
+                                              final WriteRequest<?> request,
+                                              @Nullable final Translog.Location location,
+                                              final Logger logger) {
+        final CountDownLatch latch = new CountDownLatch(1);
+        TransportWriteAction.RespondingWriteResult writerResult = new TransportWriteAction.RespondingWriteResult() {
+            @Override
+            public void onSuccess(boolean forcedRefresh) {
+                latch.countDown();
+            }
+
+            @Override
+            public void onFailure(Exception ex) {
+                throw new AssertionError(ex);
+            }
+        };
+        new TransportWriteAction.AsyncAfterWriteAction(indexShard, request, location, writerResult, logger).run();
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            throw new AssertionError(e);
+        }
+    }
+}

+ 130 - 93
core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

@@ -28,8 +28,10 @@ import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.index.TransportIndexAction;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.replication.ReplicationOperation;
+import org.elasticsearch.action.support.replication.ReplicationRequest;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.action.support.replication.TransportWriteAction;
+import org.elasticsearch.action.support.replication.TransportWriteActionTestHelper;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -39,6 +41,7 @@ import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.LocalTransportAddress;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
@@ -80,7 +83,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
         IndexMetaData.Builder metaData = IndexMetaData.builder(index.getName())
             .settings(settings)
             .primaryTerm(0, 1);
-        for (Map.Entry<String, String> typeMapping: indexMapping.entrySet()) {
+        for (Map.Entry<String, String> typeMapping : indexMapping.entrySet()) {
             metaData.putMapping(typeMapping.getKey(), typeMapping.getValue());
         }
         return new ReplicationGroup(metaData.build());
@@ -129,10 +132,9 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
         }
 
         public IndexResponse index(IndexRequest indexRequest) throws Exception {
-            PlainActionFuture<IndexingResult> listener = new PlainActionFuture<>();
-            IndexingOp op = new IndexingOp(indexRequest, listener, this);
-            op.execute();
-            return listener.get().finalResponse;
+            PlainActionFuture<IndexResponse> listener = new PlainActionFuture<>();
+            new IndexingAction(indexRequest, listener, this).execute();
+            return listener.get();
         }
 
         public synchronized void startAll() throws IOException {
@@ -146,7 +148,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
         }
 
         public synchronized IndexShard addReplica() throws IOException {
-            final IndexShard replica = newShard(shardId, false,"s" + replicaId.incrementAndGet(), indexMetaData, null);
+            final IndexShard replica = newShard(shardId, false, "s" + replicaId.incrementAndGet(), indexMetaData, null);
             replicas.add(replica);
             return replica;
         }
@@ -222,7 +224,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
 
         @Override
         public Iterator<IndexShard> iterator() {
-            return Iterators.<IndexShard>concat(replicas.iterator(), Collections.singleton(primary).iterator());
+            return Iterators.concat(replicas.iterator(), Collections.singleton(primary).iterator());
         }
 
         public IndexShard getPrimary() {
@@ -230,116 +232,151 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
         }
     }
 
-    class IndexingOp extends ReplicationOperation<IndexRequest, IndexRequest, IndexingResult> {
-
+    abstract class ReplicationAction<Request extends ReplicationRequest<Request>, ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
+        Response extends ReplicationResponse> {
+        private final Request request;
+        private ActionListener<Response> listener;
         private final ReplicationGroup replicationGroup;
+        private final String opType;
+
+        public ReplicationAction(Request request, ActionListener<Response> listener,
+                                 ReplicationGroup group, String opType) {
+            this.request = request;
+            this.listener = listener;
+            this.replicationGroup = group;
+            this.opType = opType;
+        }
+
+        public void execute() throws Exception {
+            new ReplicationOperation<Request, ReplicaRequest, PrimaryResult>(request, new PrimaryRef(),
+                new ActionListener<PrimaryResult>() {
+                    @Override
+                    public void onResponse(PrimaryResult result) {
+                        result.respond(listener);
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        listener.onFailure(e);
+                    }
+                }, true, new ReplicasRef(), () -> null, logger, opType) {
+                @Override
+                protected List<ShardRouting> getShards(ShardId shardId, ClusterState state) {
+                    return replicationGroup.shardRoutings();
+                }
+
+                @Override
+                protected String checkActiveShardCount() {
+                    return null;
+                }
+
+                @Override
+                protected Set<String> getInSyncAllocationIds(ShardId shardId, ClusterState clusterState) {
+                    return replicationGroup.shardRoutings().stream().filter(ShardRouting::active).map(r -> r.allocationId().getId())
+                        .collect(Collectors.toSet());
+                }
+            }.execute();
+        }
+
+        protected abstract PrimaryResult performOnPrimary(IndexShard primary, Request request) throws Exception;
+
+        protected abstract void performOnReplica(ReplicaRequest request, IndexShard replica);
+
+        class PrimaryRef implements ReplicationOperation.Primary<Request, ReplicaRequest, PrimaryResult> {
+
+            @Override
+            public ShardRouting routingEntry() {
+                return replicationGroup.primary.routingEntry();
+            }
 
-        public IndexingOp(IndexRequest request, ActionListener<IndexingResult> listener, ReplicationGroup replicationGroup) {
-            super(request, new PrimaryRef(replicationGroup), listener, true, new ReplicasRef(replicationGroup),
-                () -> null, logger, "indexing");
-            this.replicationGroup = replicationGroup;
-            request.process(null, true, request.index());
-        }
-
-        @Override
-        protected List<ShardRouting> getShards(ShardId shardId, ClusterState state) {
-            return replicationGroup.shardRoutings();
-        }
-
-        @Override
-        protected Set<String> getInSyncAllocationIds(ShardId shardId, ClusterState clusterState) {
-            return replicationGroup.shardRoutings().stream().filter(ShardRouting::active)
-                .map(shr -> shr.allocationId().getId()).collect(Collectors.toSet());
-        }
-
-        @Override
-        protected String checkActiveShardCount() {
-            return null;
-        }
-    }
-
-    private static class PrimaryRef implements ReplicationOperation.Primary<IndexRequest, IndexRequest, IndexingResult> {
-        final IndexShard primary;
+            @Override
+            public void failShard(String message, Exception exception) {
+                throw new UnsupportedOperationException();
+            }
 
-        private PrimaryRef(ReplicationGroup replicationGroup) {
-            this.primary = replicationGroup.primary;
+            @Override
+            public PrimaryResult perform(Request request) throws Exception {
+                PrimaryResult response = performOnPrimary(replicationGroup.primary, request);
+                response.replicaRequest().primaryTerm(replicationGroup.primary.getPrimaryTerm());
+                return response;
+            }
         }
 
-        @Override
-        public ShardRouting routingEntry() {
-            return primary.routingEntry();
-        }
+        class ReplicasRef implements ReplicationOperation.Replicas<ReplicaRequest> {
+
+            @Override
+            public void performOn(
+                ShardRouting replicaRouting,
+                ReplicaRequest request,
+                ActionListener<TransportResponse.Empty> listener) {
+                try {
+                    IndexShard replica = replicationGroup.replicas.stream()
+                        .filter(s -> replicaRouting.isSameAllocation(s.routingEntry())).findFirst().get();
+                    performOnReplica(request, replica);
+                    listener.onResponse(TransportResponse.Empty.INSTANCE);
+                } catch (Exception e) {
+                    listener.onFailure(e);
+                }
+            }
 
-        @Override
-        public void failShard(String message, Exception exception) {
-            throw new UnsupportedOperationException();
-        }
+            @Override
+            public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess,
+                                  Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
+                throw new UnsupportedOperationException();
+            }
 
-        @Override
-        public IndexingResult perform(IndexRequest request) throws Exception {
-            TransportWriteAction.WriteResult<IndexResponse> result = TransportIndexAction.executeIndexRequestOnPrimary(request, primary,
-                null);
-            request.primaryTerm(primary.getPrimaryTerm());
-            return new IndexingResult(request, result.getResponse());
+            @Override
+            public void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
+                                             Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
+                throw new UnsupportedOperationException();
+            }
         }
 
-    }
-
-    private static class ReplicasRef implements ReplicationOperation.Replicas<IndexRequest> {
-        private final ReplicationGroup replicationGroup;
+        class PrimaryResult implements ReplicationOperation.PrimaryResult<ReplicaRequest> {
+            final ReplicaRequest replicaRequest;
+            final Response finalResponse;
 
-        private ReplicasRef(ReplicationGroup replicationGroup) {
-            this.replicationGroup = replicationGroup;
-        }
+            public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponse) {
+                this.replicaRequest = replicaRequest;
+                this.finalResponse = finalResponse;
+            }
 
-        @Override
-        public void performOn(ShardRouting replicaRouting, IndexRequest request, ActionListener<TransportResponse.Empty> listener) {
-            try {
-                IndexShard replica = replicationGroup.replicas.stream()
-                    .filter(s -> replicaRouting.isSameAllocation(s.routingEntry())).findFirst().get();
-                TransportIndexAction.executeIndexRequestOnReplica(request, replica);
-                listener.onResponse(TransportResponse.Empty.INSTANCE);
-            } catch (Exception t) {
-                listener.onFailure(t);
+            @Override
+            public ReplicaRequest replicaRequest() {
+                return replicaRequest;
             }
-        }
 
-        @Override
-        public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess,
-                              Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
-            throw new UnsupportedOperationException();
-        }
+            @Override
+            public void setShardInfo(ReplicationResponse.ShardInfo shardInfo) {
+                finalResponse.setShardInfo(shardInfo);
+            }
 
-        @Override
-        public void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
-                                         Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
-            throw new UnsupportedOperationException();
+            public void respond(ActionListener<Response> listener) {
+                listener.onResponse(finalResponse);
+            }
         }
     }
 
+    class IndexingAction extends ReplicationAction<IndexRequest, IndexRequest, IndexResponse> {
 
-    private static class IndexingResult implements ReplicationOperation.PrimaryResult<IndexRequest> {
-        final IndexRequest replicaRequest;
-        final IndexResponse finalResponse;
-
-        public IndexingResult(IndexRequest replicaRequest, IndexResponse finalResponse) {
-            this.replicaRequest = replicaRequest;
-            this.finalResponse = finalResponse;
+        public IndexingAction(IndexRequest request, ActionListener<IndexResponse> listener, ReplicationGroup replicationGroup) {
+            super(request, listener, replicationGroup, "indexing");
+            request.process(null, true, request.index());
         }
 
         @Override
-        public IndexRequest replicaRequest() {
-            return replicaRequest;
+        protected PrimaryResult performOnPrimary(IndexShard primary, IndexRequest request) throws Exception {
+            TransportWriteAction.WriteResult<IndexResponse> result = TransportIndexAction.executeIndexRequestOnPrimary(request, primary,
+                null);
+            request.primaryTerm(primary.getPrimaryTerm());
+            TransportWriteActionTestHelper.performPostWriteActions(primary, request, result.getLocation(), logger);
+            return new PrimaryResult(request, result.getResponse());
         }
 
         @Override
-        public void setShardInfo(ReplicationResponse.ShardInfo shardInfo) {
-            finalResponse.setShardInfo(shardInfo);
-        }
-
-        public void respond(ActionListener<IndexResponse> listener) {
-            listener.onResponse(finalResponse);
+        protected void performOnReplica(IndexRequest request, IndexShard replica) {
+            Engine.Index index = TransportIndexAction.executeIndexRequestOnReplica(request, replica);
+            TransportWriteActionTestHelper.performPostWriteActions(replica, request, index.getTranslogLocation(), logger);
         }
     }
-
 }