1
0
Эх сурвалжийг харах

Restructured the transport action code for multi percolate api.

Martijn van Groningen 12 жил өмнө
parent
commit
eed7f0bdb3

+ 154 - 130
src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java

@@ -112,7 +112,8 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
                         if (!itemResponse.isFailed()) {
                             GetResponse getResponse = itemResponse.getResponse();
                             if (getResponse.isExists()) {
-                                percolateRequests.set(slot, new PercolateRequest((PercolateRequest) percolateRequests.get(slot), getResponse.getSourceAsBytesRef()));
+                                PercolateRequest originalRequest = (PercolateRequest) percolateRequests.get(slot);
+                                percolateRequests.set(slot, new PercolateRequest(originalRequest, getResponse.getSourceAsBytesRef()));
                             } else {
                                 percolateRequests.set(slot, new DocumentMissingException(null, getResponse.getType(), getResponse.getId()));
                             }
@@ -120,7 +121,7 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
                             percolateRequests.set(slot, itemResponse.getFailure());
                         }
                     }
-                    multiPercolate(request, percolateRequests, listener, clusterState);
+                    new ASyncAction(percolateRequests, listener, clusterState).run();
                 }
 
                 @Override
@@ -129,162 +130,185 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
                 }
             });
         } else {
-            multiPercolate(request, percolateRequests, listener, clusterState);
+            new ASyncAction(percolateRequests, listener, clusterState).run();
         }
 
     }
 
