Browse Source

Bulk API: Do not fail whole request on closed index

The bulk API request was marked as completely failed,
in case a request with a closed index was referred in
any of the requests inside of a bulk one.

Implementation Note: Currently the implementation is a bit more verbose in order to prevent an instanceof check and another cast - if that is fast enough, we could execute that logic only once at the
beginning of the loop (thinking this might be a bit overoptimization here).

Closes #6410
Brian Murphy 11 years ago
parent
commit
61c21f9a0e

+ 66 - 0
src/main/java/org/elasticsearch/action/DocumentRequest.java

@@ -0,0 +1,66 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action;
+
+import org.elasticsearch.action.support.IndicesOptions;
+
+/**
+ * Generic interface to group ActionRequest, which work on single document level
+ *
+ * Forces this class return index/type/id getters
+ */
+public interface DocumentRequest<T> {
+
+    /**
+     * Get the index that this request operates on
+     * @return the index
+     */
+    String index();
+
+    /**
+     * Get the type that this request operates on
+     * @return the type
+     */
+    String type();
+
+    /**
+     * Get the id of the document for this request
+     * @return the id
+     */
+    String id();
+
+    /**
+     * Get the options for this request
+     * @return the indices options
+     */
+    IndicesOptions indicesOptions();
+
+    /**
+     * Set the routing for this request
+     * @param routing
+     * @return the Request
+     */
+    T routing(String routing);
+
+    /**
+     * Get the routing for this request
+     * @return the Routing
+     */
+    String routing();
+}

+ 61 - 38
src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -27,6 +27,7 @@ import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocumentRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
@@ -40,6 +41,7 @@ import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
@@ -47,8 +49,10 @@ import org.elasticsearch.cluster.routing.ShardIterator;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndexAlreadyExistsException;
+import org.elasticsearch.indices.IndexClosedException;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -96,26 +100,15 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
         if (autoCreateIndex.needToCheck()) {
             final Set<String> indices = Sets.newHashSet();
             for (ActionRequest request : bulkRequest.requests) {
-                if (request instanceof IndexRequest) {
-                    IndexRequest indexRequest = (IndexRequest) request;
-                    if (!indices.contains(indexRequest.index())) {
-                        indices.add(indexRequest.index());
-                    }
-                } else if (request instanceof DeleteRequest) {
-                    DeleteRequest deleteRequest = (DeleteRequest) request;
-                    if (!indices.contains(deleteRequest.index())) {
-                        indices.add(deleteRequest.index());
-                    }
-                } else if (request instanceof UpdateRequest) {
-                    UpdateRequest updateRequest = (UpdateRequest) request;
-                    if (!indices.contains(updateRequest.index())) {
-                        indices.add(updateRequest.index());
+                if (request instanceof DocumentRequest) {
+                    DocumentRequest req = (DocumentRequest) request;
+                    if (!indices.contains(req.index())) {
+                        indices.add(req.index());
                     }
                 } else {
                     throw new ElasticsearchException("Parsed unknown request in bulk actions: " + request.getClass().getSimpleName());
                 }
             }
-
             final AtomicInteger counter = new AtomicInteger(indices.size());
             ClusterState state = clusterService.state();
             for (final String index : indices) {
@@ -204,30 +197,33 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
         MetaData metaData = clusterState.metaData();
         for (int i = 0; i < bulkRequest.requests.size(); i++) {
             ActionRequest request = bulkRequest.requests.get(i);
-            if (request instanceof IndexRequest) {
-                IndexRequest indexRequest = (IndexRequest) request;
-                String concreteIndex = concreteIndices.resolveIfAbsent(indexRequest.index(), indexRequest.indicesOptions());
-                MappingMetaData mappingMd = null;
-                if (metaData.hasIndex(concreteIndex)) {
-                    mappingMd = metaData.index(concreteIndex).mappingOrDefault(indexRequest.type());
+            if (request instanceof DocumentRequest) {
+                DocumentRequest req = (DocumentRequest) request;
+
+                if (addFailureIfIndexIsClosed(req, bulkRequest, responses, i, concreteIndices, metaData)) {
+                    continue;
                 }
-                try {
-                    indexRequest.process(metaData, mappingMd, allowIdGeneration, concreteIndex);
-                } catch (ElasticsearchParseException e) {
-                    BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex, indexRequest.type(), indexRequest.id(), e);
-                    BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "index", failure);
-                    responses.set(i, bulkItemResponse);
-                    // make sure the request gets never processed again
-                    bulkRequest.requests.set(i, null);
+
+                String concreteIndex = concreteIndices.resolveIfAbsent(req.index(), req.indicesOptions());
+                if (request instanceof IndexRequest) {
+                    IndexRequest indexRequest = (IndexRequest) request;
+                    MappingMetaData mappingMd = null;
+                    if (metaData.hasIndex(concreteIndex)) {
+                        mappingMd = metaData.index(concreteIndex).mappingOrDefault(indexRequest.type());
+                    }
+                    try {
+                        indexRequest.process(metaData, mappingMd, allowIdGeneration, concreteIndex);
+                    } catch (ElasticsearchParseException e) {
+                        BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex, indexRequest.type(), indexRequest.id(), e);
+                        BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "index", failure);
+                        responses.set(i, bulkItemResponse);
+                        // make sure the request gets never processed again
+                        bulkRequest.requests.set(i, null);
+                    }
+                } else {
+                    concreteIndices.resolveIfAbsent(req.index(), req.indicesOptions());
+                    req.routing(clusterState.metaData().resolveIndexRouting(req.routing(), req.index()));
                 }
-            } else if (request instanceof DeleteRequest) {
-                DeleteRequest deleteRequest = (DeleteRequest) request;
-                concreteIndices.resolveIfAbsent(deleteRequest.index(), deleteRequest.indicesOptions());
-                deleteRequest.routing(clusterState.metaData().resolveIndexRouting(deleteRequest.routing(), deleteRequest.index()));
-            } else if (request instanceof UpdateRequest) {
-                UpdateRequest updateRequest = (UpdateRequest) request;
-                concreteIndices.resolveIfAbsent(updateRequest.index(), updateRequest.indicesOptions());
-                updateRequest.routing(clusterState.metaData().resolveIndexRouting(updateRequest.routing(), updateRequest.index()));
             }
         }
 
@@ -343,8 +339,35 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
         }
     }
 
-    private static class ConcreteIndices  {
+    private boolean addFailureIfIndexIsClosed(DocumentRequest request, BulkRequest bulkRequest, AtomicArray<BulkItemResponse> responses, int idx,
+                                              final ConcreteIndices concreteIndices,
+                                              final MetaData metaData) {
+        String concreteIndex = concreteIndices.getConcreteIndex(request.index());
+        boolean isClosed = false;
+        if (concreteIndex == null) {
+            try {
+                concreteIndex = concreteIndices.resolveIfAbsent(request.index(), request.indicesOptions());
+            } catch (IndexClosedException ice) {
+                isClosed = true;
+            }
+        }
+        if (!isClosed) {
+            IndexMetaData indexMetaData = metaData.index(concreteIndex);
+            isClosed = indexMetaData.getState() == IndexMetaData.State.CLOSE;
+        }
+        if (isClosed) {
+            BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.type(), request.id(),
+                    new IndexClosedException(new Index(metaData.index(request.index()).getIndex())));
+            BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, "index", failure);
+            responses.set(idx, bulkItemResponse);
+            // make sure the request gets never processed again
+            bulkRequest.requests.set(idx, null);
+        }
+        return isClosed;
+    }
+
 
