Bläddra i källkod

Refactored TransportIndexReplicationOperationAction to be able to expose the shard id related to a shard failure

The `ShardOperationFailedException` is now created within `TransportIndexReplicationAction` passing in the current shard id as a constructor argument.
Also replaced `AtomicReferenceArray<Object>` with `AtomicReferenceArray<ShardActionResult>`, where `ShardActionResult` wraps the `ShardResponse` or the failure, containing all the needed info.
Luca Cavanna 11 år sedan
förälder
incheckning
c58c9cd352

+ 2 - 2
src/main/java/org/elasticsearch/action/delete/index/IndexDeleteResponse.java

@@ -35,9 +35,9 @@ public class IndexDeleteResponse extends ActionResponse {
     private int failedShards;
     private ShardDeleteResponse[] deleteResponses;
 
-    IndexDeleteResponse(String index, int successfulShards, int failedShards, ShardDeleteResponse[] deleteResponses) {
+    IndexDeleteResponse(String index, int failedShards, ShardDeleteResponse[] deleteResponses) {
         this.index = index;
-        this.successfulShards = successfulShards;
+        this.successfulShards = deleteResponses.length;
         this.failedShards = failedShards;
         this.deleteResponses = deleteResponses;
     }

+ 4 - 15
src/main/java/org/elasticsearch/action/delete/index/TransportIndexDeleteAction.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.action.delete.index;
 
+import org.elasticsearch.action.ShardOperationFailedException;
 import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.ClusterState;
@@ -30,8 +31,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.List;
 
 /**
  *
@@ -50,19 +50,8 @@ public class TransportIndexDeleteAction extends TransportIndexReplicationOperati
     }
 
     @Override
-    protected IndexDeleteResponse newResponseInstance(IndexDeleteRequest request, AtomicReferenceArray shardsResponses) {
-        int successfulShards = 0;
-        int failedShards = 0;
-        ArrayList<ShardDeleteResponse> responses = new ArrayList<ShardDeleteResponse>();
-        for (int i = 0; i < shardsResponses.length(); i++) {
-            if (shardsResponses.get(i) == null) {
-                failedShards++;
-            } else {
-                responses.add((ShardDeleteResponse) shardsResponses.get(i));
-                successfulShards++;
-            }
-        }
-        return new IndexDeleteResponse(request.index(), successfulShards, failedShards, responses.toArray(new ShardDeleteResponse[responses.size()]));
+    protected IndexDeleteResponse newResponseInstance(IndexDeleteRequest request, List<ShardDeleteResponse> shardDeleteResponses, int failuresCount, List<ShardOperationFailedException> shardFailures) {
+        return new IndexDeleteResponse(request.index(), failuresCount, shardDeleteResponses.toArray(new ShardDeleteResponse[shardDeleteResponses.size()]));
     }
 
     @Override

+ 2 - 17
src/main/java/org/elasticsearch/action/deletebyquery/TransportIndexDeleteByQueryAction.java

@@ -20,7 +20,6 @@
 package org.elasticsearch.action.deletebyquery;
 
 import org.elasticsearch.action.ShardOperationFailedException;
-import org.elasticsearch.action.support.DefaultShardOperationFailedException;
 import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.ClusterState;
@@ -32,9 +31,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicReferenceArray;
 
 /**
  *
@@ -53,20 +50,8 @@ public class TransportIndexDeleteByQueryAction extends TransportIndexReplication
     }
 
     @Override
-    protected IndexDeleteByQueryResponse newResponseInstance(IndexDeleteByQueryRequest request, AtomicReferenceArray shardsResponses) {
-        int successfulShards = 0;
-        int failedShards = 0;
-        List<ShardOperationFailedException> failures = new ArrayList<ShardOperationFailedException>(3);
-        for (int i = 0; i < shardsResponses.length(); i++) {
-            Object shardResponse = shardsResponses.get(i);
-            if (shardResponse instanceof Throwable) {
-                failedShards++;
-                failures.add(new DefaultShardOperationFailedException(request.index(), -1, (Throwable) shardResponse));
-            } else {
-                successfulShards++;
-            }
-        }
-        return new IndexDeleteByQueryResponse(request.index(), successfulShards, failedShards, failures);
+    protected IndexDeleteByQueryResponse newResponseInstance(IndexDeleteByQueryRequest request, List<ShardDeleteByQueryResponse> shardDeleteByQueryResponses, int failuresCount, List<ShardOperationFailedException> shardFailures) {
+        return new IndexDeleteByQueryResponse(request.index(), shardDeleteByQueryResponses.size(), failuresCount, shardFailures);
     }
 
     @Override

+ 56 - 8
src/main/java/org/elasticsearch/action/support/replication/TransportIndexReplicationOperationAction.java

@@ -19,10 +19,13 @@
 
 package org.elasticsearch.action.support.replication;
 
+import com.google.common.collect.Lists;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ShardOperationFailedException;
+import org.elasticsearch.action.support.DefaultShardOperationFailedException;
 import org.elasticsearch.action.support.TransportAction;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.ClusterState;
@@ -36,6 +39,7 @@ import org.elasticsearch.transport.BaseTransportRequestHandler;
 import org.elasticsearch.transport.TransportChannel;
 import org.elasticsearch.transport.TransportService;
 
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReferenceArray;
 
@@ -81,8 +85,9 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
             return;
         }
         final AtomicInteger indexCounter = new AtomicInteger();
+        final AtomicInteger failureCounter = new AtomicInteger();
         final AtomicInteger completionCounter = new AtomicInteger(groups.size());
-        final AtomicReferenceArray<Object> shardsResponses = new AtomicReferenceArray<Object>(groups.size());
+        final AtomicReferenceArray<ShardActionResult> shardsResponses = new AtomicReferenceArray<ShardActionResult>(groups.size());
 
         for (final ShardIterator shardIt : groups) {
             ShardRequest shardRequest = newShardRequestInstance(request, shardIt.shardId().id());
@@ -96,20 +101,41 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
             shardAction.execute(shardRequest, new ActionListener<ShardResponse>() {
                 @Override
                 public void onResponse(ShardResponse result) {
-                    shardsResponses.set(indexCounter.getAndIncrement(), result);
-                    if (completionCounter.decrementAndGet() == 0) {
-                        listener.onResponse(newResponseInstance(request, shardsResponses));
-                    }
+                    shardsResponses.set(indexCounter.getAndIncrement(), new ShardActionResult(result));
+                    returnIfNeeded();
                 }
 
                 @Override
                 public void onFailure(Throwable e) {
+                    failureCounter.getAndIncrement();
                     int index = indexCounter.getAndIncrement();
                     if (accumulateExceptions()) {
-                        shardsResponses.set(index, e);
+                        shardsResponses.set(index, new ShardActionResult(
+                                new DefaultShardOperationFailedException(request.index, shardIt.shardId().id(), e)));
                     }
+                    returnIfNeeded();
+                }
+
+                private void returnIfNeeded() {
                     if (completionCounter.decrementAndGet() == 0) {
-                        listener.onResponse(newResponseInstance(request, shardsResponses));
+                        List<ShardResponse> responses = Lists.newArrayList();
+                        List<ShardOperationFailedException> failures = Lists.newArrayList();
+                        for (int i = 0; i < shardsResponses.length(); i++) {
+                            ShardActionResult shardActionResult = shardsResponses.get(i);
+                            if (shardActionResult == null) {
+                                assert !accumulateExceptions();
+                                continue;
+                            }
+                            if (shardActionResult.isFailure()) {
+                                assert accumulateExceptions() && shardActionResult.shardFailure != null;
+                                failures.add(shardActionResult.shardFailure);
+                            } else {
+                                responses.add(shardActionResult.shardResponse);
+                            }
+                        }
+
+                        assert failures.size() == 0 || failures.size() == failureCounter.get();
+                        listener.onResponse(newResponseInstance(request, responses, failureCounter.get(), failures));
                     }
                 }
             });
@@ -118,7 +144,7 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
 
     protected abstract Request newRequestInstance();
 
-    protected abstract Response newResponseInstance(Request request, AtomicReferenceArray shardsResponses);
+    protected abstract Response newResponseInstance(Request request, List<ShardResponse> shardResponses, int failuresCount, List<ShardOperationFailedException> shardFailures);
 
     protected abstract String transportAction();
 
@@ -132,6 +158,28 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
 
     protected abstract ClusterBlockException checkRequestBlock(ClusterState state, Request request);
 
+    private class ShardActionResult {
+
+        private final ShardResponse shardResponse;
+        private final ShardOperationFailedException shardFailure;
+
+        private ShardActionResult(ShardResponse shardResponse) {
+            assert shardResponse != null;
+            this.shardResponse = shardResponse;
+            this.shardFailure = null;
+        }
+
+        private ShardActionResult(ShardOperationFailedException shardOperationFailedException) {
+            assert shardOperationFailedException != null;
+            this.shardFailure = shardOperationFailedException;
+            this.shardResponse = null;
+        }
+
+        boolean isFailure() {
+            return shardFailure != null;
+        }
+    }
+
     private class TransportHandler extends BaseTransportRequestHandler<Request> {
 
         @Override

+ 13 - 5
src/test/java/org/elasticsearch/deleteByQuery/DeleteByQueryTests.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.deleteByQuery;
 
+import org.elasticsearch.action.ShardOperationFailedException;
 import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
 import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
 import org.elasticsearch.action.search.SearchResponse;
@@ -27,12 +28,10 @@ import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.indices.IndexMissingException;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
-import org.junit.Assert;
 import org.junit.Test;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.*;
 
 public class DeleteByQueryTests extends ElasticsearchIntegrationTest {
 
@@ -83,9 +82,10 @@ public class DeleteByQueryTests extends ElasticsearchIntegrationTest {
         deleteByQueryRequestBuilder.setQuery(QueryBuilders.matchAllQuery());
 
         try {
-            DeleteByQueryResponse actionGet = deleteByQueryRequestBuilder.execute().actionGet();
-            Assert.fail("Exception should have been thrown.");
+            deleteByQueryRequestBuilder.execute().actionGet();
+            fail("Exception should have been thrown.");
         } catch (IndexMissingException e) {
+            //everything well
         }
 
         deleteByQueryRequestBuilder.setIndicesOptions(IndicesOptions.lenient());
@@ -110,6 +110,14 @@ public class DeleteByQueryTests extends ElasticsearchIntegrationTest {
         assertThat(response.status(), equalTo(RestStatus.BAD_REQUEST));
         assertThat(response.getIndex("twitter").getSuccessfulShards(), equalTo(0));
         assertThat(response.getIndex("twitter").getFailedShards(), equalTo(5));
+        assertThat(response.getIndices().size(), equalTo(1));
+        assertThat(response.getIndices().get("twitter").getFailedShards(), equalTo(5));
+        assertThat(response.getIndices().get("twitter").getFailures().length, equalTo(5));
+        for (ShardOperationFailedException failure : response.getIndices().get("twitter").getFailures()) {
+            assertThat(failure.reason(), containsString("[twitter] [has_child] No mapping for for type [type]"));
+            assertThat(failure.status(), equalTo(RestStatus.BAD_REQUEST));
+            assertThat(failure.shardId(), greaterThan(-1));
+        }
     }
 
     @Test