-    private void multiPercolate(MultiPercolateRequest multiPercolateRequest, final AtomicReferenceArray<Object> percolateRequests,
-                                final ActionListener<MultiPercolateResponse> listener, ClusterState clusterState) {
-
-        final AtomicReferenceArray<AtomicInteger> expectedOperationsPerItem = new AtomicReferenceArray<AtomicInteger>(percolateRequests.length());
-        final AtomicReferenceArray<AtomicReferenceArray> responsesByItemAndShard = new AtomicReferenceArray<AtomicReferenceArray>(multiPercolateRequest.requests().size());
-        final AtomicArray<Object> reducedResponses = new AtomicArray<Object>(percolateRequests.length());
-
-        // Resolving concrete indices and routing and grouping the requests by shard
-        Map<ShardId, TransportShardMultiPercolateAction.Request> requestsByShard = new HashMap<ShardId, TransportShardMultiPercolateAction.Request>();
-        // Keep track what slots belong to what shard, in case a request to a shard fails on all copies
-        Map<ShardId, TIntArrayList> shardToSlotsBuilder = new HashMap<ShardId, TIntArrayList>();
-        int expectedResults = 0;
-        for (int i = 0;  i < percolateRequests.length(); i++) {
-            Object element = percolateRequests.get(i);
-            assert element != null;
-            if (element instanceof PercolateRequest) {
-                PercolateRequest percolateRequest = (PercolateRequest) element;
-                String[] concreteIndices = clusterState.metaData().concreteIndices(percolateRequest.indices(), percolateRequest.ignoreIndices(), true);
-                Map<String, Set<String>> routing = clusterState.metaData().resolveSearchRouting(percolateRequest.routing(), multiPercolateRequest.indices());
-                // TODO: I only need shardIds, ShardIterator(ShardRouting) is only needed in TransportShardMultiPercolateAction
-                GroupShardsIterator shards = clusterService.operationRouting().searchShards(
-                        clusterState, percolateRequest.indices(), concreteIndices, routing, percolateRequest.preference()
-                );
 
-                responsesByItemAndShard.set(i, new AtomicReferenceArray(shards.size()));
-                expectedOperationsPerItem.set(i, new AtomicInteger(shards.size()));
-                for (ShardIterator shard : shards) {
-                    ShardId shardId = shard.shardId();
-                    TransportShardMultiPercolateAction.Request requests = requestsByShard.get(shardId);
-                    if (requests == null) {
-                        requestsByShard.put(shardId, requests = new TransportShardMultiPercolateAction.Request(shard.shardId().getIndex(), shardId.id(), percolateRequest.preference()));
-                    }
-                    requests.add(new TransportShardMultiPercolateAction.Request.Item(i, new PercolateShardRequest(shardId, percolateRequest)));
+    private class ASyncAction {
+
+        final ActionListener<MultiPercolateResponse> listener;
+        final Map<ShardId, TransportShardMultiPercolateAction.Request> requestsByShard;
+        final AtomicReferenceArray<Object> percolateRequests;
+
+        final AtomicInteger expectedOperations;
+        final AtomicArray<Object> reducedResponses;
+        final ConcurrentMap<ShardId, AtomicIntegerArray> shardToSlots;
+        final AtomicReferenceArray<AtomicInteger> expectedOperationsPerItem;
+        final AtomicReferenceArray<AtomicReferenceArray> responsesByItemAndShard;
+
+        ASyncAction(AtomicReferenceArray<Object> percolateRequests, ActionListener<MultiPercolateResponse> listener, ClusterState clusterState) {
+            this.listener = listener;
+            this.percolateRequests = percolateRequests;
+            responsesByItemAndShard = new AtomicReferenceArray<AtomicReferenceArray>(percolateRequests.length());
+            expectedOperationsPerItem = new AtomicReferenceArray<AtomicInteger>(percolateRequests.length());
+            reducedResponses = new AtomicArray<Object>(percolateRequests.length());
+
+            // Resolving concrete indices and routing and grouping the requests by shard
+            requestsByShard = new HashMap<ShardId, TransportShardMultiPercolateAction.Request>();
+            // Keep track what slots belong to what shard, in case a request to a shard fails on all copies
+            Map<ShardId, TIntArrayList> shardToSlotsBuilder = new HashMap<ShardId, TIntArrayList>();
+            int expectedResults = 0;
+            for (int slot = 0;  slot < percolateRequests.length(); slot++) {
+                Object element = percolateRequests.get(slot);
+                assert element != null;
+                if (element instanceof PercolateRequest) {
+                    PercolateRequest percolateRequest = (PercolateRequest) element;
+                    String[] concreteIndices = clusterState.metaData().concreteIndices(percolateRequest.indices(), percolateRequest.ignoreIndices(), true);
+                    Map<String, Set<String>> routing = clusterState.metaData().resolveSearchRouting(percolateRequest.routing(), percolateRequest.indices());
+                    // TODO: I only need shardIds, ShardIterator(ShardRouting) is only needed in TransportShardMultiPercolateAction
+                    GroupShardsIterator shards = clusterService.operationRouting().searchShards(
+                            clusterState, percolateRequest.indices(), concreteIndices, routing, percolateRequest.preference()
+                    );
+
+                    responsesByItemAndShard.set(slot, new AtomicReferenceArray(shards.size()));
+                    expectedOperationsPerItem.set(slot, new AtomicInteger(shards.size()));
+                    for (ShardIterator shard : shards) {
+                        ShardId shardId = shard.shardId();
+                        TransportShardMultiPercolateAction.Request requests = requestsByShard.get(shardId);
+                        if (requests == null) {
+                            requestsByShard.put(shardId, requests = new TransportShardMultiPercolateAction.Request(shard.shardId().getIndex(), shardId.id(), percolateRequest.preference()));
+                        }
+                        requests.add(new TransportShardMultiPercolateAction.Request.Item(slot, new PercolateShardRequest(shardId, percolateRequest)));
 
-                    TIntArrayList items = shardToSlotsBuilder.get(shardId);
-                    if (items == null) {
-                        shardToSlotsBuilder.put(shardId, items = new TIntArrayList());
+                        TIntArrayList items = shardToSlotsBuilder.get(shardId);
+                        if (items == null) {
+                            shardToSlotsBuilder.put(shardId, items = new TIntArrayList());
+                        }
+                        items.add(slot);
                     }
-                    items.add(i);
+                    expectedResults++;
+                } else if (element instanceof Throwable || element instanceof MultiGetResponse.Failure) {
+                    reducedResponses.set(slot, element);
+                    responsesByItemAndShard.set(slot, new AtomicReferenceArray(0));
+                    expectedOperationsPerItem.set(slot, new AtomicInteger(0));
                 }
-                expectedResults++;
-            } else if (element instanceof Throwable) {
-                reducedResponses.set(i, element);
-                responsesByItemAndShard.set(i, new AtomicReferenceArray(0));
-                expectedOperationsPerItem.set(i, new AtomicInteger(0));
+            }
+            expectedOperations = new AtomicInteger(expectedResults);
+            // Move slot to shard tracking from normal map to concurrent save map
+            shardToSlots = ConcurrentCollections.newConcurrentMap();
+            for (Map.Entry<ShardId, TIntArrayList> entry : shardToSlotsBuilder.entrySet()) {
+                shardToSlots.put(entry.getKey(), new AtomicIntegerArray(entry.getValue().toArray()));
             }
         }
 
-        if (expectedResults == 0) {
-            finish(reducedResponses, listener);
-            return;
-        }
+        void run() {
+            if (expectedOperations.get() == 0) {
+                finish();
+                return;
+            }
 
-        // Move slot to shard tracking from normal map to concurrent save map
-        final ConcurrentMap<ShardId, AtomicIntegerArray> shardToSlots = ConcurrentCollections.newConcurrentMap();
-        for (Map.Entry<ShardId, TIntArrayList> entry : shardToSlotsBuilder.entrySet()) {
-            shardToSlots.put(entry.getKey(), new AtomicIntegerArray(entry.getValue().toArray()));
-        }
+            for (Map.Entry<ShardId, TransportShardMultiPercolateAction.Request> entry : requestsByShard.entrySet()) {
+                final int shardId = entry.getKey().id();
+                final String index =  entry.getKey().index().name();
 
-        final AtomicInteger expectedOperations = new AtomicInteger(expectedResults);
-        for (Map.Entry<ShardId, TransportShardMultiPercolateAction.Request> entry : requestsByShard.entrySet()) {
-            final ShardId shardId = entry.getKey();
-            final TransportShardMultiPercolateAction.Request shardRequest = entry.getValue();
-            shardMultiPercolateAction.execute(shardRequest, new ActionListener<TransportShardMultiPercolateAction.Response>() {
+                TransportShardMultiPercolateAction.Request shardRequest = entry.getValue();
+                shardMultiPercolateAction.execute(shardRequest, new ActionListener<TransportShardMultiPercolateAction.Response>() {
 
-                @Override
-                @SuppressWarnings("unchecked")
-                public void onResponse(TransportShardMultiPercolateAction.Response response) {
-                    try {
-                        for (TransportShardMultiPercolateAction.Response.Item item : response.items()) {
-                            AtomicReferenceArray shardResults = responsesByItemAndShard.get(item.slot());
-                            if (shardResults == null) {
-                                continue;
-                            }
+                    @Override
+                    public void onResponse(TransportShardMultiPercolateAction.Response response) {
+                        onShardResponse(new ShardId(index, shardId), response);
+                    }
 
-                            if (item.failed()) {
-                                shardResults.set(shardId.id(), new BroadcastShardOperationFailedException(shardId, item.error().string()));
-                            } else {
-                                shardResults.set(shardId.id(), item.response());
-                            }
+                    @Override
+                    public void onFailure(Throwable e) {
+                        onShardFailure(new ShardId(index, shardId), e);
+                    }
 
-                            assert expectedOperationsPerItem.get(item.slot()).get() >= 1 : "slot[" + item.slot() + "] can't be lower than one";
-                            if (expectedOperationsPerItem.get(item.slot()).decrementAndGet() == 0) {
-                                // Failure won't bubble up, since we fail the whole request now via the catch clause below,
-                                // so expectedOperationsPerItem will not be decremented twice.
-                                reduce(item.slot(), percolateRequests, expectedOperations, reducedResponses, listener, responsesByItemAndShard);
-                            }
-                        }
-                    } catch (Throwable e) {
-                        logger.error("{} Percolate original reduce error", e, shardId);
-                        listener.onFailure(e);
+                });
+            }
+        }
+
+        @SuppressWarnings("unchecked")
+        void onShardResponse(ShardId shardId, TransportShardMultiPercolateAction.Response response) {
+            try {
+                for (TransportShardMultiPercolateAction.Response.Item item : response.items()) {
+                    AtomicReferenceArray shardResults = responsesByItemAndShard.get(item.slot());
+                    if (shardResults == null) {
+                        assert false : "shardResults can't be null";
+                        continue;
                     }
-                }
 
-                @Override
-                @SuppressWarnings("unchecked")
-                public void onFailure(Throwable e) {
-                    logger.debug("Shard multi percolate failure", e);
-                    try {
-                        AtomicIntegerArray slots = shardToSlots.get(shardId);
-                        for (int i = 0; i < slots.length(); i++) {
-                            int slot = slots.get(i);
-                            AtomicReferenceArray shardResults = responsesByItemAndShard.get(slot);
-                            if (shardResults == null) {
-                                continue;
-                            }
+                    if (item.failed()) {
+                        shardResults.set(shardId.id(), new BroadcastShardOperationFailedException(shardId, item.error().string()));
+                    } else {
+                        shardResults.set(shardId.id(), item.response());
+                    }
 
-                            shardResults.set(shardId.id(), new BroadcastShardOperationFailedException(shardId, e));
-                            assert expectedOperationsPerItem.get(slot).get() >= 1 : "slot[" + slot + "] can't be lower than one. Caused by: " + e.getMessage();
-                            if (expectedOperationsPerItem.get(slot).decrementAndGet() == 0) {
-                                reduce(slot, percolateRequests, expectedOperations, reducedResponses, listener, responsesByItemAndShard);
-                            }
-                        }
-                    } catch (Throwable t) {
-                        logger.error("{} Percolate original reduce error, original error {}", t, shardId, e);
-                        listener.onFailure(t);
+                    assert expectedOperationsPerItem.get(item.slot()).get() >= 1 : "slot[" + item.slot() + "] can't be lower than one";
+                    if (expectedOperationsPerItem.get(item.slot()).decrementAndGet() == 0) {
+                        // Failure won't bubble up, since we fail the whole request now via the catch clause below,
+                        // so expectedOperationsPerItem will not be decremented twice.
+                        reduce(item.slot());
                     }
                 }
+            } catch (Throwable e) {
+                logger.error("{} Percolate original reduce error", e, shardId);
+                listener.onFailure(e);
+            }
+        }
 
-            });
+        @SuppressWarnings("unchecked")
+        void onShardFailure(ShardId shardId, Throwable e) {
+            logger.debug("Shard multi percolate failure", e);
+            try {
+                AtomicIntegerArray slots = shardToSlots.get(shardId);
+                for (int i = 0; i < slots.length(); i++) {
+                    int slot = slots.get(i);
+                    AtomicReferenceArray shardResults = responsesByItemAndShard.get(slot);
+                    if (shardResults == null) {
+                        continue;
+                    }
+
+                    shardResults.set(shardId.id(), new BroadcastShardOperationFailedException(shardId, e));
+                    assert expectedOperationsPerItem.get(slot).get() >= 1 : "slot[" + slot + "] can't be lower than one. Caused by: " + e.getMessage();
+                    if (expectedOperationsPerItem.get(slot).decrementAndGet() == 0) {
+                        reduce(slot);
+                    }
+                }
+            } catch (Throwable t) {
+                logger.error("{} Percolate original reduce error, original error {}", t, shardId, e);
+                listener.onFailure(t);
+            }
         }
-    }
 
-    private void reduce(int slot,
-                        AtomicReferenceArray<Object> percolateRequests,
-                        AtomicInteger expectedOperations,
-                        AtomicArray<Object> reducedResponses,
-                        ActionListener<MultiPercolateResponse> listener,
-                        AtomicReferenceArray<AtomicReferenceArray> responsesByItemAndShard) {
-
-        AtomicReferenceArray shardResponses = responsesByItemAndShard.get(slot);
-        PercolateResponse reducedResponse = TransportPercolateAction.reduce((PercolateRequest) percolateRequests.get(slot), shardResponses, percolatorService);
-        reducedResponses.set(slot, reducedResponse);
-        assert expectedOperations.get() >= 1 : "slot[" + slot + "] expected options should be >= 1 but is " + expectedOperations.get();
-        if (expectedOperations.decrementAndGet() == 0) {
-            finish(reducedResponses, listener);
+        void reduce(int slot) {
+            AtomicReferenceArray shardResponses = responsesByItemAndShard.get(slot);
+            PercolateResponse reducedResponse = TransportPercolateAction.reduce((PercolateRequest) percolateRequests.get(slot), shardResponses, percolatorService);
+            reducedResponses.set(slot, reducedResponse);
+            assert expectedOperations.get() >= 1 : "slot[" + slot + "] expected options should be >= 1 but is " + expectedOperations.get();
+            if (expectedOperations.decrementAndGet() == 0) {
+                finish();
+            }
         }
-    }
 
-    private void finish(AtomicArray<Object> reducedResponses, ActionListener<MultiPercolateResponse> listener) {
-        MultiPercolateResponse.Item[] finalResponse = new MultiPercolateResponse.Item[reducedResponses.length()];
-        for (int i = 0; i < reducedResponses.length(); i++) {
-            Object element = reducedResponses.get(i);
-            assert element != null : "Element[" + i + "] shouldn't be null";
-            if (element instanceof PercolateResponse) {
-                finalResponse[i] = new MultiPercolateResponse.Item((PercolateResponse) element);
-            } else if (element instanceof Throwable) {
-                finalResponse[i] = new MultiPercolateResponse.Item(ExceptionsHelper.detailedMessage((Throwable) element));
+        void finish() {
+            MultiPercolateResponse.Item[] finalResponse = new MultiPercolateResponse.Item[reducedResponses.length()];
+            for (int slot = 0; slot < reducedResponses.length(); slot++) {
+                Object element = reducedResponses.get(slot);
+                assert element != null : "Element[" + slot + "] shouldn't be null";
+                if (element instanceof PercolateResponse) {
+                    finalResponse[slot] = new MultiPercolateResponse.Item((PercolateResponse) element);
+                } else if (element instanceof Throwable) {
+                    finalResponse[slot] = new MultiPercolateResponse.Item(ExceptionsHelper.detailedMessage((Throwable) element));
+                } else if (element instanceof MultiGetResponse.Failure) {
+                    finalResponse[slot] = new MultiPercolateResponse.Item(((MultiGetResponse.Failure)element).getMessage());
+                }
             }
+            listener.onResponse(new MultiPercolateResponse(finalResponse));
         }
-        listener.onResponse(new MultiPercolateResponse(finalResponse));
+
     }
 
 

+ 35 - 25
src/main/java/org/elasticsearch/action/percolate/TransportShardMultiPercolateAction.java

@@ -99,16 +99,16 @@ public class TransportShardMultiPercolateAction extends TransportShardSingleOper
         Response response = new Response();
         response.items = new ArrayList<Response.Item>(request.items.size());
         for (Request.Item item : request.items) {
-            Response.Item responseItem = new Response.Item();
-            responseItem.slot = item.slot;
+            Response.Item responseItem;
+            int slot = item.slot;
             try {
-                responseItem.response = percolatorService.percolate(item.request);
+                responseItem = new Response.Item(slot, percolatorService.percolate(item.request));
             } catch (Throwable e) {
                 logger.debug("[{}][{}] failed to multi percolate", e, request.index(), request.shardId());
                 if (TransportActions.isShardNotAvailableException(e)) {
                     throw new ElasticSearchException("", e);
                 } else {
-                    responseItem.error = new StringText(ExceptionsHelper.detailedMessage(e));
+                    responseItem = new Response.Item(slot, new StringText(ExceptionsHelper.detailedMessage(e)));
                 }
             }
             response.items.add(responseItem);
@@ -123,6 +123,8 @@ public class TransportShardMultiPercolateAction extends TransportShardSingleOper
         private String preference;
         private List<Item> items;
 
+        private volatile boolean done = false;
+
         public Request() {
         }
 
@@ -153,13 +155,13 @@ public class TransportShardMultiPercolateAction extends TransportShardSingleOper
             int size = in.readVInt();
             items = new ArrayList<Item>(size);
             for (int i = 0; i < size; i++) {
-                Item item = new Item();
-                item.slot = in.readVInt();
-                item.request = new PercolateShardRequest(index(), shardId);
-                item.request.documentType(in.readString());
-                item.request.source(in.readBytesReference());
-                item.request.docSource(in.readBytesReference());
-                item.request.onlyCount(in.readBoolean());
+                int slot = in.readVInt();
+                PercolateShardRequest shardRequest = new PercolateShardRequest(index(), shardId);
+                shardRequest.documentType(in.readString());
+                shardRequest.source(in.readBytesReference());
+                shardRequest.docSource(in.readBytesReference());
+                shardRequest.onlyCount(in.readBoolean());
+                Item item = new Item(slot, shardRequest);
                 items.add(item);
             }
         }
@@ -181,11 +183,8 @@ public class TransportShardMultiPercolateAction extends TransportShardSingleOper
 
         public static class Item {
 
-            private int slot;
-            private PercolateShardRequest request;
-
-            Item() {
-            }
+            private final int slot;
+            private final PercolateShardRequest request;
 
             public Item(int slot, PercolateShardRequest request) {
                 this.slot = slot;
@@ -234,23 +233,34 @@ public class TransportShardMultiPercolateAction extends TransportShardSingleOper
             int size = in.readVInt();
             items = new ArrayList<Item>(size);
             for (int i = 0; i < size; i++) {
-                Item item = new Item();
-                item.slot = in.readVInt();
+                int slot = in.readVInt();
                 if (in.readBoolean()) {
-                    item.response = new PercolateShardResponse();
-                    item.response.readFrom(in);
+                    PercolateShardResponse shardResponse = new PercolateShardResponse();
+                    shardResponse.readFrom(in);
+                    items.add(new Item(slot, shardResponse));
                 } else {
-                    item.error = in.readText();
+                    items.add(new Item(slot, in.readText()));
                 }
-                items.add(item);
             }
         }
 
         public static class Item {
 
-            private int slot;
-            private PercolateShardResponse response;
-            private Text error;
+            private final int slot;
+            private final PercolateShardResponse response;
+            private final Text error;
+
+            public Item(Integer slot, PercolateShardResponse response) {
+                this.slot = slot;
+                this.response = response;
+                this.error = null;
+            }
+
+            public Item(Integer slot, Text error) {
+                this.slot = slot;
+                this.error = error;
+                this.response = null;
+            }
 
             public int slot() {
                 return slot;