+    private static class ConcreteIndices  {
         private final Map<String, String> indices = new HashMap<>();
         private final MetaData metaData;
 

+ 2 - 1
src/main/java/org/elasticsearch/action/delete/DeleteRequest.java

@@ -21,6 +21,7 @@ package org.elasticsearch.action.delete;
 
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.DocumentRequest;
 import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -43,7 +44,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
  * @see org.elasticsearch.client.Client#delete(DeleteRequest)
  * @see org.elasticsearch.client.Requests#deleteRequest(String)
  */
-public class DeleteRequest extends ShardReplicationOperationRequest<DeleteRequest> {
+public class DeleteRequest extends ShardReplicationOperationRequest<DeleteRequest> implements DocumentRequest<DeleteRequest> {
 
     private String type;
     private String id;

+ 2 - 1
src/main/java/org/elasticsearch/action/index/IndexRequest.java

@@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.RoutingMissingException;
 import org.elasticsearch.action.TimestampParsingException;
+import org.elasticsearch.action.DocumentRequest;
 import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
@@ -64,7 +65,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
  * @see org.elasticsearch.client.Requests#indexRequest(String)
  * @see org.elasticsearch.client.Client#index(IndexRequest)
  */
-public class IndexRequest extends ShardReplicationOperationRequest<IndexRequest> {
+public class IndexRequest extends ShardReplicationOperationRequest<IndexRequest> implements DocumentRequest<IndexRequest> {
 
     /**
      * Operation type controls if the type of the index operation.

+ 2 - 1
src/main/java/org/elasticsearch/action/update/UpdateRequest.java

@@ -22,6 +22,7 @@ package org.elasticsearch.action.update;
 import com.google.common.collect.Maps;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.DocumentRequest;
 import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.replication.ReplicationType;
@@ -47,7 +48,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
 
 /**
  */
-public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest> {
+public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest> implements DocumentRequest<UpdateRequest> {
 
     private String type;
     private String id;

+ 24 - 0
src/test/java/org/elasticsearch/document/BulkTests.java

@@ -22,6 +22,7 @@ package org.elasticsearch.document;
 import com.google.common.base.Charsets;
 import org.elasticsearch.action.admin.indices.alias.Alias;
 import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.count.CountResponse;
@@ -651,8 +652,31 @@ public class BulkTests extends ElasticsearchIntegrationTest {
         assertThat(bulkItemResponse.getItems()[5].getOpType(), is("delete"));
     }
 
+
     private static String indexOrAlias() {
         return randomBoolean() ? "test" : "alias";
     }
+
+    @Test // issue 6410
+    public void testThatMissingIndexDoesNotAbortFullBulkRequest() throws Exception{
+        createIndex("bulkindex1", "bulkindex2");
+        BulkRequest bulkRequest = new BulkRequest();
+        bulkRequest.add(new IndexRequest("bulkindex1", "index1_type", "1").source("text", "hallo1"))
+                   .add(new IndexRequest("bulkindex2", "index2_type", "1").source("text", "hallo2"))
+                   .add(new IndexRequest("bulkindex2", "index2_type").source("text", "hallo2"))
+                   .add(new UpdateRequest("bulkindex2", "index2_type", "2").doc("foo", "bar"))
+                   .add(new DeleteRequest("bulkindex2", "index2_type", "3"))
+                   .refresh(true);
+
+        client().bulk(bulkRequest).get();
+        SearchResponse searchResponse = client().prepareSearch("bulkindex*").get();
+        assertHitCount(searchResponse, 3);
+
+        assertAcked(client().admin().indices().prepareClose("bulkindex2"));
+
+        BulkResponse bulkResponse = client().bulk(bulkRequest).get();
+        assertThat(bulkResponse.hasFailures(), is(true));
+        assertThat(bulkResponse.getItems().length, is(5));
+    }
 }