浏览代码

Use atomic collections to make sure all of the memory contents are visible from writing to reading thread.

Martijn van Groningen 12 年之前
父节点
当前提交
a7b2b7847a
共有 1 个文件被更改,包括 33 次插入19 次删除
  1. 33 19
      src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java

+ 33 - 19
src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java

@@ -33,6 +33,7 @@ import org.elasticsearch.cluster.routing.ShardIterator;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.index.engine.DocumentMissingException;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.percolator.PercolatorService;
@@ -42,7 +43,9 @@ import org.elasticsearch.transport.TransportChannel;
 import org.elasticsearch.transport.TransportService;
 
 import java.util.*;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
 import java.util.concurrent.atomic.AtomicReferenceArray;
 
 /**
@@ -72,20 +75,24 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
     protected void doExecute(final MultiPercolateRequest request, final ActionListener<MultiPercolateResponse> listener) {
         final ClusterState clusterState = clusterService.state();
         clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
-        @SuppressWarnings("unchecked")
-        final List<Object> percolateRequests = (List) request.requests();
 
-        final TIntArrayList slots = new TIntArrayList();
-        final List<GetRequest> existingDocsRequests = new ArrayList<GetRequest>();
+        final AtomicReferenceArray<Object> percolateRequests = new AtomicReferenceArray<Object>(request.requests().size());
+        TIntArrayList getRequestSlots = new TIntArrayList();
+        List<GetRequest> existingDocsRequests = new ArrayList<GetRequest>();
         for (int i = 0;  i < request.requests().size(); i++) {
             PercolateRequest percolateRequest = request.requests().get(i);
             percolateRequest.startTime = System.currentTimeMillis();
+            percolateRequests.set(i, percolateRequest);
             if (percolateRequest.getRequest() != null) {
                 existingDocsRequests.add(percolateRequest.getRequest());
-                slots.add(i);
+                getRequestSlots.add(i);
             }
         }
 
+        // Can have a mixture of percolate requests. (normal percolate requests & percolate existing doc),
+        // so we need to keep track for what percolate request we had a get request
+        final AtomicIntegerArray percolateRequestSlotsWithGet = new AtomicIntegerArray(getRequestSlots.toArray());
+
         if (!existingDocsRequests.isEmpty()) {
             final MultiGetRequest multiGetRequest = new MultiGetRequest();
             for (GetRequest getRequest : existingDocsRequests) {
@@ -101,7 +108,7 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
                 public void onResponse(MultiGetResponse multiGetItemResponses) {
                     for (int i = 0; i < multiGetItemResponses.getResponses().length; i++) {
                         MultiGetItemResponse itemResponse = multiGetItemResponses.getResponses()[i];
-                        int slot = slots.get(i);
+                        int slot = percolateRequestSlotsWithGet.get(i);
                         if (!itemResponse.isFailed()) {
                             GetResponse getResponse = itemResponse.getResponse();
                             if (getResponse.isExists()) {
@@ -127,18 +134,21 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
 
     }
 
-    private void multiPercolate(MultiPercolateRequest multiPercolateRequest, final List<Object> percolateRequests,
+    private void multiPercolate(MultiPercolateRequest multiPercolateRequest, final AtomicReferenceArray<Object> percolateRequests,
                                 final ActionListener<MultiPercolateResponse> listener, ClusterState clusterState) {
 
-        final AtomicInteger[] expectedOperationsPerItem = new AtomicInteger[percolateRequests.size()];
+        final AtomicInteger[] expectedOperationsPerItem = new AtomicInteger[percolateRequests.length()];
         final AtomicReferenceArray<AtomicReferenceArray> responsesByItemAndShard = new AtomicReferenceArray<AtomicReferenceArray>(multiPercolateRequest.requests().size());
-        final AtomicArray<Object> reducedResponses = new AtomicArray<Object>(percolateRequests.size());
+        final AtomicArray<Object> reducedResponses = new AtomicArray<Object>(percolateRequests.length());
+
+        // Keep track what slots belong to what shard, in case a request to a shard fails on all copies
+        final ConcurrentMap<ShardId, AtomicIntegerArray> shardToSlots = ConcurrentCollections.newConcurrentMap();
 
         // Resolving concrete indices and routing and grouping the requests by shard
-        final Map<ShardId, TransportShardMultiPercolateAction.Request> requestsByShard = new HashMap<ShardId, TransportShardMultiPercolateAction.Request>();
-        final Map<ShardId, TIntArrayList> shardToSlots = new HashMap<ShardId, TIntArrayList>();
+        Map<ShardId, TransportShardMultiPercolateAction.Request> requestsByShard = new HashMap<ShardId, TransportShardMultiPercolateAction.Request>();
+        Map<ShardId, TIntArrayList> shardToSlotsBuilder = new HashMap<ShardId, TIntArrayList>();
         int expectedResults = 0;
-        for (int i = 0;  i < percolateRequests.size(); i++) {
+        for (int i = 0;  i < percolateRequests.length(); i++) {
             Object element = percolateRequests.get(i);
             assert element != null;
             if (element instanceof PercolateRequest) {
@@ -160,9 +170,9 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
                     }
                     requests.add(new TransportShardMultiPercolateAction.Request.Item(i, new PercolateShardRequest(shardId, percolateRequest)));
 
-                    TIntArrayList items = shardToSlots.get(shardId);
+                    TIntArrayList items = shardToSlotsBuilder.get(shardId);
                     if (items == null) {
-                        shardToSlots.put(shardId, items = new TIntArrayList());
+                        shardToSlotsBuilder.put(shardId, items = new TIntArrayList());
                     }
                     items.add(i);
                 }
@@ -179,6 +189,10 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
             return;
         }
 
+        for (Map.Entry<ShardId, TIntArrayList> entry : shardToSlotsBuilder.entrySet()) {
+            shardToSlots.put(entry.getKey(), new AtomicIntegerArray(entry.getValue().toArray()));
+        }
+
         final AtomicInteger expectedOperations = new AtomicInteger(expectedResults);
         for (Map.Entry<ShardId, TransportShardMultiPercolateAction.Request> entry : requestsByShard.entrySet()) {
             final ShardId shardId = entry.getKey();
@@ -219,8 +233,8 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
                 public void onFailure(Throwable e) {
                     logger.debug("Shard multi percolate failure", e);
                     try {
-                        TIntArrayList slots = shardToSlots.get(shardId);
-                        for (int i = 0; i < slots.size(); i++) {
+                        AtomicIntegerArray slots = shardToSlots.get(shardId);
+                        for (int i = 0; i < slots.length(); i++) {
                             int slot = slots.get(i);
                             AtomicReferenceArray shardResults = responsesByItemAndShard.get(slot);
                             if (shardResults == null) {
@@ -244,7 +258,7 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
     }
 
     private void reduce(int slot,
-                        List<Object> percolateRequests,
+                        AtomicReferenceArray<Object> percolateRequests,
                         AtomicInteger expectedOperations,
                         AtomicArray<Object> reducedResponses,
                         ActionListener<MultiPercolateResponse> listener,
@@ -253,7 +267,7 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
         AtomicReferenceArray shardResponses = responsesByItemAndShard.get(slot);
         PercolateResponse reducedResponse = TransportPercolateAction.reduce((PercolateRequest) percolateRequests.get(slot), shardResponses, percolatorService);
         reducedResponses.set(slot, reducedResponse);
-        assert expectedOperations.get() >= 1;
+        assert expectedOperations.get() >= 1 : "slot[" + slot + "] expected options should be >= 1 but is " + expectedOperations.get();
         if (expectedOperations.decrementAndGet() == 0) {
             finish(reducedResponses, listener);
         }
@@ -263,7 +277,7 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
         MultiPercolateResponse.Item[] finalResponse = new MultiPercolateResponse.Item[reducedResponses.length()];
         for (int i = 0; i < reducedResponses.length(); i++) {
             Object element = reducedResponses.get(i);
-            assert element != null;
+            assert element != null : "Element[" + i + "] shouldn't be null";
             if (element instanceof PercolateResponse) {
                 finalResponse[i] = new MultiPercolateResponse.Item((PercolateResponse) element);
             } else if (element instanceof Throwable) {