浏览代码

Merge pull request #13380 from brwe/exception-on-force-merge

Engine: Let AlreadyClosedException and EngineClosedExceptionBubble up
Britta Weber 10 年之前
父节点
当前提交
3de8cbb54c

+ 0 - 1
core/src/main/java/org/elasticsearch/ElasticsearchException.java

@@ -583,7 +583,6 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
                 org.elasticsearch.index.query.QueryParsingException.class,
                 org.elasticsearch.action.support.replication.TransportReplicationAction.RetryOnPrimaryException.class,
                 org.elasticsearch.index.engine.DeleteByQueryFailedEngineException.class,
-                org.elasticsearch.index.engine.ForceMergeFailedEngineException.class,
                 org.elasticsearch.discovery.MasterNotDiscoveredException.class,
                 org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException.class,
                 org.elasticsearch.node.NodeClosedException.class,

+ 1 - 1
core/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java

@@ -74,7 +74,7 @@ public class TransportOptimizeAction extends TransportBroadcastByNodeAction<Opti
     }
 
     @Override
-    protected EmptyResult shardOperation(OptimizeRequest request, ShardRouting shardRouting) {
+    protected EmptyResult shardOperation(OptimizeRequest request, ShardRouting shardRouting) throws IOException {
         IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()).shardSafe(shardRouting.shardId().id());
         indexShard.optimize(request);
         return EmptyResult.INSTANCE;

+ 1 - 1
core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java

@@ -119,7 +119,7 @@ public class TransportUpgradeAction extends TransportBroadcastByNodeAction<Upgra
     }
 
     @Override
