Browse Source

Bulk indexing: Fix 8125 hanged request when auto create index is off.
If a bulk request contains a mix of indexing requests for an existing index and one that needs to be auto-created but a cluster configuration prevents the auto-create of the new index the ingest process hangs. The exception for the failure to create an index was not caught or reported back properly. Added a Junit test to recreate the issue and the associated fix is in TransportBulkAction.

Closes #8125

markharwood 11 years ago
parent
commit
d12ae196af

+ 27 - 11
src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -53,11 +53,16 @@ import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndexAlreadyExistsException;
 import org.elasticsearch.indices.IndexClosedException;
+import org.elasticsearch.indices.IndexMissingException;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
-import java.util.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -117,7 +122,11 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
                         @Override
                         public void onResponse(CreateIndexResponse result) {
                             if (counter.decrementAndGet() == 0) {
-                                executeBulk(bulkRequest, startTime, listener, responses);
+                                try {
+                                    executeBulk(bulkRequest, startTime, listener, responses);
+                                } catch (Throwable t) {
+                                    listener.onFailure(t);
+                                }
                             }
                         }
 
@@ -205,7 +214,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
             if (request instanceof DocumentRequest) {
                 DocumentRequest req = (DocumentRequest) request;
 
-                if (addFailureIfIndexIsClosed(req, bulkRequest, responses, i, concreteIndices, metaData)) {
+                if (addFailureIfIndexIsUnavailable(req, bulkRequest, responses, i, concreteIndices, metaData)) {
                     continue;
                 }
 
@@ -344,31 +353,38 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
         }
     }
 
-    private boolean addFailureIfIndexIsClosed(DocumentRequest request, BulkRequest bulkRequest, AtomicArray<BulkItemResponse> responses, int idx,
+    private boolean addFailureIfIndexIsUnavailable(DocumentRequest request, BulkRequest bulkRequest, AtomicArray<BulkItemResponse> responses, int idx,
                                               final ConcreteIndices concreteIndices,
                                               final MetaData metaData) {
         String concreteIndex = concreteIndices.getConcreteIndex(request.index());
-        boolean isClosed = false;
+        Exception unavailableException = null;
         if (concreteIndex == null) {
             try {
                 concreteIndex = concreteIndices.resolveIfAbsent(request.index(), request.indicesOptions());
             } catch (IndexClosedException ice) {
-                isClosed = true;
+                unavailableException = ice;
+            } catch (IndexMissingException ime) {
+                // Fix for issue where bulk request references an index that
+                // cannot be auto-created see issue #8125
+                unavailableException = ime;
             }
         }
-        if (!isClosed) {
+        if (unavailableException == null) {
             IndexMetaData indexMetaData = metaData.index(concreteIndex);
-            isClosed = indexMetaData.getState() == IndexMetaData.State.CLOSE;
+            if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
+                unavailableException = new IndexClosedException(new Index(metaData.index(request.index()).getIndex()));
+            }
         }
-        if (isClosed) {
+        if (unavailableException != null) {
             BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.type(), request.id(),
-                    new IndexClosedException(new Index(metaData.index(request.index()).getIndex())));
+                    unavailableException);
             BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, "index", failure);
             responses.set(idx, bulkItemResponse);
             // make sure the request gets never processed again
             bulkRequest.requests.set(idx, null);
+            return true;
         }
-        return isClosed;
+        return false;
     }
 
 

+ 54 - 0
src/test/java/org/elasticsearch/action/bulk/BulkProcessorClusterSettingsTests.java

@@ -0,0 +1,54 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.bulk;
+
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
+import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
+import org.junit.Test;
+
+@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
+public class BulkProcessorClusterSettingsTests extends ElasticsearchIntegrationTest {
+
+    @Test
+    public void testBulkProcessorAutoCreateRestrictions() throws Exception {
+        // See issue #8125
+        Settings settings = ImmutableSettings.settingsBuilder().put("action.auto_create_index", false).build();
+
+        internalCluster().startNode(settings);
+
+        createIndex("willwork");
+        client().admin().cluster().prepareHealth("willwork").setWaitForGreenStatus().execute().actionGet();
+
+        BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
+        bulkRequestBuilder.add(client().prepareIndex("willwork", "type1", "1").setSource("{\"foo\":1}"));
+        bulkRequestBuilder.add(client().prepareIndex("wontwork", "type1", "2").setSource("{\"foo\":2}"));
+        bulkRequestBuilder.add(client().prepareIndex("willwork", "type1", "3").setSource("{\"foo\":3}"));
+        BulkResponse br = bulkRequestBuilder.get();
+        BulkItemResponse[] responses = br.getItems();
+        assertEquals(3, responses.length);
+        assertFalse("Operation on existing index should succeed", responses[0].isFailed());
+        assertTrue("Missing index should have been flagged", responses[1].isFailed());
+        assertEquals("IndexMissingException[[wontwork] missing]", responses[1].getFailureMessage());
+        assertFalse("Operation on existing index should succeed", responses[2].isFailed());
+    }
+}