-    protected ShardUpgradeResult shardOperation(UpgradeRequest request, ShardRouting shardRouting) {
+    protected ShardUpgradeResult shardOperation(UpgradeRequest request, ShardRouting shardRouting) throws IOException {
         IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()).shardSafe(shardRouting.shardId().id());
         org.apache.lucene.util.Version oldestLuceneSegment = indexShard.upgrade(request);
         // We are using the current version of Elasticsearch as upgrade version since we update mapping to match the current version

+ 3 - 1
core/src/main/java/org/elasticsearch/action/support/TransportActions.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.action.support;
 
+import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.NoShardAvailableActionException;
 import org.elasticsearch.action.UnavailableShardsException;
@@ -36,7 +37,8 @@ public class TransportActions {
                 actual instanceof IndexNotFoundException ||
                 actual instanceof IllegalIndexShardStateException ||
                 actual instanceof NoShardAvailableActionException ||
-                actual instanceof UnavailableShardsException) {
+                actual instanceof UnavailableShardsException ||
+                actual instanceof AlreadyClosedException) {
             return true;
         }
         return false;

+ 1 - 1
core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java

@@ -181,7 +181,7 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
      * @param shardRouting the shard on which to execute the operation
      * @return the result of the shard-level operation for the shard
      */
-    protected abstract ShardOperationResult shardOperation(Request request, ShardRouting shardRouting);
+    protected abstract ShardOperationResult shardOperation(Request request, ShardRouting shardRouting) throws IOException;
 
     /**
      * Determines the shards on which this operation will be executed on. The operation is executed once per shard.

+ 2 - 2
core/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -502,14 +502,14 @@ public abstract class Engine implements Closeable {
     /**
      * Optimizes to 1 segment
      */
-    public void forceMerge(boolean flush) {
+    public void forceMerge(boolean flush) throws IOException {
         forceMerge(flush, 1, false, false, false);
     }
 
     /**
      * Triggers a forced merge on this engine
      */
-    public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException;
+    public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException, IOException;
 
     /**
      * Snapshots the index and returns a handle to it. If needed will try and "commit" the

+ 0 - 39
core/src/main/java/org/elasticsearch/index/engine/ForceMergeFailedEngineException.java

@@ -1,39 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.index.engine;
-
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.index.shard.ShardId;
-
-import java.io.IOException;
-
-/**
- *
- */
-public class ForceMergeFailedEngineException extends EngineException {
-
-    public ForceMergeFailedEngineException(ShardId shardId, Throwable t) {
-        super(shardId, "force merge failed", t);
-    }
-
-    public ForceMergeFailedEngineException(StreamInput in) throws IOException{
-        super(in);
-    }
-}

+ 4 - 4
core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -45,6 +45,7 @@ import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.InfoStream;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.support.TransportActions;
 import org.elasticsearch.cluster.routing.DjbHashFunction;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.lease.Releasable;
@@ -823,7 +824,7 @@ public class InternalEngine extends Engine {
 
     @Override
     public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
-                           final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException {
+                           final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException, EngineClosedException, IOException {
         /*
          * We do NOT acquire the readlock here since we are waiting on the merges to finish
          * that's fine since the IW.rollback should stop all the threads and trigger an IOException
@@ -865,9 +866,8 @@ public class InternalEngine extends Engine {
                 store.decRef();
             }
         } catch (Throwable t) {
-            ForceMergeFailedEngineException ex = new ForceMergeFailedEngineException(shardId, t);
-            maybeFailEngine("force merge", ex);
-            throw ex;
+            maybeFailEngine("force merge", t);
+            throw t;
         } finally {
             try {
                 mp.setUpgradeInProgress(false, false); // reset it just to make sure we reset it in a case of an error

+ 2 - 2
core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -697,7 +697,7 @@ public class IndexShard extends AbstractIndexShardComponent {
 
     }
 
-    public void optimize(OptimizeRequest optimize) {
+    public void optimize(OptimizeRequest optimize) throws IOException {
         verifyStarted();
         if (logger.isTraceEnabled()) {
             logger.trace("optimize with {}", optimize);
@@ -708,7 +708,7 @@ public class IndexShard extends AbstractIndexShardComponent {
     /**
      * Upgrades the shard to the current version of Lucene and returns the minimum segment version
      */
-    public org.apache.lucene.util.Version upgrade(UpgradeRequest upgrade) {
+    public org.apache.lucene.util.Version upgrade(UpgradeRequest upgrade) throws IOException {
         verifyStarted();
         if (logger.isTraceEnabled()) {
             logger.trace("upgrade with {}", upgrade);

+ 41 - 2
core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -41,6 +41,7 @@ import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.TestUtil;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.support.TransportActions;
 import org.elasticsearch.bwcompat.OldIndexBackwardsCompatibilityIT;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.Base64;
@@ -94,6 +95,7 @@ import java.nio.file.Path;
 import java.util.*;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
 import static org.elasticsearch.common.settings.Settings.Builder.EMPTY_SETTINGS;
@@ -1000,8 +1002,7 @@ public class InternalEngineTests extends ESTestCase {
                                 indexed.countDown();
                                 try {
                                     engine.forceMerge(randomBoolean(), 1, false, randomBoolean(), randomBoolean());
-                                } catch (ForceMergeFailedEngineException ex) {
-                                    // ok
+                                } catch (IOException e) {
                                     return;
                                 }
                             }
@@ -2019,4 +2020,42 @@ public class InternalEngineTests extends ESTestCase {
             assertThat(topDocs.totalHits, equalTo(numDocs));
         }
     }
+
+    public void testShardNotAvailableExceptionWhenEngineClosedConcurrently() throws IOException, InterruptedException {
+        AtomicReference<Throwable> throwable = new AtomicReference<>();
+        String operation = randomFrom("optimize", "refresh", "flush");
+        Thread mergeThread = new Thread() {
+            @Override
+            public void run() {
+                boolean stop = false;
+                logger.info("try with {}", operation);
+                while (stop == false) {
+                    try {
+                        switch (operation) {
+                            case "optimize": {
+                                engine.forceMerge(true, 1, false, false, false);
+                                break;
+                            }
+                            case "refresh": {
+                                engine.refresh("test refresh");
+                                break;
+                            }
+                            case "flush": {
+                                engine.flush(true, false);
+                                break;
+                            }
+                        }
+                    } catch (Throwable t) {
+                        throwable.set(t);
+                        stop = true;
+                    }
+                }
+            }
+        };
+        mergeThread.start();
+        engine.close();
+        mergeThread.join();
+        logger.info("exception caught: ", throwable.get());
+        assertTrue("expected an Exception that signals shard is not available", TransportActions.isShardNotAvailableException(throwable.get()));
+    }
 }