
add elasticsearch-shard tool (#32281)

Relates #31389
Vladimir Dolzhenko · 7 years ago · commit a3e8b831ee
29 changed files with 2326 additions and 720 deletions
  1. distribution/src/bin/elasticsearch-shard (+5, −0)
  2. distribution/src/bin/elasticsearch-shard.bat (+12, −0)
  3. docs/reference/commands/index.asciidoc (+2, −0)
  4. docs/reference/commands/shard-tool.asciidoc (+107, −0)
  5. docs/reference/index-modules/translog.asciidoc (+4, −0)
  6. libs/cli/src/main/java/org/elasticsearch/cli/Terminal.java (+6, −1)
  7. qa/vagrant/src/main/java/org/elasticsearch/packaging/test/ArchiveTestCase.java (+17, −0)
  8. qa/vagrant/src/main/java/org/elasticsearch/packaging/util/Archives.java (+1, −0)
  9. qa/vagrant/src/main/java/org/elasticsearch/packaging/util/Installation.java (+2, −1)
  10. qa/vagrant/src/main/java/org/elasticsearch/packaging/util/Packages.java (+1, −0)
  11. qa/vagrant/src/test/resources/packaging/utils/packages.bash (+1, −0)
  12. qa/vagrant/src/test/resources/packaging/utils/tar.bash (+1, −0)
  13. server/src/main/java/org/elasticsearch/env/NodeEnvironment.java (+115, −51)
  14. server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedLuceneSegmentsAction.java (+100, −0)
  15. server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java (+545, −0)
  16. server/src/main/java/org/elasticsearch/index/shard/ShardPath.java (+20, −5)
  17. server/src/main/java/org/elasticsearch/index/shard/ShardToolCli.java (+39, −0)
  18. server/src/main/java/org/elasticsearch/index/translog/TranslogHeader.java (+2, −1)
  19. server/src/main/java/org/elasticsearch/index/translog/TranslogToolCli.java (+4, −1)
  20. server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java (+245, −0)
  21. server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java (+0, −254)
  22. server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java (+2, −18)
  23. server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java (+652, −0)
  24. server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java (+409, −0)
  25. server/src/test/java/org/elasticsearch/index/store/CorruptedTranslogIT.java (+1, −1)
  26. server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java (+9, −2)
  27. server/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java (+0, −382)
  28. test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java (+6, −3)
  29. test/framework/src/main/java/org/elasticsearch/test/CorruptionUtils.java (+18, −0)

+ 5 - 0
distribution/src/bin/elasticsearch-shard

@@ -0,0 +1,5 @@
+#!/bin/bash
+
+ES_MAIN_CLASS=org.elasticsearch.index.shard.ShardToolCli \
+  "`dirname "$0"`"/elasticsearch-cli \
+  "$@"

+ 12 - 0
distribution/src/bin/elasticsearch-shard.bat

@@ -0,0 +1,12 @@
+@echo off
+
+setlocal enabledelayedexpansion
+setlocal enableextensions
+
+set ES_MAIN_CLASS=org.elasticsearch.index.shard.ShardToolCli
+call "%~dp0elasticsearch-cli.bat" ^
+  %%* ^
+  || exit /b 1
+
+endlocal
+endlocal

+ 2 - 0
docs/reference/commands/index.asciidoc

@@ -12,6 +12,7 @@ tasks from the command line:
 * <<migrate-tool>>
 * <<saml-metadata>>
 * <<setup-passwords>>
+* <<shard-tool>>
 * <<syskeygen>>
 * <<users-command>>
 
@@ -22,5 +23,6 @@ include::certutil.asciidoc[]
 include::migrate-tool.asciidoc[]
 include::saml-metadata.asciidoc[]
 include::setup-passwords.asciidoc[]
+include::shard-tool.asciidoc[]
 include::syskeygen.asciidoc[]
 include::users-command.asciidoc[]

+ 107 - 0
docs/reference/commands/shard-tool.asciidoc

@@ -0,0 +1,107 @@
+[[shard-tool]]
+== elasticsearch-shard
+
+In some cases the Lucene index or translog of a shard copy can become
+corrupted. The `elasticsearch-shard` command enables you to remove corrupted
+parts of the shard if a good copy of the shard cannot be recovered
+automatically or restored from backup.
+
+[WARNING]
+You will lose the corrupted data when you run `elasticsearch-shard`.  This tool
+should only be used as a last resort if there is no way to recover from another
+copy of the shard or restore a snapshot.
+
+When Elasticsearch detects that a shard's data is corrupted, it fails that
+shard copy and refuses to use it. Under normal conditions, the shard is
+automatically recovered from another copy. If no good copy of the shard is
+available and you cannot restore from backup, you can use `elasticsearch-shard`
+to remove the corrupted data and restore access to any remaining data in
+unaffected segments.
+
+[WARNING]
+Stop Elasticsearch before running `elasticsearch-shard`.
+
+To remove corrupted shard data, use the `remove-corrupted-data` subcommand.
+
+There are two ways to specify the path:
+
+* Specify the index name and shard ID with the `--index` and `--shard-id`
+  options.
+* Use the `--dir` option to specify the full path to the corrupted index or
+  translog files.
+
+[float]
+=== Removing corrupted data
+
+`elasticsearch-shard` analyses the shard copy and provides an overview of the
+corruption found. To proceed you must then confirm that you want to remove the
+corrupted data.
+
+[WARNING]
+Back up your data before running `elasticsearch-shard`. This is a destructive
+operation that removes corrupted data from the shard.
+
+[source,txt]
+--------------------------------------------------
+$ bin/elasticsearch-shard remove-corrupted-data --index twitter --shard-id 0
+
+
+    WARNING: Elasticsearch MUST be stopped before running this tool.
+
+  Please make a complete backup of your index before using this tool.
+
+
+Opening Lucene index at /var/lib/elasticsearchdata/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/index/
+
+ >> Lucene index is corrupted at /var/lib/elasticsearchdata/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/index/
+
+Opening translog at /var/lib/elasticsearchdata/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/translog/
+
+
+ >> Translog is clean at /var/lib/elasticsearchdata/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/translog/
+
+
+  Corrupted Lucene index segments found - 32 documents will be lost.
+
+            WARNING:              YOU WILL LOSE DATA.
+
+Continue and remove docs from the index ? Y
+
+WARNING: 1 broken segments (containing 32 documents) detected
+Took 0.056 sec total.
+Writing...
+OK
+Wrote new segments file "segments_c"
+Marking index with the new history uuid : 0pIBd9VTSOeMfzYT6p0AsA
+Changing allocation id V8QXk-QXSZinZMT-NvEq4w to tjm9Ve6uTBewVFAlfUMWjA
+
+You should run the following command to allocate this shard:
+
+POST /_cluster/reroute
+{
+  "commands" : [
+    {
+      "allocate_stale_primary" : {
+        "index" : "index42",
+        "shard" : 0,
+        "node" : "II47uXW2QvqzHBnMcl2o_Q",
+        "accept_data_loss" : false
+      }
+    }
+  ]
+}
+
+You must accept the possibility of data loss by changing parameter `accept_data_loss` to `true`.
+
+Deleted corrupt marker corrupted_FzTSBSuxT7i3Tls_TgwEag from /var/lib/elasticsearchdata/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/index/
+
+--------------------------------------------------
+
+When you use `elasticsearch-shard` to drop the corrupted data, the shard's
+allocation ID changes. After restarting the node, you must use the
+<<cluster-reroute,cluster reroute API>> to tell Elasticsearch to use the new
+ID. The `elasticsearch-shard` command shows the request that
+you need to submit.
+
+You can also use the `-h` option to get a list of all options and parameters
+that the `elasticsearch-shard` tool supports.
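
For reference, once the node has restarted, the reroute request printed by the tool can be submitted with the low-level Java REST client. This is only an illustrative sketch: the host, index name, and node id are taken from the example above, and `accept_data_loss` is flipped to `true` as the tool requires.

    import org.apache.http.HttpHost;
    import org.elasticsearch.client.Request;
    import org.elasticsearch.client.Response;
    import org.elasticsearch.client.RestClient;

    public class AllocateStalePrimaryExample {
        public static void main(String[] args) throws Exception {
            try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
                Request reroute = new Request("POST", "/_cluster/reroute");
                // accept_data_loss must be changed to true, as the tool's output explains
                reroute.setJsonEntity("{\"commands\":[{\"allocate_stale_primary\":{"
                    + "\"index\":\"twitter\",\"shard\":0,"
                    + "\"node\":\"II47uXW2QvqzHBnMcl2o_Q\",\"accept_data_loss\":true}}]}");
                Response response = client.performRequest(reroute);
                System.out.println(response.getStatusLine());
            }
        }
    }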

+ 4 - 0
docs/reference/index-modules/translog.asciidoc

@@ -92,6 +92,10 @@ The maximum duration for which translog files will be kept. Defaults to `12h`.
 [[corrupt-translog-truncation]]
 === What to do if the translog becomes corrupted?
 
+[WARNING]
+This tool is deprecated and will be completely removed in 7.0.
+Use the <<shard-tool,elasticsearch-shard tool>> instead of this one.
+
 In some cases (a bad drive, user error) the translog on a shard copy can become
 corrupted. When this corruption is detected by Elasticsearch due to mismatching
 checksums, Elasticsearch will fail that shard copy and refuse to use that copy

+ 6 - 1
libs/cli/src/main/java/org/elasticsearch/cli/Terminal.java

@@ -85,12 +85,17 @@ public abstract class Terminal {
 
     /** Prints message to the terminal at {@code verbosity} level, without a newline. */
     public final void print(Verbosity verbosity, String msg) {
-        if (this.verbosity.ordinal() >= verbosity.ordinal()) {
+        if (isPrintable(verbosity)) {
             getWriter().print(msg);
             getWriter().flush();
         }
     }
 
+    /** Checks whether a message at the given {@code verbosity} level would be printed by this terminal. */
+    public final boolean isPrintable(Verbosity verbosity) {
+        return this.verbosity.ordinal() >= verbosity.ordinal();
+    }
+
     /**
      * Prompt for a yes or no answer from the user. This method will loop until 'y' or 'n'
      * (or the default empty value) is entered.
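
The new `isPrintable` check lets callers decide up front whether verbose output would actually be shown, instead of comparing verbosity ordinals themselves. A minimal sketch of the intended call pattern (this mirrors how the shard tool later in this change wires up Lucene's CheckIndex info stream):

    // only pay for per-segment detail when the user asked for verbose output
    final boolean verbose = terminal.isPrintable(Terminal.Verbosity.VERBOSE);
    try (CheckIndex checker = new CheckIndex(indexDirectory, writeLock)) {
        checker.setChecksumsOnly(true);
        checker.setInfoStream(printStream, verbose);
        // ... run the check and report results
    }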

+ 17 - 0
qa/vagrant/src/main/java/org/elasticsearch/packaging/test/ArchiveTestCase.java

@@ -325,4 +325,21 @@ public abstract class ArchiveTestCase extends PackagingTestCase {
         }
     }
 
+    public void test100RepairIndexCliPackaging() {
+        assumeThat(installation, is(notNullValue()));
+
+        final Installation.Executables bin = installation.executables();
+        final Shell sh = new Shell();
+
+        Platforms.PlatformAction action = () -> {
+            final Result result = sh.run(bin.elasticsearchShard + " help");
+            assertThat(result.stdout, containsString("A CLI tool to remove corrupted parts of unrecoverable shards"));
+        };
+
+        if (distribution().equals(Distribution.DEFAULT_TAR) || distribution().equals(Distribution.DEFAULT_ZIP)) {
+            Platforms.onLinux(action);
+            Platforms.onWindows(action);
+        }
+    }
+
 }
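
The help text asserted here comes from the new `ShardToolCli` entry point that the launcher scripts point at. A sketch of that entry point, assuming it follows the same multi-command pattern as the existing `TranslogToolCli`:

    // Sketch only; the description string matches the packaging-test assertion above,
    // and the subcommand name matches the documentation added in this change.
    public class ShardToolCli extends LoggingAwareMultiCommand {

        private ShardToolCli() {
            super("A CLI tool to remove corrupted parts of unrecoverable shards");
            subcommands.put("remove-corrupted-data", new RemoveCorruptedShardDataCommand());
        }

        public static void main(String[] args) throws Exception {
            exit(new ShardToolCli().main(args, Terminal.DEFAULT));
        }
    }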

+ 1 - 0
qa/vagrant/src/main/java/org/elasticsearch/packaging/util/Archives.java

@@ -186,6 +186,7 @@ public class Archives {
             "elasticsearch-env",
             "elasticsearch-keystore",
             "elasticsearch-plugin",
+            "elasticsearch-shard",
             "elasticsearch-translog"
         ).forEach(executable -> {
 

+ 2 - 1
qa/vagrant/src/main/java/org/elasticsearch/packaging/util/Installation.java

@@ -100,8 +100,9 @@ public class Installation {
         public final Path elasticsearch = platformExecutable("elasticsearch");
         public final Path elasticsearchPlugin = platformExecutable("elasticsearch-plugin");
         public final Path elasticsearchKeystore = platformExecutable("elasticsearch-keystore");
-        public final Path elasticsearchTranslog = platformExecutable("elasticsearch-translog");
         public final Path elasticsearchCertutil = platformExecutable("elasticsearch-certutil");
+        public final Path elasticsearchShard = platformExecutable("elasticsearch-shard");
+        public final Path elasticsearchTranslog = platformExecutable("elasticsearch-translog");
 
         private Path platformExecutable(String name) {
             final String platformExecutableName = Platforms.WINDOWS

+ 1 - 0
qa/vagrant/src/main/java/org/elasticsearch/packaging/util/Packages.java

@@ -187,6 +187,7 @@ public class Packages {
             "elasticsearch",
             "elasticsearch-plugin",
             "elasticsearch-keystore",
+            "elasticsearch-shard",
             "elasticsearch-translog"
         ).forEach(executable -> assertThat(es.bin(executable), file(File, "root", "root", p755)));
 

+ 1 - 0
qa/vagrant/src/test/resources/packaging/utils/packages.bash

@@ -95,6 +95,7 @@ verify_package_installation() {
     assert_file "$ESHOME/bin" d root root 755
     assert_file "$ESHOME/bin/elasticsearch" f root root 755
     assert_file "$ESHOME/bin/elasticsearch-plugin" f root root 755
+    assert_file "$ESHOME/bin/elasticsearch-shard" f root root 755
     assert_file "$ESHOME/bin/elasticsearch-translog" f root root 755
     assert_file "$ESHOME/lib" d root root 755
     assert_file "$ESCONFIG" d root elasticsearch 2750

+ 1 - 0
qa/vagrant/src/test/resources/packaging/utils/tar.bash

@@ -94,6 +94,7 @@ verify_archive_installation() {
     assert_file "$ESHOME/bin/elasticsearch-env" f elasticsearch elasticsearch 755
     assert_file "$ESHOME/bin/elasticsearch-keystore" f elasticsearch elasticsearch 755
     assert_file "$ESHOME/bin/elasticsearch-plugin" f elasticsearch elasticsearch 755
+    assert_file "$ESHOME/bin/elasticsearch-shard" f elasticsearch elasticsearch 755
     assert_file "$ESHOME/bin/elasticsearch-translog" f elasticsearch elasticsearch 755
     assert_file "$ESCONFIG" d elasticsearch elasticsearch 755
     assert_file "$ESCONFIG/elasticsearch.yml" f elasticsearch elasticsearch 660

+ 115 - 51
server/src/main/java/org/elasticsearch/env/NodeEnvironment.java

@@ -30,6 +30,8 @@ import org.apache.lucene.store.Lock;
 import org.apache.lucene.store.LockObtainFailedException;
 import org.apache.lucene.store.NativeFSLockFactory;
 import org.apache.lucene.store.SimpleFSDirectory;
+import org.elasticsearch.common.CheckedFunction;
+import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -75,6 +77,7 @@ import java.util.Set;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
 import static java.util.Collections.unmodifiableSet;
@@ -171,6 +174,63 @@ public final class NodeEnvironment  implements Closeable {
     public static final String INDICES_FOLDER = "indices";
     public static final String NODE_LOCK_FILENAME = "node.lock";
 
+    public static class NodeLock implements Releasable {
+
+        private final int nodeId;
+        private final Lock[] locks;
+        private final NodePath[] nodePaths;
+
+        /**
+         * Tries to acquire a node lock for the given node id; throws an {@code IOException} if the lock cannot be acquired.
+         * @param pathFunction function used to check each node path before attempting to acquire a node lock on it
+         */
+        public NodeLock(final int nodeId, final Logger logger,
+                        final Environment environment,
+                        final CheckedFunction<Path, Boolean, IOException> pathFunction) throws IOException {
+            this.nodeId = nodeId;
+            nodePaths = new NodePath[environment.dataWithClusterFiles().length];
+            locks = new Lock[nodePaths.length];
+            try {
+                final Path[] dataPaths = environment.dataFiles();
+                for (int dirIndex = 0; dirIndex < dataPaths.length; dirIndex++) {
+                    Path dataDir = dataPaths[dirIndex];
+                    Path dir = resolveNodePath(dataDir, nodeId);
+                    if (pathFunction.apply(dir) == false) {
+                        continue;
+                    }
+                    try (Directory luceneDir = FSDirectory.open(dir, NativeFSLockFactory.INSTANCE)) {
+                        logger.trace("obtaining node lock on {} ...", dir.toAbsolutePath());
+                        locks[dirIndex] = luceneDir.obtainLock(NODE_LOCK_FILENAME);
+                        nodePaths[dirIndex] = new NodePath(dir);
+                    } catch (IOException e) {
+                        logger.trace(() -> new ParameterizedMessage(
+                            "failed to obtain node lock on {}", dir.toAbsolutePath()), e);
+                        // release all the ones that were obtained up until now
+                        throw (e instanceof LockObtainFailedException ? e
+                            : new IOException("failed to obtain lock on " + dir.toAbsolutePath(), e));
+                    }
+                }
+            } catch (IOException e) {
+                close();
+                throw e;
+            }
+        }
+
+        public NodePath[] getNodePaths() {
+            return nodePaths;
+        }
+
+        @Override
+        public void close() {
+            for (int i = 0; i < locks.length; i++) {
+                if (locks[i] != null) {
+                    IOUtils.closeWhileHandlingException(locks[i]);
+                }
+                locks[i] = null;
+            }
+        }
+    }
+
     /**
      * Setup the environment.
      * @param settings settings from elasticsearch.yml
@@ -188,51 +248,39 @@ public final class NodeEnvironment  implements Closeable {
             nodeIdConsumer.accept(nodeMetaData.nodeId());
             return;
         }
-        final NodePath[] nodePaths = new NodePath[environment.dataWithClusterFiles().length];
-        final Lock[] locks = new Lock[nodePaths.length];
         boolean success = false;
+        NodeLock nodeLock = null;
 
         try {
             sharedDataPath = environment.sharedDataFile();
-            int nodeLockId = -1;
             IOException lastException = null;
             int maxLocalStorageNodes = MAX_LOCAL_STORAGE_NODES_SETTING.get(settings);
-            for (int possibleLockId = 0; possibleLockId < maxLocalStorageNodes; possibleLockId++) {
-                for (int dirIndex = 0; dirIndex < environment.dataFiles().length; dirIndex++) {
-                    Path dataDir = environment.dataFiles()[dirIndex];
-                    Path dir = resolveNodePath(dataDir, possibleLockId);
-                    Files.createDirectories(dir);
 
-                    try (Directory luceneDir = FSDirectory.open(dir, NativeFSLockFactory.INSTANCE)) {
-                        logger.trace("obtaining node lock on {} ...", dir.toAbsolutePath());
-                        try {
-                            locks[dirIndex] = luceneDir.obtainLock(NODE_LOCK_FILENAME);
-                            nodePaths[dirIndex] = new NodePath(dir);
-                            nodeLockId = possibleLockId;
-                        } catch (LockObtainFailedException ex) {
-                            logger.trace(
-                                    new ParameterizedMessage("failed to obtain node lock on {}", dir.toAbsolutePath()), ex);
-                            // release all the ones that were obtained up until now
-                            releaseAndNullLocks(locks);
-                            break;
-                        }
-
-                    } catch (IOException e) {
-                        logger.trace(() -> new ParameterizedMessage(
-                            "failed to obtain node lock on {}", dir.toAbsolutePath()), e);
-                        lastException = new IOException("failed to obtain lock on " + dir.toAbsolutePath(), e);
-                        // release all the ones that were obtained up until now
-                        releaseAndNullLocks(locks);
-                        break;
-                    }
-                }
-                if (locks[0] != null) {
-                    // we found a lock, break
+            final AtomicReference<IOException> onCreateDirectoriesException = new AtomicReference<>();
+            for (int possibleLockId = 0; possibleLockId < maxLocalStorageNodes; possibleLockId++) {
+                try {
+                    nodeLock = new NodeLock(possibleLockId, logger, environment,
+                        dir -> {
+                            try {
+                                Files.createDirectories(dir);
+                            } catch (IOException e) {
+                                onCreateDirectoriesException.set(e);
+                                throw e;
+                            }
+                            return true;
+                        });
                     break;
+                } catch (LockObtainFailedException e) {
+                    // ignore any LockObtainFailedException
+                } catch (IOException e) {
+                    if (onCreateDirectoriesException.get() != null) {
+                        throw onCreateDirectoriesException.get();
+                    }
+                    lastException = e;
                 }
             }
 
-            if (locks[0] == null) {
+            if (nodeLock == null) {
                 final String message = String.format(
                     Locale.ROOT,
                     "failed to obtain node locks, tried [%s] with lock id%s;" +
@@ -243,13 +291,12 @@ public final class NodeEnvironment  implements Closeable {
                     maxLocalStorageNodes);
                 throw new IllegalStateException(message, lastException);
             }
+            this.locks = nodeLock.locks;
+            this.nodePaths = nodeLock.nodePaths;
+            this.nodeLockId = nodeLock.nodeId;
             this.nodeMetaData = loadOrCreateNodeMetaData(settings, logger, nodePaths);
             nodeIdConsumer.accept(nodeMetaData.nodeId());
 
-            this.nodeLockId = nodeLockId;
-            this.locks = locks;
-            this.nodePaths = nodePaths;
-
             if (logger.isDebugEnabled()) {
                 logger.debug("using node location [{}], local_lock_id [{}]", nodePaths, nodeLockId);
             }
@@ -262,7 +309,7 @@ public final class NodeEnvironment  implements Closeable {
             success = true;
         } finally {
             if (success == false) {
-                IOUtils.closeWhileHandlingException(locks);
+                close();
             }
         }
     }
@@ -278,15 +325,6 @@ public final class NodeEnvironment  implements Closeable {
         return path.resolve(NODES_FOLDER).resolve(Integer.toString(nodeLockId));
     }
 
-    private static void releaseAndNullLocks(Lock[] locks) {
-        for (int i = 0; i < locks.length; i++) {
-            if (locks[i] != null) {
-                IOUtils.closeWhileHandlingException(locks[i]);
-            }
-            locks[i] = null;
-        }
-    }
-
     private void maybeLogPathDetails() throws IOException {
 
         // We do some I/O in here, so skip this if DEBUG/INFO are not enabled:
@@ -696,6 +734,13 @@ public final class NodeEnvironment  implements Closeable {
         return paths;
     }
 
+    /**
+     * Returns shared data path for this node environment
+     */
+    public Path sharedDataPath() {
+        return sharedDataPath;
+    }
+
     /**
      * returns the unique uuid describing this node. The uuid is persistent in the data folder of this node
      * and remains across restarts.
@@ -956,11 +1001,22 @@ public final class NodeEnvironment  implements Closeable {
      * @param indexSettings settings for the index
      */
     public Path resolveBaseCustomLocation(IndexSettings indexSettings) {
+        return resolveBaseCustomLocation(indexSettings, sharedDataPath, nodeLockId);
+    }
+
+    /**
+     * Resolve the custom path for an index's shard.
+     * Uses the {@code IndexMetaData.SETTING_DATA_PATH} setting to determine
+     * the root path for the index.
+     *
+     * @param indexSettings settings for the index
+     */
+    public static Path resolveBaseCustomLocation(IndexSettings indexSettings, Path sharedDataPath, int nodeLockId) {
         String customDataDir = indexSettings.customDataPath();
         if (customDataDir != null) {
             // This assert is because this should be caught by MetaDataCreateIndexService
             assert sharedDataPath != null;
-            return sharedDataPath.resolve(customDataDir).resolve(Integer.toString(this.nodeLockId));
+            return sharedDataPath.resolve(customDataDir).resolve(Integer.toString(nodeLockId));
         } else {
             throw new IllegalArgumentException("no custom " + IndexMetaData.SETTING_DATA_PATH + " setting available");
         }
@@ -974,7 +1030,11 @@ public final class NodeEnvironment  implements Closeable {
      * @param indexSettings settings for the index
      */
     private Path resolveIndexCustomLocation(IndexSettings indexSettings) {
-        return resolveBaseCustomLocation(indexSettings).resolve(indexSettings.getUUID());
+        return resolveIndexCustomLocation(indexSettings, sharedDataPath, nodeLockId);
+    }
+
+    private static Path resolveIndexCustomLocation(IndexSettings indexSettings, Path sharedDataPath, int nodeLockId) {
+        return resolveBaseCustomLocation(indexSettings, sharedDataPath, nodeLockId).resolve(indexSettings.getUUID());
     }
 
     /**
@@ -986,7 +1046,11 @@ public final class NodeEnvironment  implements Closeable {
      * @param shardId shard to resolve the path to
      */
     public Path resolveCustomLocation(IndexSettings indexSettings, final ShardId shardId) {
-        return resolveIndexCustomLocation(indexSettings).resolve(Integer.toString(shardId.id()));
+        return resolveCustomLocation(indexSettings, shardId, sharedDataPath, nodeLockId);
+    }
+
+    public static Path resolveCustomLocation(IndexSettings indexSettings, final ShardId shardId, Path sharedDataPath, int nodeLockId) {
+        return resolveIndexCustomLocation(indexSettings, sharedDataPath, nodeLockId).resolve(Integer.toString(shardId.id()));
     }
 
     /**
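
Extracting `NodeLock` as a `Releasable` lets offline tools take the node lock without building a full `NodeEnvironment`. A minimal sketch of that usage, assuming the caller already has an `Environment` and a `Logger` for the stopped node (the shard tool later in this change does essentially this):

    try (NodeEnvironment.NodeLock lock = new NodeEnvironment.NodeLock(0, logger, environment, Files::exists)) {
        for (NodeEnvironment.NodePath nodePath : lock.getNodePaths()) {
            if (nodePath != null && Files.exists(nodePath.indicesPath)) {
                // inspect the locked node's index folders here
            }
        }
    } catch (LockObtainFailedException e) {
        // the lock is held by a running node
        throw new ElasticsearchException("Failed to lock node directory, is Elasticsearch still running?", e);
    }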

+ 100 - 0
server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedLuceneSegmentsAction.java

@@ -0,0 +1,100 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.index.shard;
+
+import org.apache.lucene.index.CheckIndex;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.Lock;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.cli.Terminal;
+import org.elasticsearch.common.collect.Tuple;
+
+import java.io.IOException;
+import java.io.PrintStream;
+
+/**
+ * Removes corrupted Lucene index segments
+ */
+public class RemoveCorruptedLuceneSegmentsAction {
+
+    public Tuple<RemoveCorruptedShardDataCommand.CleanStatus, String> getCleanStatus(ShardPath shardPath,
+                                                                                     Directory indexDirectory,
+                                                                                     Lock writeLock,
+                                                                                     PrintStream printStream,
+                                                                                     boolean verbose) throws IOException {
+        if (RemoveCorruptedShardDataCommand.isCorruptMarkerFileIsPresent(indexDirectory) == false) {
+            return Tuple.tuple(RemoveCorruptedShardDataCommand.CleanStatus.CLEAN, null);
+        }
+
+        final CheckIndex.Status status;
+        try (CheckIndex checker = new CheckIndex(indexDirectory, writeLock)) {
+            checker.setChecksumsOnly(true);
+            checker.setInfoStream(printStream, verbose);
+
+            status = checker.checkIndex(null);
+
+            if (status.missingSegments) {
+                return Tuple.tuple(RemoveCorruptedShardDataCommand.CleanStatus.UNRECOVERABLE,
+                    "Index is unrecoverable - there are missing segments");
+            }
+
+            return status.clean
+                ? Tuple.tuple(RemoveCorruptedShardDataCommand.CleanStatus.CLEAN_WITH_CORRUPTED_MARKER, null)
+                : Tuple.tuple(RemoveCorruptedShardDataCommand.CleanStatus.CORRUPTED,
+                    "Corrupted Lucene index segments found - " + status.totLoseDocCount + " documents will be lost.");
+        }
+    }
+
+    public void execute(Terminal terminal,
+                        ShardPath shardPath,
+                        Directory indexDirectory,
+                        Lock writeLock,
+                        PrintStream printStream,
+                        boolean verbose) throws IOException {
+        checkCorruptMarkerFileIsPresent(indexDirectory);
+
+        final CheckIndex.Status status;
+        try (CheckIndex checker = new CheckIndex(indexDirectory, writeLock)) {
+
+            checker.setChecksumsOnly(true);
+            checker.setInfoStream(printStream, verbose);
+
+            status = checker.checkIndex(null);
+
+            if (status.missingSegments == false) {
+                if (status.clean == false) {
+                    terminal.println("Writing...");
+                    checker.exorciseIndex(status);
+
+                    terminal.println("OK");
+                    terminal.println("Wrote new segments file \"" + status.segmentsFileName + "\"");
+                }
+            } else {
+                throw new ElasticsearchException("Index is unrecoverable - there are missing segments");
+            }
+        }
+    }
+
+    protected void checkCorruptMarkerFileIsPresent(Directory directory) throws IOException {
+        if (RemoveCorruptedShardDataCommand.isCorruptMarkerFileIsPresent(directory) == false) {
+            throw new ElasticsearchException("There is no corruption file marker");
+        }
+    }
+
+}

+ 545 - 0
server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java

@@ -0,0 +1,545 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.index.shard;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.NoMergePolicy;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockObtainFailedException;
+import org.apache.lucene.store.NativeFSLockFactory;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.cli.EnvironmentAwareCommand;
+import org.elasticsearch.cli.Terminal;
+import org.elasticsearch.cluster.ClusterModule;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.routing.AllocationId;
+import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
+import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
+import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
+import org.elasticsearch.common.CheckedConsumer;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.io.PathUtils;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.env.NodeMetaData;
+import org.elasticsearch.gateway.MetaDataStateFormat;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.seqno.SequenceNumbers;
+import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.translog.TruncateTranslogAction;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+public class RemoveCorruptedShardDataCommand extends EnvironmentAwareCommand {
+
+    private static final Logger logger = Loggers.getLogger(RemoveCorruptedShardDataCommand.class);
+
+    private final OptionSpec<String> folderOption;
+    private final OptionSpec<String> indexNameOption;
+    private final OptionSpec<Integer> shardIdOption;
+
+    private final RemoveCorruptedLuceneSegmentsAction removeCorruptedLuceneSegmentsAction;
+    private final TruncateTranslogAction truncateTranslogAction;
+    private final NamedXContentRegistry namedXContentRegistry;
+
+    public RemoveCorruptedShardDataCommand() {
+        this(false);
+    }
+
+    public RemoveCorruptedShardDataCommand(boolean translogOnly) {
+        super("Removes corrupted shard files");
+
+        folderOption = parser.acceptsAll(Arrays.asList("d", "dir"),
+            "Index directory location on disk")
+            .withRequiredArg();
+
+        indexNameOption = parser.accepts("index", "Index name")
+            .withRequiredArg();
+
+        shardIdOption = parser.accepts("shard-id", "Shard id")
+            .withRequiredArg()
+            .ofType(Integer.class);
+
+        namedXContentRegistry = new NamedXContentRegistry(ClusterModule.getNamedXWriteables());
+
+        removeCorruptedLuceneSegmentsAction = translogOnly ? null : new RemoveCorruptedLuceneSegmentsAction();
+        truncateTranslogAction = new TruncateTranslogAction(namedXContentRegistry);
+    }
+
+    @Override
+    protected void printAdditionalHelp(Terminal terminal) {
+        if (removeCorruptedLuceneSegmentsAction == null) {
+            // that's only for 6.x branch for bwc with elasticsearch-translog
+            terminal.println("This tool truncates the translog and translog checkpoint files to create a new translog");
+        } else {
+            terminal.println("This tool attempts to detect and remove unrecoverable corrupted data in a shard.");
+        }
+    }
+
+    // Visible for testing
+    public OptionParser getParser() {
+        return this.parser;
+    }
+
+    @SuppressForbidden(reason = "Necessary to use the path passed in")
+    protected Path getPath(String dirValue) {
+        return PathUtils.get(dirValue, "", "");
+    }
+
+    protected void findAndProcessShardPath(OptionSet options, Environment environment, CheckedConsumer<ShardPath, IOException> consumer)
+    throws IOException {
+        final Settings settings = environment.settings();
+
+        final String indexName;
+        final int shardId;
+        final int fromNodeId;
+        final int toNodeId;
+
+        if (options.has(folderOption)) {
+            final Path path = getPath(folderOption.value(options)).getParent();
+            final Path shardParent = path.getParent();
+            final Path shardParentParent = shardParent.getParent();
+            final Path indexPath = path.resolve(ShardPath.INDEX_FOLDER_NAME);
+            if (Files.exists(indexPath) == false || Files.isDirectory(indexPath) == false) {
+                throw new ElasticsearchException("index directory [" + indexPath + "], must exist and be a directory");
+            }
+
+            final IndexMetaData indexMetaData =
+                IndexMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, shardParent);
+
+            final String shardIdFileName = path.getFileName().toString();
+            final String nodeIdFileName = shardParentParent.getParent().getFileName().toString();
+            if (Files.isDirectory(path) && shardIdFileName.chars().allMatch(Character::isDigit) // SHARD-ID path element check
+                && NodeEnvironment.INDICES_FOLDER.equals(shardParentParent.getFileName().toString()) // `indices` check
+                && nodeIdFileName.chars().allMatch(Character::isDigit) // NODE-ID check
+                && NodeEnvironment.NODES_FOLDER.equals(shardParentParent.getParent().getParent().getFileName().toString()) // `nodes` check
+            ) {
+                shardId = Integer.parseInt(shardIdFileName);
+                indexName = indexMetaData.getIndex().getName();
+                fromNodeId = Integer.parseInt(nodeIdFileName);
+                toNodeId = fromNodeId + 1;
+            } else {
+                throw new ElasticsearchException("Unable to resolve shard id. Wrong folder structure at [ " + path.toString()
+                    + " ], expected .../nodes/[NODE-ID]/indices/[INDEX-UUID]/[SHARD-ID]");
+            }
+        } else {
+            // otherwise resolve shardPath based on the index name and shard id
+            indexName = Objects.requireNonNull(indexNameOption.value(options), "Index name is required");
+            shardId = Objects.requireNonNull(shardIdOption.value(options), "Shard ID is required");
+
+            // resolve shard path in case of multi-node layout per environment
+            fromNodeId = 0;
+            toNodeId = NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.get(settings);
+        }
+
+        // iterate over the possible lock ids the same way NodeEnvironment does; in contrast to it, we have to fail if the node is busy
+        for (int possibleLockId = fromNodeId; possibleLockId < toNodeId; possibleLockId++) {
+            try {
+                try (NodeEnvironment.NodeLock nodeLock = new NodeEnvironment.NodeLock(possibleLockId, logger, environment, Files::exists)) {
+                    final NodeEnvironment.NodePath[] nodePaths = nodeLock.getNodePaths();
+                    for (NodeEnvironment.NodePath nodePath : nodePaths) {
+                        if (Files.exists(nodePath.indicesPath)) {
+                            // have to scan all index uuid folders to resolve from index name
+                            try (DirectoryStream<Path> stream = Files.newDirectoryStream(nodePath.indicesPath)) {
+                                for (Path file : stream) {
+                                    if (Files.exists(file.resolve(MetaDataStateFormat.STATE_DIR_NAME)) == false) {
+                                        continue;
+                                    }
+
+                                    final IndexMetaData indexMetaData =
+                                        IndexMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, file);
+                                    if (indexMetaData == null) {
+                                        continue;
+                                    }
+                                    final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
+                                    final Index index = indexMetaData.getIndex();
+                                    if (indexName.equals(index.getName()) == false) {
+                                        continue;
+                                    }
+                                    final ShardId shId = new ShardId(index, shardId);
+
+                                    final Path shardPathLocation = nodePath.resolve(shId);
+                                    if (Files.exists(shardPathLocation) == false) {
+                                        continue;
+                                    }
+                                    final ShardPath shardPath = ShardPath.loadShardPath(logger, shId, indexSettings,
+                                        new Path[]{shardPathLocation}, possibleLockId, nodePath.path);
+                                    if (shardPath != null) {
+                                        consumer.accept(shardPath);
+                                        return;
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            } catch (LockObtainFailedException lofe) {
+                throw new ElasticsearchException("Failed to lock node's directory [" + lofe.getMessage()
+                    + "], is Elasticsearch still running ?");
+            }
+        }
+        throw new ElasticsearchException("Unable to resolve shard path for index [" + indexName + "] and shard id [" + shardId + "]");
+    }
+
+    public static boolean isCorruptMarkerFileIsPresent(final Directory directory) throws IOException {
+        boolean found = false;
+
+        final String[] files = directory.listAll();
+        for (String file : files) {
+            if (file.startsWith(Store.CORRUPTED)) {
+                found = true;
+                break;
+            }
+        }
+
+        return found;
+    }
+
+    protected void dropCorruptMarkerFiles(Terminal terminal, Path path, Directory directory, boolean clean) throws IOException {
+        if (clean) {
+            confirm("This shard has been marked as corrupted but no corruption can now be detected.\n"
+                + "This may indicate an intermittent hardware problem. The corruption marker can be \n"
+                + "removed, but there is a risk that data has been undetectably lost.\n\n"
+                + "Are you taking a risk of losing documents and proceed with removing a corrupted marker ?",
+                terminal);
+        }
+        String[] files = directory.listAll();
+        boolean found = false;
+        for (String file : files) {
+            if (file.startsWith(Store.CORRUPTED)) {
+                directory.deleteFile(file);
+
+                terminal.println("Deleted corrupt marker " + file + " from " + path);
+            }
+        }
+    }
+
+    private static void loseDataDetailsBanner(Terminal terminal, Tuple<CleanStatus, String> cleanStatus) {
+        if (cleanStatus.v2() != null) {
+            terminal.println("");
+            terminal.println("  " + cleanStatus.v2());
+            terminal.println("");
+        }
+    }
+
+    private static void confirm(String msg, Terminal terminal) {
+        terminal.println(msg);
+        String text = terminal.readText("Confirm [y/N] ");
+        if (text.equalsIgnoreCase("y") == false) {
+            throw new ElasticsearchException("aborted by user");
+        }
+    }
+
+    private void warnAboutESShouldBeStopped(Terminal terminal) {
+        terminal.println("-----------------------------------------------------------------------");
+        terminal.println("");
+        terminal.println("    WARNING: Elasticsearch MUST be stopped before running this tool.");
+        terminal.println("");
+        // that's only for 6.x branch for bwc with elasticsearch-translog
+        if (removeCorruptedLuceneSegmentsAction == null) {
+            terminal.println("  This tool is deprecated and will be completely removed in 7.0.");
+            terminal.println("  It is replaced by the elasticsearch-shard tool. ");
+            terminal.println("");
+        }
+        terminal.println("  Please make a complete backup of your index before using this tool.");
+        terminal.println("");
+        terminal.println("-----------------------------------------------------------------------");
+    }
+
+    // Visible for testing
+    @Override
+    public void execute(Terminal terminal, OptionSet options, Environment environment) throws Exception {
+        warnAboutESShouldBeStopped(terminal);
+
+        findAndProcessShardPath(options, environment, shardPath -> {
+            final Path indexPath = shardPath.resolveIndex();
+            final Path translogPath = shardPath.resolveTranslog();
+            final Path nodePath = getNodePath(shardPath);
+            if (Files.exists(translogPath) == false || Files.isDirectory(translogPath) == false) {
+                throw new ElasticsearchException("translog directory [" + translogPath + "], must exist and be a directory");
+            }
+
+            final PrintWriter writer = terminal.getWriter();
+            final PrintStream printStream = new PrintStream(new OutputStream() {
+                @Override
+                public void write(int b) {
+                    writer.write(b);
+                }
+            }, false, "UTF-8");
+            final boolean verbose = terminal.isPrintable(Terminal.Verbosity.VERBOSE);
+
+            final Directory indexDirectory = getDirectory(indexPath);
+
+            final Tuple<CleanStatus, String> indexCleanStatus;
+            final Tuple<CleanStatus, String> translogCleanStatus;
+            try (Directory indexDir = indexDirectory) {
+                // keep the index lock to block any runs of older versions of this tool
+                try (Lock writeIndexLock = indexDir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
+                    ////////// Index
+                    // that's only for 6.x branch for bwc with elasticsearch-translog
+                    if (removeCorruptedLuceneSegmentsAction != null) {
+                        terminal.println("");
+                        terminal.println("Opening Lucene index at " + indexPath);
+                        terminal.println("");
+                        try {
+                            indexCleanStatus = removeCorruptedLuceneSegmentsAction.getCleanStatus(shardPath, indexDir,
+                                writeIndexLock, printStream, verbose);
+                        } catch (Exception e) {
+                            terminal.println(e.getMessage());
+                            throw e;
+                        }
+
+                        terminal.println("");
+                        terminal.println(" >> Lucene index is " + indexCleanStatus.v1().getMessage() + " at " + indexPath);
+                        terminal.println("");
+                    } else {
+                        indexCleanStatus = Tuple.tuple(CleanStatus.CLEAN, null);
+                    }
+
+                    ////////// Translog
+                    // the translog relies on data stored in an index commit, so the index must not be unrecoverable before we truncate the translog
+                    if (indexCleanStatus.v1() != CleanStatus.UNRECOVERABLE) {
+                        terminal.println("");
+                        terminal.println("Opening translog at " + translogPath);
+                        terminal.println("");
+                        try {
+                            translogCleanStatus = truncateTranslogAction.getCleanStatus(shardPath, indexDir);
+                        } catch (Exception e) {
+                            terminal.println(e.getMessage());
+                            throw e;
+                        }
+
+                        terminal.println("");
+                        terminal.println(" >> Translog is " + translogCleanStatus.v1().getMessage() + " at " + translogPath);
+                        terminal.println("");
+                    } else {
+                        translogCleanStatus = Tuple.tuple(CleanStatus.UNRECOVERABLE, null);
+                    }
+
+                    ////////// Drop corrupted data
+                    final CleanStatus indexStatus = indexCleanStatus.v1();
+                    final CleanStatus translogStatus = translogCleanStatus.v1();
+
+                    if (indexStatus == CleanStatus.CLEAN && translogStatus == CleanStatus.CLEAN) {
+                        throw new ElasticsearchException("Shard does not seem to be corrupted at " + shardPath.getDataPath());
+                    }
+
+                    if (indexStatus == CleanStatus.UNRECOVERABLE) {
+                        if (indexCleanStatus.v2() != null) {
+                            terminal.println("Details: " + indexCleanStatus.v2());
+                        }
+
+                        terminal.println("You can allocate a new, empty, primary shard with the following command:");
+
+                        printRerouteCommand(shardPath, terminal, false);
+
+                        throw new ElasticsearchException("Index is unrecoverable");
+                    }
+
+
+                    terminal.println("-----------------------------------------------------------------------");
+                    if (indexStatus != CleanStatus.CLEAN) {
+                        loseDataDetailsBanner(terminal, indexCleanStatus);
+                    }
+                    if (translogStatus != CleanStatus.CLEAN) {
+                        loseDataDetailsBanner(terminal, translogCleanStatus);
+                    }
+                    terminal.println("            WARNING:              YOU MAY LOSE DATA.");
+                    terminal.println("-----------------------------------------------------------------------");
+
+
+                    confirm("Continue and remove corrupted data from the shard ?", terminal);
+
+                    if (indexStatus != CleanStatus.CLEAN) {
+                        removeCorruptedLuceneSegmentsAction.execute(terminal, shardPath, indexDir,
+                            writeIndexLock, printStream, verbose);
+                    }
+
+                    if (translogStatus != CleanStatus.CLEAN) {
+                        truncateTranslogAction.execute(terminal, shardPath, indexDir);
+                    }
+                } catch (LockObtainFailedException lofe) {
+                    final String msg = "Failed to lock shard's directory at [" + indexPath + "], is Elasticsearch still running?";
+                    terminal.println(msg);
+                    throw new ElasticsearchException(msg);
+                }
+
+                final CleanStatus indexStatus = indexCleanStatus.v1();
+                final CleanStatus translogStatus = translogCleanStatus.v1();
+
+                // newHistoryCommit obtains its own lock
+                addNewHistoryCommit(indexDir, terminal, translogStatus != CleanStatus.CLEAN);
+                newAllocationId(environment, shardPath, terminal);
+                if (indexStatus != CleanStatus.CLEAN) {
+                    dropCorruptMarkerFiles(terminal, indexPath, indexDir, indexStatus == CleanStatus.CLEAN_WITH_CORRUPTED_MARKER);
+                }
+            }
+        });
+    }
+
+    private Directory getDirectory(Path indexPath) {
+        Directory directory;
+        try {
+            directory = FSDirectory.open(indexPath, NativeFSLockFactory.INSTANCE);
+        } catch (Throwable t) {
+            throw new ElasticsearchException("ERROR: could not open directory \"" + indexPath + "\"; exiting");
+        }
+        return directory;
+    }
+
+    protected void addNewHistoryCommit(Directory indexDirectory, Terminal terminal, boolean updateLocalCheckpoint) throws IOException {
+        final String historyUUID = UUIDs.randomBase64UUID();
+
+        terminal.println("Marking index with the new history uuid : " + historyUUID);
+        // commit the new history id
+        final IndexWriterConfig iwc = new IndexWriterConfig(null)
+            // we don't want merges to happen here - we call maybe merge on the engine
+            // later once we started it up, otherwise we would need to wait for it here
+            // we also don't specify a codec here and merges should use the engines for this index
+            .setCommitOnClose(false)
+            .setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
+            .setMergePolicy(NoMergePolicy.INSTANCE)
+            .setOpenMode(IndexWriterConfig.OpenMode.APPEND);
+        // IndexWriter acquires directory lock by its own
+        try (IndexWriter indexWriter = new IndexWriter(indexDirectory, iwc)) {
+            final Map<String, String> userData = new HashMap<>();
+            indexWriter.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue()));
+
+            if (updateLocalCheckpoint) {
+                // In order to have a safe commit invariant, we have to assign the global checkpoint to the max_seqno of the last commit.
+                // We can only safely do it because we will generate a new history uuid for this shard.
+                final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userData.entrySet());
+                // Also advances the local checkpoint of the last commit to its max_seqno.
+                userData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(commitInfo.maxSeqNo));
+            }
+
+            // commit the new history id
+            userData.put(Engine.HISTORY_UUID_KEY, historyUUID);
+
+            indexWriter.setLiveCommitData(userData.entrySet());
+            indexWriter.commit();
+        }
+    }
+
+    protected void newAllocationId(Environment environment, ShardPath shardPath, Terminal terminal) throws IOException {
+        final Path shardStatePath = shardPath.getShardStatePath();
+        final ShardStateMetaData shardStateMetaData =
+            ShardStateMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, shardStatePath);
+
+        if (shardStateMetaData == null) {
+            throw new ElasticsearchException("No shard state meta data at " + shardStatePath);
+        }
+
+        final AllocationId newAllocationId = AllocationId.newInitializing();
+
+        terminal.println("Changing allocation id " + shardStateMetaData.allocationId.getId()
+            + " to " + newAllocationId.getId());
+
+        final ShardStateMetaData newShardStateMetaData =
+            new ShardStateMetaData(shardStateMetaData.primary, shardStateMetaData.indexUUID, newAllocationId);
+
+        ShardStateMetaData.FORMAT.write(newShardStateMetaData, shardStatePath);
+
+        terminal.println("");
+        terminal.println("You should run the following command to allocate this shard:");
+
+        printRerouteCommand(shardPath, terminal, true);
+    }
+
+    private void printRerouteCommand(ShardPath shardPath, Terminal terminal, boolean allocateStale) throws IOException {
+        final IndexMetaData indexMetaData =
+            IndexMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry,
+                shardPath.getDataPath().getParent());
+
+        final Path nodePath = getNodePath(shardPath);
+        final NodeMetaData nodeMetaData =
+            NodeMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, nodePath);
+
+        if (nodeMetaData == null) {
+            throw new ElasticsearchException("No node meta data at " + nodePath);
+        }
+
+        final String nodeId = nodeMetaData.nodeId();
+        final String index = indexMetaData.getIndex().getName();
+        final int id = shardPath.getShardId().id();
+        final AllocationCommands commands = new AllocationCommands(
+            allocateStale
+                ? new AllocateStalePrimaryAllocationCommand(index, id, nodeId, false)
+                : new AllocateEmptyPrimaryAllocationCommand(index, id, nodeId, false));
+
+        terminal.println("");
+        terminal.println("POST /_cluster/reroute'\n"
+            + Strings.toString(commands, true, true) + "'");
+        terminal.println("");
+        terminal.println("You must accept the possibility of data loss by changing parameter `accept_data_loss` to `true`.");
+        terminal.println("");
+    }
+
+    private Path getNodePath(ShardPath shardPath) {
+        final Path nodePath = shardPath.getDataPath().getParent().getParent().getParent();
+        if (Files.exists(nodePath) == false || Files.exists(nodePath.resolve(MetaDataStateFormat.STATE_DIR_NAME)) == false) {
+            throw new ElasticsearchException("Unable to resolve node path for " + shardPath);
+        }
+        return nodePath;
+    }
+
+    public enum CleanStatus {
+        CLEAN("clean"),
+        CLEAN_WITH_CORRUPTED_MARKER("marked corrupted, but no corruption detected"),
+        CORRUPTED("corrupted"),
+        UNRECOVERABLE("corrupted and unrecoverable");
+
+        private final String msg;
+
+        CleanStatus(String msg) {
+            this.msg = msg;
+        }
+
+        public String getMessage() {
+            return msg;
+        }
+    }
+
+}
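
The command is covered by the new `RemoveCorruptedShardDataCommandTests` and `RemoveCorruptedShardDataCommandIT` suites listed above; a hypothetical test-style invocation (the index name and the single `y` confirmation are placeholders) looks roughly like:

    RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand();
    MockTerminal terminal = new MockTerminal();
    terminal.addTextInput("y"); // answer the "Continue and remove corrupted data" confirmation
    OptionSet options = command.getParser().parse("--index", "twitter", "--shard-id", "0");
    // environment must point at the stopped node's data path
    command.execute(terminal, options, environment);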

+ 20 - 5
server/src/main/java/org/elasticsearch/index/shard/ShardPath.java

@@ -112,16 +112,31 @@ public final class ShardPath {
      * <b>Note:</b> this method resolves custom data locations for the shard.
      */
     public static ShardPath loadShardPath(Logger logger, NodeEnvironment env, ShardId shardId, IndexSettings indexSettings) throws IOException {
-        final String indexUUID = indexSettings.getUUID();
         final Path[] paths = env.availableShardPaths(shardId);
+        final int nodeLockId = env.getNodeLockId();
+        final Path sharedDataPath = env.sharedDataPath();
+        return loadShardPath(logger, shardId, indexSettings, paths, nodeLockId, sharedDataPath);
+    }
+
+    /**
+     * This method walks through the node's shard paths to find the data and state path for the given shard. If multiple
+     * directories with a valid shard state exist, the one with the highest version will be used.
+     * <b>Note:</b> this method resolves custom data locations for the shard.
+     */
+    public static ShardPath loadShardPath(Logger logger, ShardId shardId, IndexSettings indexSettings, Path[] availableShardPaths,
+                                           int nodeLockId, Path sharedDataPath) throws IOException {
+        final String indexUUID = indexSettings.getUUID();
         Path loadedPath = null;
-        for (Path path : paths) {
+        for (Path path : availableShardPaths) {
             // EMPTY is safe here because we never call namedObject
             ShardStateMetaData load = ShardStateMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path);
             if (load != null) {
                 if (load.indexUUID.equals(indexUUID) == false && IndexMetaData.INDEX_UUID_NA_VALUE.equals(load.indexUUID) == false) {
-                    logger.warn("{} found shard on path: [{}] with a different index UUID - this shard seems to be leftover from a different index with the same name. Remove the leftover shard in order to reuse the path with the current index", shardId, path);
-                    throw new IllegalStateException(shardId + " index UUID in shard state was: " + load.indexUUID + " expected: " + indexUUID + " on shard path: " + path);
+                    logger.warn("{} found shard on path: [{}] with a different index UUID - this "
+                        + "shard seems to be leftover from a different index with the same name. "
+                        + "Remove the leftover shard in order to reuse the path with the current index", shardId, path);
+                    throw new IllegalStateException(shardId + " index UUID in shard state was: " + load.indexUUID
+                        + " expected: " + indexUUID + " on shard path: " + path);
                 }
                 if (loadedPath == null) {
                     loadedPath = path;
@@ -137,7 +152,7 @@ public final class ShardPath {
             final Path dataPath;
             final Path statePath = loadedPath;
             if (indexSettings.hasCustomDataPath()) {
-                dataPath = env.resolveCustomLocation(indexSettings, shardId);
+                dataPath = NodeEnvironment.resolveCustomLocation(indexSettings, shardId, sharedDataPath, nodeLockId);
             } else {
                 dataPath = statePath;
             }
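A minimal sketch, not part of this change, of how a command-line caller without a live NodeEnvironment could use the new static overload. All inputs (logger, shardId, indexSettings, availableShardPaths, nodeLockId, sharedDataPath) are assumed to have been resolved by the tool beforehand, and a null return is assumed to mean that no directory contained a valid shard state, as in this version of the method:

// Hedged sketch: all referenced variables are assumed to come from the tool's own setup.
final ShardPath shardPath = ShardPath.loadShardPath(
    logger, shardId, indexSettings, availableShardPaths, nodeLockId, sharedDataPath);
if (shardPath == null) {
    // none of the candidate directories held a valid shard state for this shard id
    throw new ElasticsearchException("Unable to resolve shard path for " + shardId);
}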

+ 39 - 0
server/src/main/java/org/elasticsearch/index/shard/ShardToolCli.java

@@ -0,0 +1,39 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.index.shard;
+
+import org.elasticsearch.cli.LoggingAwareMultiCommand;
+import org.elasticsearch.cli.Terminal;
+
+/**
+ * Class encapsulating and dispatching commands from the {@code elasticsearch-shard} command line tool
+ */
+public class ShardToolCli extends LoggingAwareMultiCommand {
+
+    private ShardToolCli() {
+        super("A CLI tool to remove corrupted parts of unrecoverable shards");
+        subcommands.put("remove-corrupted-data", new RemoveCorruptedShardDataCommand());
+    }
+
+    public static void main(String[] args) throws Exception {
+        exit(new ShardToolCli().main(args, Terminal.DEFAULT));
+    }
+
+}
+
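Together with the bin/elasticsearch-shard scripts added by this commit, this class gives operators a single entry point. An invocation mirroring the integration test further down (which passes the options in single-dash form) would look like:

bin/elasticsearch-shard remove-corrupted-data -index index42 -shard-id 0

The node must be stopped first; otherwise the command fails with "Failed to lock node's directory", as the integration test asserts.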

+ 2 - 1
server/src/main/java/org/elasticsearch/index/translog/TranslogHeader.java

@@ -144,7 +144,6 @@ final class TranslogHeader {
         final long primaryTerm;
         if (version == VERSION_PRIMARY_TERM) {
             primaryTerm = in.readLong();
-            assert primaryTerm >= 0 : "Primary term must be non-negative [" + primaryTerm + "]; translog path [" + path + "]";
         } else {
             assert version == VERSION_CHECKPOINTS : "Unknown header version [" + version + "]";
             primaryTerm = UNKNOWN_PRIMARY_TERM;
@@ -153,6 +152,8 @@ final class TranslogHeader {
         if (version >= VERSION_PRIMARY_TERM) {
             Translog.verifyChecksum(in);
         }
+        assert primaryTerm >= 0 : "Primary term must be non-negative [" + primaryTerm + "]; translog path [" + path + "]";
+
         final int headerSizeInBytes = headerSizeInBytes(version, uuid.length);
         assert channel.position() == headerSizeInBytes :
             "Header is not fully read; header size [" + headerSizeInBytes + "], position [" + channel.position() + "]";

+ 4 - 1
server/src/main/java/org/elasticsearch/index/translog/TranslogToolCli.java

@@ -21,15 +21,18 @@ package org.elasticsearch.index.translog;
 
 import org.elasticsearch.cli.LoggingAwareMultiCommand;
 import org.elasticsearch.cli.Terminal;
+import org.elasticsearch.index.shard.RemoveCorruptedShardDataCommand;
 
 /**
  * Class encapsulating and dispatching commands from the {@code elasticsearch-translog} command line tool
  */
+@Deprecated
 public class TranslogToolCli extends LoggingAwareMultiCommand {
 
     private TranslogToolCli() {
+        // kept only for bwc with the elasticsearch-translog tool on the 6.x branch
         super("A CLI tool for various Elasticsearch translog actions");
-        subcommands.put("truncate", new TruncateTranslogCommand());
+        subcommands.put("truncate", new RemoveCorruptedShardDataCommand(true));
     }
 
     public static void main(String[] args) throws Exception {
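For users of the legacy tool this means that, assuming the bin/elasticsearch-translog script is still shipped alongside this class, an invocation such as the following now runs the translog-only mode of RemoveCorruptedShardDataCommand rather than the removed TruncateTranslogCommand (the -d option names the shard's translog directory, as exercised by the integration test later in this commit):

bin/elasticsearch-translog truncate -d <path to the shard's translog directory>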

+ 245 - 0
server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java

@@ -0,0 +1,245 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.translog;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.store.Directory;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.cli.Terminal;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.seqno.SequenceNumbers;
+import org.elasticsearch.index.shard.RemoveCorruptedShardDataCommand;
+import org.elasticsearch.index.shard.ShardPath;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+public class TruncateTranslogAction {
+
+    protected static final Logger logger = Loggers.getLogger(TruncateTranslogAction.class);
+    private final NamedXContentRegistry namedXContentRegistry;
+
+    public TruncateTranslogAction(NamedXContentRegistry namedXContentRegistry) {
+        this.namedXContentRegistry = namedXContentRegistry;
+    }
+
+    public Tuple<RemoveCorruptedShardDataCommand.CleanStatus, String> getCleanStatus(ShardPath shardPath,
+                                                                                     Directory indexDirectory) throws IOException {
+        final Path indexPath = shardPath.resolveIndex();
+        final Path translogPath = shardPath.resolveTranslog();
+        final List<IndexCommit> commits;
+        try {
+            commits = DirectoryReader.listCommits(indexDirectory);
+        } catch (IndexNotFoundException infe) {
+            throw new ElasticsearchException("unable to find a valid shard at [" + indexPath + "]", infe);
+        }
+
+        // Retrieve the generation and UUID from the existing data
+        final Map<String, String> commitData = new HashMap<>(commits.get(commits.size() - 1).getUserData());
+        final String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY);
+
+        if (translogUUID == null) {
+            throw new ElasticsearchException("shard must have a valid translog UUID but got: [null]");
+        }
+
+        final boolean clean = isTranslogClean(shardPath, translogUUID);
+
+        if (clean) {
+            return Tuple.tuple(RemoveCorruptedShardDataCommand.CleanStatus.CLEAN, null);
+        }
+
+        // Collect the translog files currently on disk
+        Set<Path> translogFiles;
+        try {
+            translogFiles = filesInDirectory(translogPath);
+        } catch (IOException e) {
+            throw new ElasticsearchException("failed to find existing translog files", e);
+        }
+        final String details = deletingFilesDetails(translogPath, translogFiles);
+
+        return Tuple.tuple(RemoveCorruptedShardDataCommand.CleanStatus.CORRUPTED, details);
+    }
+
+    public void execute(Terminal terminal, ShardPath shardPath, Directory indexDirectory) throws IOException {
+        final Path indexPath = shardPath.resolveIndex();
+        final Path translogPath = shardPath.resolveTranslog();
+
+        final String historyUUID = UUIDs.randomBase64UUID();
+        final Map<String, String> commitData;
+        // List the existing translog files that are going to be removed
+        Set<Path> translogFiles;
+        try {
+            terminal.println("Checking existing translog files");
+            translogFiles = filesInDirectory(translogPath);
+        } catch (IOException e) {
+            terminal.println("encountered IOException while listing directory, aborting...");
+            throw new ElasticsearchException("failed to find existing translog files", e);
+        }
+
+        List<IndexCommit> commits;
+        try {
+            terminal.println("Reading translog UUID information from Lucene commit from shard at [" + indexPath + "]");
+            commits = DirectoryReader.listCommits(indexDirectory);
+        } catch (IndexNotFoundException infe) {
+            throw new ElasticsearchException("unable to find a valid shard at [" + indexPath + "]", infe);
+        }
+
+        // Retrieve the generation and UUID from the existing data
+        commitData = commits.get(commits.size() - 1).getUserData();
+        final String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY);
+        final String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY);
+        if (translogGeneration == null || translogUUID == null) {
+            throw new ElasticsearchException("shard must have a valid translog generation and UUID but got: [{}] and: [{}]",
+                translogGeneration, translogUUID);
+        }
+
+        final long globalCheckpoint = commitData.containsKey(SequenceNumbers.MAX_SEQ_NO)
+            ? Long.parseLong(commitData.get(SequenceNumbers.MAX_SEQ_NO))
+            : SequenceNumbers.UNASSIGNED_SEQ_NO;
+
+        terminal.println("Translog Generation: " + translogGeneration);
+        terminal.println("Translog UUID      : " + translogUUID);
+        terminal.println("History UUID       : " + historyUUID);
+
+        Path tempEmptyCheckpoint = translogPath.resolve("temp-" + Translog.CHECKPOINT_FILE_NAME);
+        Path realEmptyCheckpoint = translogPath.resolve(Translog.CHECKPOINT_FILE_NAME);
+        Path tempEmptyTranslog = translogPath.resolve("temp-" + Translog.TRANSLOG_FILE_PREFIX +
+            translogGeneration + Translog.TRANSLOG_FILE_SUFFIX);
+        Path realEmptyTranslog = translogPath.resolve(Translog.TRANSLOG_FILE_PREFIX +
+            translogGeneration + Translog.TRANSLOG_FILE_SUFFIX);
+
+        // Write empty checkpoint and translog to empty files
+        long gen = Long.parseLong(translogGeneration);
+        int translogLen = writeEmptyTranslog(tempEmptyTranslog, translogUUID);
+        writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen, globalCheckpoint);
+
+        terminal.println("Removing existing translog files");
+        IOUtils.rm(translogFiles.toArray(new Path[]{}));
+
+        terminal.println("Creating new empty checkpoint at [" + realEmptyCheckpoint + "]");
+        Files.move(tempEmptyCheckpoint, realEmptyCheckpoint, StandardCopyOption.ATOMIC_MOVE);
+        terminal.println("Creating new empty translog at [" + realEmptyTranslog + "]");
+        Files.move(tempEmptyTranslog, realEmptyTranslog, StandardCopyOption.ATOMIC_MOVE);
+
+        // Fsync the translog directory after rename
+        IOUtils.fsync(translogPath, true);
+    }
+
+    private boolean isTranslogClean(ShardPath shardPath, String translogUUID) throws IOException {
+        // check whether the translog is clean by replaying it, rather than relying on the corruption marker file
+        boolean clean = true;
+        try {
+            final Path translogPath = shardPath.resolveTranslog();
+            final long translogGlobalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID);
+            final IndexMetaData indexMetaData =
+                IndexMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, shardPath.getDataPath().getParent());
+            final IndexSettings indexSettings = new IndexSettings(indexMetaData, Settings.EMPTY);
+            final TranslogConfig translogConfig = new TranslogConfig(shardPath.getShardId(), translogPath,
+                indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
+            long primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardPath.getShardId().id());
+            final TranslogDeletionPolicy translogDeletionPolicy =
+                new TranslogDeletionPolicy(indexSettings.getTranslogRetentionSize().getBytes(),
+                    indexSettings.getTranslogRetentionAge().getMillis());
+            try (Translog translog = new Translog(translogConfig, translogUUID,
+                translogDeletionPolicy, () -> translogGlobalCheckpoint, () -> primaryTerm);
+                 Translog.Snapshot snapshot = translog.newSnapshot()) {
+                while (snapshot.next() != null) {
+                    // just iterate over snapshot
+                }
+            }
+        } catch (TranslogCorruptedException e) {
+            clean = false;
+        }
+        return clean;
+    }
+
+    /** Write a checkpoint file to the given location with the given generation */
+    static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration, long globalCheckpoint) throws IOException {
+        Checkpoint emptyCheckpoint = Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration,
+            globalCheckpoint, translogGeneration);
+        Checkpoint.write(FileChannel::open, filename, emptyCheckpoint,
+            StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
+        // fsync to make sure the checkpoint is durable before the caller moves it into place
+        IOUtils.fsync(filename, false);
+    }
+
+    /**
+     * Write a translog containing the given translog UUID to the given location. Returns the number of bytes written.
+     */
+    private static int writeEmptyTranslog(Path filename, String translogUUID) throws IOException {
+        try (FileChannel fc = FileChannel.open(filename, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW)) {
+            TranslogHeader header = new TranslogHeader(translogUUID, TranslogHeader.UNKNOWN_PRIMARY_TERM);
+            header.write(fc);
+            return header.sizeInBytes();
+        }
+    }
+
+    /** Build a listing of the translog files under {@code translogPath} that are going to be deleted */
+    private String deletingFilesDetails(Path translogPath, Set<Path> files) {
+        StringBuilder builder = new StringBuilder();
+
+        builder
+            .append("Documents inside of translog files will be lost.\n")
+            .append("  The following files will be DELETED at ")
+            .append(translogPath)
+            .append("\n\n");
+        for (Iterator<Path> it = files.iterator(); it.hasNext(); ) {
+            builder.append("  --> ").append(it.next().getFileName());
+            if (it.hasNext()) {
+                builder.append("\n");
+            }
+        }
+        return builder.toString();
+    }
+
+    /** Return a Set of all files in a given directory */
+    public static Set<Path> filesInDirectory(Path directory) throws IOException {
+        Set<Path> files = new TreeSet<>();
+        try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory)) {
+            for (Path file : stream) {
+                files.add(file);
+            }
+        }
+        return files;
+    }
+
+}
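Once execute() finishes, the shard's translog directory holds only the freshly written pair of files for the generation recorded in the last Lucene commit. Assuming, purely for illustration, that the recorded generation is 42, the directory would contain roughly:

translog-42.tlog   (empty translog carrying the existing translog UUID)
translog.ckp       (checkpoint pointing at the empty generation)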

+ 0 - 254
server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java

@@ -1,254 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.index.translog;
-
-import joptsimple.OptionParser;
-import joptsimple.OptionSet;
-import joptsimple.OptionSpec;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.NoMergePolicy;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
-import org.apache.lucene.store.Lock;
-import org.apache.lucene.store.LockObtainFailedException;
-import org.apache.lucene.store.NativeFSLockFactory;
-import org.elasticsearch.common.lucene.Lucene;
-import org.elasticsearch.core.internal.io.IOUtils;
-import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.cli.EnvironmentAwareCommand;
-import org.elasticsearch.cli.Terminal;
-import org.elasticsearch.common.SuppressForbidden;
-import org.elasticsearch.common.UUIDs;
-import org.elasticsearch.common.io.PathUtils;
-import org.elasticsearch.env.Environment;
-import org.elasticsearch.index.IndexNotFoundException;
-import org.elasticsearch.index.engine.Engine;
-import org.elasticsearch.index.seqno.SequenceNumbers;
-
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardCopyOption;
-import java.nio.file.StandardOpenOption;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class TruncateTranslogCommand extends EnvironmentAwareCommand {
-
-    private final OptionSpec<String> translogFolder;
-    private final OptionSpec<Void> batchMode;
-
-    public TruncateTranslogCommand() {
-        super("Truncates a translog to create a new, empty translog");
-        this.translogFolder = parser.acceptsAll(Arrays.asList("d", "dir"),
-                "Translog Directory location on disk")
-                .withRequiredArg()
-                .required();
-        this.batchMode = parser.acceptsAll(Arrays.asList("b", "batch"),
-                "Enable batch mode explicitly, automatic confirmation of warnings");
-    }
-
-    // Visible for testing
-    public OptionParser getParser() {
-        return this.parser;
-    }
-
-    @Override
-    protected void printAdditionalHelp(Terminal terminal) {
-        terminal.println("This tool truncates the translog and translog");
-        terminal.println("checkpoint files to create a new translog");
-    }
-
-    @SuppressForbidden(reason = "Necessary to use the path passed in")
-    private Path getTranslogPath(OptionSet options) {
-        return PathUtils.get(translogFolder.value(options), "", "");
-    }
-
-    @Override
-    protected void execute(Terminal terminal, OptionSet options, Environment env) throws Exception {
-        boolean batch = options.has(batchMode);
-
-        Path translogPath = getTranslogPath(options);
-        Path idxLocation = translogPath.getParent().resolve("index");
-
-        if (Files.exists(translogPath) == false || Files.isDirectory(translogPath) == false) {
-            throw new ElasticsearchException("translog directory [" + translogPath + "], must exist and be a directory");
-        }
-
-        if (Files.exists(idxLocation) == false || Files.isDirectory(idxLocation) == false) {
-            throw new ElasticsearchException("unable to find a shard at [" + idxLocation + "], which must exist and be a directory");
-        }
-        try (Directory dir = FSDirectory.open(idxLocation, NativeFSLockFactory.INSTANCE)) {
-            final String historyUUID = UUIDs.randomBase64UUID();
-            final Map<String, String> commitData;
-            // Hold the lock open for the duration of the tool running
-            try (Lock writeLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
-                Set<Path> translogFiles;
-                try {
-                    terminal.println("Checking existing translog files");
-                    translogFiles = filesInDirectory(translogPath);
-                } catch (IOException e) {
-                    terminal.println("encountered IOException while listing directory, aborting...");
-                    throw new ElasticsearchException("failed to find existing translog files", e);
-                }
-
-                // Warn about ES being stopped and files being deleted
-                warnAboutDeletingFiles(terminal, translogFiles, batch);
-
-                List<IndexCommit> commits;
-                try {
-                    terminal.println("Reading translog UUID information from Lucene commit from shard at [" + idxLocation + "]");
-                    commits = DirectoryReader.listCommits(dir);
-                } catch (IndexNotFoundException infe) {
-                    throw new ElasticsearchException("unable to find a valid shard at [" + idxLocation + "]", infe);
-                }
-
-                // Retrieve the generation and UUID from the existing data
-                commitData = new HashMap<>(commits.get(commits.size() - 1).getUserData());
-                String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY);
-                String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY);
-                final long globalCheckpoint;
-                // In order to have a safe commit invariant, we have to assign the global checkpoint to the max_seqno of the last commit.
-                // We can only safely do it because we will generate a new history uuid this shard.
-                if (commitData.containsKey(SequenceNumbers.MAX_SEQ_NO)) {
-                    globalCheckpoint = Long.parseLong(commitData.get(SequenceNumbers.MAX_SEQ_NO));
-                    // Also advances the local checkpoint of the last commit to its max_seqno.
-                    commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(globalCheckpoint));
-                } else {
-                    globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
-                }
-                if (translogGeneration == null || translogUUID == null) {
-                    throw new ElasticsearchException("shard must have a valid translog generation and UUID but got: [{}] and: [{}]",
-                        translogGeneration, translogUUID);
-                }
-                terminal.println("Translog Generation: " + translogGeneration);
-                terminal.println("Translog UUID      : " + translogUUID);
-                terminal.println("History UUID      : " + historyUUID);
-
-                Path tempEmptyCheckpoint = translogPath.resolve("temp-" + Translog.CHECKPOINT_FILE_NAME);
-                Path realEmptyCheckpoint = translogPath.resolve(Translog.CHECKPOINT_FILE_NAME);
-                Path tempEmptyTranslog = translogPath.resolve("temp-" + Translog.TRANSLOG_FILE_PREFIX +
-                    translogGeneration + Translog.TRANSLOG_FILE_SUFFIX);
-                Path realEmptyTranslog = translogPath.resolve(Translog.TRANSLOG_FILE_PREFIX +
-                    translogGeneration + Translog.TRANSLOG_FILE_SUFFIX);
-
-                // Write empty checkpoint and translog to empty files
-                long gen = Long.parseLong(translogGeneration);
-                int translogLen = writeEmptyTranslog(tempEmptyTranslog, translogUUID);
-                writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen, globalCheckpoint);
-
-                terminal.println("Removing existing translog files");
-                IOUtils.rm(translogFiles.toArray(new Path[]{}));
-
-                terminal.println("Creating new empty checkpoint at [" + realEmptyCheckpoint + "]");
-                Files.move(tempEmptyCheckpoint, realEmptyCheckpoint, StandardCopyOption.ATOMIC_MOVE);
-                terminal.println("Creating new empty translog at [" + realEmptyTranslog + "]");
-                Files.move(tempEmptyTranslog, realEmptyTranslog, StandardCopyOption.ATOMIC_MOVE);
-
-                // Fsync the translog directory after rename
-                IOUtils.fsync(translogPath, true);
-            }
-
-            terminal.println("Marking index with the new history uuid");
-            // commit the new histroy id
-            IndexWriterConfig iwc = new IndexWriterConfig(null)
-                .setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
-                .setCommitOnClose(false)
-                // we don't want merges to happen here - we call maybe merge on the engine
-                // later once we stared it up otherwise we would need to wait for it here
-                // we also don't specify a codec here and merges should use the engines for this index
-                .setMergePolicy(NoMergePolicy.INSTANCE)
-                .setOpenMode(IndexWriterConfig.OpenMode.APPEND);
-            try (IndexWriter writer = new IndexWriter(dir, iwc)) {
-                Map<String, String> newCommitData = new HashMap<>(commitData);
-                newCommitData.put(Engine.HISTORY_UUID_KEY, historyUUID);
-                writer.setLiveCommitData(newCommitData.entrySet());
-                writer.commit();
-            }
-        } catch (LockObtainFailedException lofe) {
-            throw new ElasticsearchException("Failed to lock shard's directory at [" + idxLocation + "], is Elasticsearch still running?");
-        }
-
-        terminal.println("Done.");
-    }
-
-    /** Write a checkpoint file to the given location with the given generation */
-    static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration, long globalCheckpoint) throws IOException {
-        Checkpoint emptyCheckpoint = Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration,
-            globalCheckpoint, translogGeneration);
-        Checkpoint.write(FileChannel::open, filename, emptyCheckpoint,
-            StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
-        // fsync with metadata here to make sure.
-        IOUtils.fsync(filename, false);
-    }
-
-    /**
-     * Write a translog containing the given translog UUID to the given location. Returns the number of bytes written.
-     */
-    public static int writeEmptyTranslog(Path filename, String translogUUID) throws IOException {
-        try (FileChannel fc = FileChannel.open(filename, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW)) {
-            TranslogHeader header = new TranslogHeader(translogUUID, TranslogHeader.UNKNOWN_PRIMARY_TERM);
-            header.write(fc);
-            return header.sizeInBytes();
-        }
-    }
-
-    /** Show a warning about deleting files, asking for a confirmation if {@code batchMode} is false */
-    public static void warnAboutDeletingFiles(Terminal terminal, Set<Path> files, boolean batchMode) {
-        terminal.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
-        terminal.println("!   WARNING: Elasticsearch MUST be stopped before running this tool   !");
-        terminal.println("!                                                                     !");
-        terminal.println("!   WARNING:    Documents inside of translog files will be lost       !");
-        terminal.println("!                                                                     !");
-        terminal.println("!   WARNING:          The following files will be DELETED!            !");
-        terminal.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
-        for (Path file : files) {
-            terminal.println("--> " + file);
-        }
-        terminal.println("");
-        if (batchMode == false) {
-            String text = terminal.readText("Continue and DELETE files? [y/N] ");
-            if (!text.equalsIgnoreCase("y")) {
-                throw new ElasticsearchException("aborted by user");
-            }
-        }
-    }
-
-    /** Return a Set of all files in a given directory */
-    public static Set<Path> filesInDirectory(Path directory) throws IOException {
-        Set<Path> files = new HashSet<>();
-        try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory)) {
-            for (Path file : stream) {
-                files.add(file);
-            }
-        }
-        return files;
-    }
-
-}

+ 2 - 18
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -23,7 +23,6 @@ import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexableField;
-import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.TermQuery;
@@ -2641,7 +2640,8 @@ public class IndexShardTests extends IndexShardTestCase {
 
         final ShardPath shardPath = indexShard.shardPath();
 
-        final Path indexPath = corruptIndexFile(shardPath);
+        final Path indexPath = shardPath.getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME);
+        CorruptionUtils.corruptIndex(random(), indexPath, false);
 
         final AtomicInteger corruptedMarkerCount = new AtomicInteger();
         final SimpleFileVisitor<Path> corruptedVisitor = new SimpleFileVisitor<Path>() {
@@ -2750,22 +2750,6 @@ public class IndexShardTests extends IndexShardTestCase {
         assertThat("store still has a single corrupt marker", corruptedMarkerCount.get(), equalTo(1));
     }
 
-    private Path corruptIndexFile(ShardPath shardPath) throws IOException {
-        final Path indexPath = shardPath.getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME);
-        final Path[] filesToCorrupt =
-            Files.walk(indexPath)
-                .filter(p -> {
-                    final String name = p.getFileName().toString();
-                    return Files.isRegularFile(p)
-                        && name.startsWith("extra") == false // Skip files added by Lucene's ExtrasFS
-                        && IndexWriter.WRITE_LOCK_NAME.equals(name) == false
-                        && name.startsWith("segments_") == false && name.endsWith(".si") == false;
-                })
-                .toArray(Path[]::new);
-        CorruptionUtils.corruptFile(random(), filesToCorrupt);
-        return indexPath;
-    }
-
     /**
      * Simulates a scenario that happens when we are async fetching snapshot metadata from GatewayService
      * and checking index concurrently. This should always be possible without any exception.

+ 652 - 0
server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java

@@ -0,0 +1,652 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.index.shard;
+
+import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
+import com.carrotsearch.randomizedtesting.generators.RandomPicks;
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockObtainFailedException;
+import org.apache.lucene.store.NativeFSLockFactory;
+import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplanation;
+import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
+import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
+import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.cli.MockTerminal;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.GroupShardsIterator;
+import org.elasticsearch.cluster.routing.ShardIterator;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.cluster.routing.allocation.AllocationDecision;
+import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
+import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
+import org.elasticsearch.common.io.PathUtils;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.env.TestEnvironment;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.MergePolicyConfig;
+import org.elasticsearch.index.MockEngineFactoryPlugin;
+import org.elasticsearch.index.seqno.SeqNoStats;
+import org.elasticsearch.index.translog.TestTranslog;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.indices.recovery.RecoveryState;
+import org.elasticsearch.monitor.fs.FsInfo;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.CorruptionUtils;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.InternalSettingsPlugin;
+import org.elasticsearch.test.InternalTestCluster;
+import org.elasticsearch.test.engine.MockEngineSupport;
+import org.elasticsearch.test.transport.MockTransportService;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.elasticsearch.common.util.CollectionUtils.iterableAsArrayList;
+import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.startsWith;
+
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0)
+public class RemoveCorruptedShardDataCommandIT extends ESIntegTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return Arrays.asList(MockTransportService.TestPlugin.class, MockEngineFactoryPlugin.class, InternalSettingsPlugin.class);
+    }
+
+    public void testCorruptIndex() throws Exception {
+        final String node = internalCluster().startNode();
+
+        final String indexName = "index42";
+        assertAcked(prepareCreate(indexName).setSettings(Settings.builder()
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+            .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
+            .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "-1")
+            .put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true)
+            .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "checksum")
+        ));
+
+        // index some docs in several segments
+        int numDocs = 0;
+        for (int k = 0, attempts = randomIntBetween(5, 10); k < attempts; k++) {
+            final int numExtraDocs = between(10, 100);
+            IndexRequestBuilder[] builders = new IndexRequestBuilder[numExtraDocs];
+            for (int i = 0; i < builders.length; i++) {
+                builders[i] = client().prepareIndex(indexName, "type").setSource("foo", "bar");
+            }
+
+            numDocs += numExtraDocs;
+
+            indexRandom(false, false, false, Arrays.asList(builders));
+            flush(indexName);
+        }
+
+        logger.info("--> indexed {} docs", numDocs);
+
+        final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand();
+        final MockTerminal terminal = new MockTerminal();
+        final OptionParser parser = command.getParser();
+
+        final Environment environment = TestEnvironment.newEnvironment(internalCluster().getDefaultSettings());
+        final OptionSet options = parser.parse("-index", indexName, "-shard-id", "0");
+
+        // Try running it before the node is stopped (and shard is closed)
+        try {
+            command.execute(terminal, options, environment);
+            fail("expected the command to fail as node is locked");
+        } catch (Exception e) {
+            assertThat(e.getMessage(),
+                allOf(containsString("Failed to lock node's directory"),
+                    containsString("is Elasticsearch still running ?")));
+        }
+
+        final Set<Path> indexDirs = getDirs(indexName, ShardPath.INDEX_FOLDER_NAME);
+        assertThat(indexDirs, hasSize(1));
+
+        internalCluster().restartNode(node, new InternalTestCluster.RestartCallback() {
+            @Override
+            public Settings onNodeStopped(String nodeName) throws Exception {
+                // Try running it before the shard is corrupted; it should fail because there is no corruption file marker yet
+                try {
+                    command.execute(terminal, options, environment);
+                    fail("expected the command to fail as there is no corruption file marker");
+                } catch (Exception e) {
+                    assertThat(e.getMessage(), startsWith("Shard does not seem to be corrupted at"));
+                }
+
+                CorruptionUtils.corruptIndex(random(), indexDirs.iterator().next(), false);
+                return super.onNodeStopped(nodeName);
+            }
+        });
+
+        // shard should be failed due to a corrupted index
+        assertBusy(() -> {
+            final ClusterAllocationExplanation explanation =
+                client().admin().cluster().prepareAllocationExplain()
+                    .setIndex(indexName).setShard(0).setPrimary(true)
+                    .get().getExplanation();
+
+            final ShardAllocationDecision shardAllocationDecision = explanation.getShardAllocationDecision();
+            assertThat(shardAllocationDecision.isDecisionTaken(), equalTo(true));
+            assertThat(shardAllocationDecision.getAllocateDecision().getAllocationDecision(),
+                equalTo(AllocationDecision.NO_VALID_SHARD_COPY));
+        });
+
+        internalCluster().restartNode(node, new InternalTestCluster.RestartCallback() {
+            @Override
+            public Settings onNodeStopped(String nodeName) throws Exception {
+                terminal.addTextInput("y");
+                command.execute(terminal, options, environment);
+
+                return super.onNodeStopped(nodeName);
+            }
+        });
+
+        waitNoPendingTasksOnAll();
+
+        String nodeId = null;
+        final ClusterState state = client().admin().cluster().prepareState().get().getState();
+        final DiscoveryNodes nodes = state.nodes();
+        for (ObjectObjectCursor<String, DiscoveryNode> cursor : nodes.getNodes()) {
+            final String name = cursor.value.getName();
+            if (name.equals(node)) {
+                nodeId = cursor.key;
+                break;
+            }
+        }
+        assertThat(nodeId, notNullValue());
+
+        logger.info("--> output:\n{}", terminal.getOutput());
+
+        assertThat(terminal.getOutput(), containsString("allocate_stale_primary"));
+        assertThat(terminal.getOutput(), containsString("\"node\" : \"" + nodeId + "\""));
+
+        // there is only _stale_ primary (due to new allocation id)
+        assertBusy(() -> {
+            final ClusterAllocationExplanation explanation =
+                client().admin().cluster().prepareAllocationExplain()
+                    .setIndex(indexName).setShard(0).setPrimary(true)
+                    .get().getExplanation();
+
+            final ShardAllocationDecision shardAllocationDecision = explanation.getShardAllocationDecision();
+            assertThat(shardAllocationDecision.isDecisionTaken(), equalTo(true));
+            assertThat(shardAllocationDecision.getAllocateDecision().getAllocationDecision(),
+                equalTo(AllocationDecision.NO_VALID_SHARD_COPY));
+        });
+
+        client().admin().cluster().prepareReroute()
+            .add(new AllocateStalePrimaryAllocationCommand(indexName, 0, nodeId, true))
+            .get();
+
+        assertBusy(() -> {
+            final ClusterAllocationExplanation explanation =
+                client().admin().cluster().prepareAllocationExplain()
+                    .setIndex(indexName).setShard(0).setPrimary(true)
+                    .get().getExplanation();
+
+            assertThat(explanation.getCurrentNode(), notNullValue());
+            assertThat(explanation.getShardState(), equalTo(ShardRoutingState.STARTED));
+        });
+
+        final Pattern pattern = Pattern.compile("Corrupted Lucene index segments found -\\s+(?<docs>\\d+) documents will be lost.");
+        final Matcher matcher = pattern.matcher(terminal.getOutput());
+        assertThat(matcher.find(), equalTo(true));
+        final int expectedNumDocs = numDocs - Integer.parseInt(matcher.group("docs"));
+
+        ensureGreen(indexName);
+
+        assertHitCount(client().prepareSearch(indexName).setQuery(matchAllQuery()).get(), expectedNumDocs);
+    }
+
+    public void testCorruptTranslogTruncation() throws Exception {
+        internalCluster().startNodes(2, Settings.EMPTY);
+
+        final String node1 = internalCluster().getNodeNames()[0];
+        final String node2 = internalCluster().getNodeNames()[1];
+
+        final String indexName = "test";
+        assertAcked(prepareCreate(indexName).setSettings(Settings.builder()
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+            .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "-1")
+            .put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true) // never flush - always recover from translog
+            .put("index.routing.allocation.exclude._name", node2)
+        ));
+        ensureYellow();
+
+        assertAcked(client().admin().indices().prepareUpdateSettings(indexName).setSettings(Settings.builder()
+            .put("index.routing.allocation.exclude._name", (String)null)
+        ));
+        ensureGreen();
+
+        // Index some documents
+        int numDocsToKeep = randomIntBetween(10, 100);
+        logger.info("--> indexing [{}] docs to be kept", numDocsToKeep);
+        IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocsToKeep];
+        for (int i = 0; i < builders.length; i++) {
+            builders[i] = client().prepareIndex(indexName, "type").setSource("foo", "bar");
+        }
+        indexRandom(false, false, false, Arrays.asList(builders));
+        flush(indexName);
+
+        disableTranslogFlush(indexName);
+        // having no extra docs is an interesting case for seq no based recoveries - test it more often
+        int numDocsToTruncate = randomBoolean() ? 0 : randomIntBetween(0, 100);
+        logger.info("--> indexing [{}] more docs to be truncated", numDocsToTruncate);
+        builders = new IndexRequestBuilder[numDocsToTruncate];
+        for (int i = 0; i < builders.length; i++) {
+            builders[i] = client().prepareIndex(indexName, "type").setSource("foo", "bar");
+        }
+        indexRandom(false, false, false, Arrays.asList(builders));
+        Set<Path> translogDirs = getDirs(indexName, ShardPath.TRANSLOG_FOLDER_NAME);
+
+        // translog-only mode is kept for bwc with the elasticsearch-translog tool on the 6.x branch
+        final boolean translogOnly = randomBoolean();
+        final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand(translogOnly);
+        final MockTerminal terminal = new MockTerminal();
+        final OptionParser parser = command.getParser();
+
+        if (randomBoolean() && numDocsToTruncate > 0) {
+            // flush the replica, so it will have more docs than what the primary will have
+            Index index = resolveIndex(indexName);
+            IndexShard replica = internalCluster().getInstance(IndicesService.class, node2).getShardOrNull(new ShardId(index, 0));
+            replica.flush(new FlushRequest());
+            logger.info("--> performed extra flushing on replica");
+        }
+
+        // shut down the replica node to be tested later
+        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node2));
+
+        // Corrupt the translog file(s)
+        logger.info("--> corrupting translog");
+        corruptRandomTranslogFiles(indexName);
+
+        // Restart the single node
+        logger.info("--> restarting node");
+        internalCluster().restartRandomDataNode();
+
+        // all shards should be failed due to a corrupted translog
+        assertBusy(() -> {
+            final ClusterAllocationExplanation explanation =
+                client().admin().cluster().prepareAllocationExplain()
+                    .setIndex(indexName).setShard(0).setPrimary(true)
+                    .get().getExplanation();
+
+            final UnassignedInfo unassignedInfo = explanation.getUnassignedInfo();
+            assertThat(unassignedInfo.getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED));
+        });
+
+        // have to shut down primary node - otherwise node lock is present
+        final InternalTestCluster.RestartCallback callback =
+            new InternalTestCluster.RestartCallback() {
+                @Override
+                public Settings onNodeStopped(String nodeName) throws Exception {
+                    // and we can actually truncate the translog
+                    for (Path translogDir : translogDirs) {
+                        final Path idxLocation = translogDir.getParent().resolve(ShardPath.INDEX_FOLDER_NAME);
+                        assertBusy(() -> {
+                            logger.info("--> checking that lock has been released for {}", idxLocation);
+                            try (Directory dir = FSDirectory.open(idxLocation, NativeFSLockFactory.INSTANCE);
+                                 Lock writeLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
+                                // Great, do nothing, we just wanted to obtain the lock
+                            } catch (LockObtainFailedException lofe) {
+                                logger.info("--> failed acquiring lock for {}", idxLocation);
+                                fail("still waiting for lock release at [" + idxLocation + "]");
+                            } catch (IOException ioe) {
+                                fail("Got an IOException: " + ioe);
+                            }
+                        });
+
+                        final Settings defaultSettings = internalCluster().getDefaultSettings();
+                        final Environment environment = TestEnvironment.newEnvironment(defaultSettings);
+
+                        terminal.addTextInput("y");
+                        OptionSet options = parser.parse("-d", translogDir.toAbsolutePath().toString());
+                        logger.info("--> running command for [{}]", translogDir.toAbsolutePath());
+                        command.execute(terminal, options, environment);
+                        logger.info("--> output:\n{}", terminal.getOutput());
+                    }
+
+                    return super.onNodeStopped(nodeName);
+                }
+            };
+        internalCluster().restartNode(node1, callback);
+
+        String primaryNodeId = null;
+        final ClusterState state = client().admin().cluster().prepareState().get().getState();
+        final DiscoveryNodes nodes = state.nodes();
+        for (ObjectObjectCursor<String, DiscoveryNode> cursor : nodes.getNodes()) {
+            final String name = cursor.value.getName();
+            if (name.equals(node1)) {
+                primaryNodeId = cursor.key;
+                break;
+            }
+        }
+        assertThat(primaryNodeId, notNullValue());
+
+        assertThat(terminal.getOutput(), containsString("allocate_stale_primary"));
+        assertThat(terminal.getOutput(), containsString("\"node\" : \"" + primaryNodeId + "\""));
+
+        // there is only _stale_ primary (due to new allocation id)
+        assertBusy(() -> {
+            final ClusterAllocationExplanation explanation =
+                client().admin().cluster().prepareAllocationExplain()
+                    .setIndex(indexName).setShard(0).setPrimary(true)
+                    .get().getExplanation();
+
+            final ShardAllocationDecision shardAllocationDecision = explanation.getShardAllocationDecision();
+            assertThat(shardAllocationDecision.isDecisionTaken(), equalTo(true));
+            assertThat(shardAllocationDecision.getAllocateDecision().getAllocationDecision(),
+                equalTo(AllocationDecision.NO_VALID_SHARD_COPY));
+        });
+
+        client().admin().cluster().prepareReroute()
+            .add(new AllocateStalePrimaryAllocationCommand(indexName, 0, primaryNodeId, true))
+            .get();
+
+        assertBusy(() -> {
+            final ClusterAllocationExplanation explanation =
+                client().admin().cluster().prepareAllocationExplain()
+                    .setIndex(indexName).setShard(0).setPrimary(true)
+                    .get().getExplanation();
+
+            assertThat(explanation.getCurrentNode(), notNullValue());
+            assertThat(explanation.getShardState(), equalTo(ShardRoutingState.STARTED));
+        });
+
+        ensureYellow(indexName);
+
+        // Run a search and make sure it succeeds
+        assertHitCount(client().prepareSearch(indexName).setQuery(matchAllQuery()).get(), numDocsToKeep);
+
+        logger.info("--> starting the replica node to test recovery");
+        internalCluster().startNode();
+        ensureGreen(indexName);
+        for (String node : internalCluster().nodesInclude(indexName)) {
+            SearchRequestBuilder q = client().prepareSearch(indexName).setPreference("_only_nodes:" + node).setQuery(matchAllQuery());
+            assertHitCount(q.get(), numDocsToKeep);
+        }
+        final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(indexName).setActiveOnly(false).get();
+        final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get(indexName).stream()
+            .filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get();
+        assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0));
+        // Ensure that the global checkpoint and local checkpoint are restored from the max seqno of the last commit.
+        final SeqNoStats seqNoStats = getSeqNoStats(indexName, 0);
+        assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
+        assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
+    }
+
+    public void testCorruptTranslogTruncationOfReplica() throws Exception {
+        internalCluster().startNodes(2, Settings.EMPTY);
+
+        final String node1 = internalCluster().getNodeNames()[0];
+        final String node2 = internalCluster().getNodeNames()[1];
+        logger.info("--> nodes name: {}, {}", node1, node2);
+
+        final String indexName = "test";
+        assertAcked(prepareCreate(indexName).setSettings(Settings.builder()
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+            .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "-1")
+            .put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true) // never flush - always recover from translog
+            .put("index.routing.allocation.exclude._name", node2)
+        ));
+        ensureYellow();
+
+        assertAcked(client().admin().indices().prepareUpdateSettings(indexName).setSettings(Settings.builder()
+            .put("index.routing.allocation.exclude._name", (String)null)
+        ));
+        ensureGreen();
+
+        // Index some documents
+        int numDocsToKeep = randomIntBetween(0, 100);
+        logger.info("--> indexing [{}] docs to be kept", numDocsToKeep);
+        IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocsToKeep];
+        for (int i = 0; i < builders.length; i++) {
+            builders[i] = client().prepareIndex(indexName, "type").setSource("foo", "bar");
+        }
+        indexRandom(false, false, false, Arrays.asList(builders));
+        flush(indexName);
+        disableTranslogFlush(indexName);
+        // having no extra docs is an interesting case for seq no based recoveries - test it more often
+        int numDocsToTruncate = randomBoolean() ? 0 : randomIntBetween(0, 100);
+        logger.info("--> indexing [{}] more docs to be truncated", numDocsToTruncate);
+        builders = new IndexRequestBuilder[numDocsToTruncate];
+        for (int i = 0; i < builders.length; i++) {
+            builders[i] = client().prepareIndex(indexName, "type").setSource("foo", "bar");
+        }
+        indexRandom(false, false, false, Arrays.asList(builders));
+        final int totalDocs = numDocsToKeep + numDocsToTruncate;
+
+        // sample the replica node translog dirs
+        final ShardId shardId = new ShardId(resolveIndex(indexName), 0);
+        final Set<Path> translogDirs = getDirs(node2, shardId, ShardPath.TRANSLOG_FOLDER_NAME);
+
+        // stop the cluster nodes; we don't use a full restart so that the node start-up order stays the same
+        // and the shard roles are maintained
+        internalCluster().stopRandomDataNode();
+        internalCluster().stopRandomDataNode();
+
+        // Corrupt the translog file(s)
+        logger.info("--> corrupting translog");
+        TestTranslog.corruptRandomTranslogFile(logger, random(), translogDirs);
+
+        // Restart the single node
+        logger.info("--> starting node");
+        internalCluster().startNode();
+
+        ensureYellow();
+
+        // Run a search and make sure it succeeds
+        assertHitCount(client().prepareSearch(indexName).setQuery(matchAllQuery()).get(), totalDocs);
+
+        final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand();
+        final MockTerminal terminal = new MockTerminal();
+        final OptionParser parser = command.getParser();
+
+        final Environment environment = TestEnvironment.newEnvironment(internalCluster().getDefaultSettings());
+
+        internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
+            @Override
+            public Settings onNodeStopped(String nodeName) throws Exception {
+                logger.info("--> node {} stopped", nodeName);
+                for (Path translogDir : translogDirs) {
+                    final Path idxLocation = translogDir.getParent().resolve(ShardPath.INDEX_FOLDER_NAME);
+                    assertBusy(() -> {
+                        logger.info("--> checking that lock has been released for {}", idxLocation);
+                        try (Directory dir = FSDirectory.open(idxLocation, NativeFSLockFactory.INSTANCE);
+                             Lock writeLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
+                            // Great, do nothing, we just wanted to obtain the lock
+                        }  catch (LockObtainFailedException lofe) {
+                            logger.info("--> failed acquiring lock for {}", idxLocation);
+                            fail("still waiting for lock release at [" + idxLocation + "]");
+                        } catch (IOException ioe) {
+                            fail("Got an IOException: " + ioe);
+                        }
+                    });
+
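+                    // answer the confirmation prompt with "y" and run the tool against this translog directory while the node is stopped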
+                    terminal.addTextInput("y");
+                    OptionSet options = parser.parse("-d", translogDir.toAbsolutePath().toString());
+                    logger.info("--> running command for [{}]", translogDir.toAbsolutePath());
+                    command.execute(terminal, options, environment);
+                    logger.info("--> output:\n{}", terminal.getOutput());
+                }
+
+                return super.onNodeStopped(nodeName);
+            }
+        });
+
+        logger.info("--> starting the replica node to test recovery");
+        internalCluster().startNode();
+        ensureGreen(indexName);
+        for (String node : internalCluster().nodesInclude(indexName)) {
+            assertHitCount(client().prepareSearch(indexName)
+                .setPreference("_only_nodes:" + node).setQuery(matchAllQuery()).get(), totalDocs);
+        }
+
+        final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(indexName).setActiveOnly(false).get();
+        final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get(indexName).stream()
+            .filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get();
+        // the replica translog was disabled so it doesn't know what the global checkpoint is and thus can't do ops-based recovery
+        assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0));
+        // Ensure that the global checkpoint and local checkpoint are restored from the max seqno of the last commit.
+        final SeqNoStats seqNoStats = getSeqNoStats(indexName, 0);
+        assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
+        assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
+    }
+
+    public void testResolvePath() throws Exception {
+        final int numOfNodes = randomIntBetween(1, 5);
+        final List<String> nodeNames = internalCluster().startNodes(numOfNodes, Settings.EMPTY);
+
+        final String indexName = "test" + randomInt(100);
+        assertAcked(prepareCreate(indexName).setSettings(Settings.builder()
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numOfNodes - 1)
+        ));
+        flush(indexName);
+
+        ensureGreen(indexName);
+
+        final Map<String, String> nodeNameToNodeId = new HashMap<>();
+        final ClusterState state = client().admin().cluster().prepareState().get().getState();
+        final DiscoveryNodes nodes = state.nodes();
+        for (ObjectObjectCursor<String, DiscoveryNode> cursor : nodes.getNodes()) {
+            nodeNameToNodeId.put(cursor.value.getName(), cursor.key);
+        }
+
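+        // locate the index's single primary shard; its on-disk path is resolved per node below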
+        final GroupShardsIterator shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[]{indexName}, false);
+        final List<ShardIterator> iterators = iterableAsArrayList(shardIterators);
+        final ShardRouting shardRouting = iterators.iterator().next().nextOrNull();
+        assertThat(shardRouting, notNullValue());
+        final ShardId shardId = shardRouting.shardId();
+
+        final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand();
+        final OptionParser parser = command.getParser();
+
+        final Environment environment = TestEnvironment.newEnvironment(internalCluster().getDefaultSettings());
+
+        final Map<String, Path> indexPathByNodeName = new HashMap<>();
+        for (String nodeName : nodeNames) {
+            final String nodeId = nodeNameToNodeId.get(nodeName);
+            final Set<Path> indexDirs = getDirs(nodeId, shardId, ShardPath.INDEX_FOLDER_NAME);
+            assertThat(indexDirs, hasSize(1));
+            indexPathByNodeName.put(nodeName, indexDirs.iterator().next());
+
+            internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeName));
+            logger.info(" -- stopped {}", nodeName);
+        }
+
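+        // with every node stopped, the tool has to resolve the shard path from the given --dir value alone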
+        for (String nodeName : nodeNames) {
+            final Path indexPath = indexPathByNodeName.get(nodeName);
+            final OptionSet options = parser.parse("--dir", indexPath.toAbsolutePath().toString());
+            command.findAndProcessShardPath(options, environment,
+                shardPath -> assertThat(shardPath.resolveIndex(), equalTo(indexPath)));
+        }
+    }
+
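+    /** Picks a random assigned primary shard of {@code indexName} and returns its {@code dirSuffix} directories on the node holding it. */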
+    private Set<Path> getDirs(String indexName, String dirSuffix) {
+        ClusterState state = client().admin().cluster().prepareState().get().getState();
+        GroupShardsIterator shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[]{indexName}, false);
+        List<ShardIterator> iterators = iterableAsArrayList(shardIterators);
+        ShardIterator shardIterator = RandomPicks.randomFrom(random(), iterators);
+        ShardRouting shardRouting = shardIterator.nextOrNull();
+        assertNotNull(shardRouting);
+        assertTrue(shardRouting.primary());
+        assertTrue(shardRouting.assignedToNode());
+        String nodeId = shardRouting.currentNodeId();
+        ShardId shardId = shardRouting.shardId();
+        return getDirs(nodeId, shardId, dirSuffix);
+    }
+
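+    /** Resolves, via the node's filesystem stats, the existing {@code dirSuffix} (index or translog) directories of the given shard on the given node. */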
+    private Set<Path> getDirs(String nodeId, ShardId shardId, String dirSuffix) {
+        final NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(nodeId).setFs(true).get();
+        final Set<Path> translogDirs = new TreeSet<>();
+        final NodeStats nodeStats = nodeStatses.getNodes().get(0);
+        for (FsInfo.Path fsPath : nodeStats.getFs()) {
+            final String path = fsPath.getPath();
+            final Path p = PathUtils.get(path)
+                .resolve(NodeEnvironment.INDICES_FOLDER)
+                .resolve(shardId.getIndex().getUUID())
+                .resolve(Integer.toString(shardId.getId()))
+                .resolve(dirSuffix);
+            if (Files.isDirectory(p)) {
+                translogDirs.add(p);
+            }
+        }
+        return translogDirs;
+    }
+
+    private void corruptRandomTranslogFiles(String indexName) throws IOException {
+        Set<Path> translogDirs = getDirs(indexName, ShardPath.TRANSLOG_FOLDER_NAME);
+        TestTranslog.corruptRandomTranslogFile(logger, random(), translogDirs);
+    }
+
+    /** Disables translog flushing for the specified index */
+    private static void disableTranslogFlush(String index) {
+        Settings settings = Settings.builder()
+            .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.PB))
+            .build();
+        client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
+    }
+
+    private SeqNoStats getSeqNoStats(String index, int shardId) {
+        final ShardStats[] shardStats = client().admin().indices()
+            .prepareStats(index).get()
+            .getIndices().get(index).getShards();
+        return shardStats[shardId].getSeqNoStats();
+    }
+}

+ 409 - 0
server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java

@@ -0,0 +1,409 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.index.shard;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import org.apache.lucene.store.BaseDirectoryWrapper;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.Version;
+import org.elasticsearch.cli.MockTerminal;
+import org.elasticsearch.cli.Terminal;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingHelper;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.TestShardRouting;
+import org.elasticsearch.common.CheckedFunction;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.env.TestEnvironment;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.MergePolicyConfig;
+import org.elasticsearch.index.engine.EngineException;
+import org.elasticsearch.index.engine.InternalEngineFactory;
+import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.translog.TestTranslog;
+import org.elasticsearch.index.translog.TranslogCorruptedException;
+import org.elasticsearch.test.CorruptionUtils;
+import org.elasticsearch.test.DummyShardLock;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.startsWith;
+
+public class RemoveCorruptedShardDataCommandTests extends IndexShardTestCase {
+
+    private ShardId shardId;
+    private ShardRouting routing;
+    private Path dataDir;
+    private Environment environment;
+    private Settings settings;
+    private ShardPath shardPath;
+    private IndexMetaData indexMetaData;
+    private IndexShard indexShard;
+    private Path translogPath;
+    private Path indexPath;
+
+    @Before
+    public void setup() throws IOException {
+        shardId = new ShardId("index0", "_na_", 0);
+        final String nodeId = randomAlphaOfLength(10);
+        routing = TestShardRouting.newShardRouting(shardId, nodeId, true, ShardRoutingState.INITIALIZING,
+            RecoverySource.EmptyStoreRecoverySource.INSTANCE);
+
+        dataDir = createTempDir();
+
+        environment =
+            TestEnvironment.newEnvironment(Settings.builder()
+                .put(Environment.PATH_HOME_SETTING.getKey(), dataDir)
+                .putList(Environment.PATH_DATA_SETTING.getKey(), dataDir.toAbsolutePath().toString()).build());
+
+        // create same directory structure as prod does
+        final Path path = NodeEnvironment.resolveNodePath(dataDir, 0);
+        Files.createDirectories(path);
+        settings = Settings.builder()
+            .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+            .build();
+
+        final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(path);
+        shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
+        final IndexMetaData.Builder metaData = IndexMetaData.builder(routing.getIndexName())
+            .settings(settings)
+            .primaryTerm(0, randomIntBetween(1, 100))
+            .putMapping("_doc", "{ \"properties\": {} }");
+        indexMetaData = metaData.build();
+
+        indexShard = newStartedShard(p ->
+                newShard(routing, shardPath, indexMetaData, null, null,
+                    new InternalEngineFactory(), () -> {
+                    }, EMPTY_EVENT_LISTENER),
+            true);
+
+        translogPath = shardPath.resolveTranslog();
+        indexPath = shardPath.resolveIndex();
+    }
+
+    public void testShardLock() throws Exception {
+        indexDocs(indexShard, true);
+
+        final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand();
+        final MockTerminal t = new MockTerminal();
+        final OptionParser parser = command.getParser();
+
+        // Try running it before the shard is closed, it should flip out because it can't acquire the lock
+        try {
+            final OptionSet options = parser.parse("-d", indexPath.toString());
+            command.execute(t, options, environment);
+            fail("expected the command to fail not being able to acquire the lock");
+        } catch (Exception e) {
+            assertThat(e.getMessage(), containsString("Failed to lock shard's directory"));
+        }
+
+        // close shard
+        closeShards(indexShard);
+
+        // Try running it before the shard is corrupted
+        try {
+            final OptionSet options = parser.parse("-d", indexPath.toString());
+            command.execute(t, options, environment);
+            fail("expected the command to fail not being able to find a corrupt file marker");
+        } catch (ElasticsearchException e) {
+            assertThat(e.getMessage(), startsWith("Shard does not seem to be corrupted at"));
+            assertThat(t.getOutput(), containsString("Lucene index is clean at"));
+        }
+    }
+
+    public void testCorruptedIndex() throws Exception {
+        final int numDocs = indexDocs(indexShard, true);
+
+        // close shard
+        closeShards(indexShard);
+
+        final boolean corruptSegments = randomBoolean();
+        CorruptionUtils.corruptIndex(random(), indexPath, corruptSegments);
+
+        // test corrupted shard
+        final IndexShard corruptedShard = reopenIndexShard(true);
+        allowShardFailures();
+        expectThrows(IndexShardRecoveryException.class, () -> newStartedShard(p -> corruptedShard, true));
+        closeShards(corruptedShard);
+
+        final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand();
+        final MockTerminal t = new MockTerminal();
+        final OptionParser parser = command.getParser();
+
+        // run command with dry-run
+        t.addTextInput("n"); // mean dry run
+        final OptionSet options = parser.parse("-d", indexPath.toString());
+        t.setVerbosity(Terminal.Verbosity.VERBOSE);
+        try {
+            command.execute(t, options, environment);
+            fail();
+        } catch (ElasticsearchException e) {
+            if (corruptSegments) {
+                assertThat(e.getMessage(), is("Index is unrecoverable"));
+            } else {
+                assertThat(e.getMessage(), containsString("aborted by user"));
+            }
+        }
+
+        logger.info("--> output:\n{}", t.getOutput());
+
+        if (corruptSegments == false) {
+
+            // run command without dry-run
+            t.addTextInput("y");
+            command.execute(t, options, environment);
+
+            final String output = t.getOutput();
+            logger.info("--> output:\n{}", output);
+
+            // reopen shard
+            failOnShardFailures();
+            final IndexShard newShard = newStartedShard(p -> reopenIndexShard(false), true);
+
+            final Set<String> shardDocUIDs = getShardDocUIDs(newShard);
+
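+            // the command output reports how many documents were lost; the reopened shard must hold the remainder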
+            final Pattern pattern = Pattern.compile("Corrupted Lucene index segments found -\\s+(?<docs>\\d+) documents will be lost.");
+            final Matcher matcher = pattern.matcher(output);
+            assertThat(matcher.find(), equalTo(true));
+            final int expectedNumDocs = numDocs - Integer.parseInt(matcher.group("docs"));
+
+            assertThat(shardDocUIDs.size(), equalTo(expectedNumDocs));
+
+            closeShards(newShard);
+        }
+    }
+
+    public void testCorruptedTranslog() throws Exception {
+        final int numDocsToKeep = indexDocs(indexShard, false);
+
+        // close shard
+        closeShards(indexShard);
+
+        TestTranslog.corruptRandomTranslogFile(logger, random(), Arrays.asList(translogPath));
+
+        // test corrupted shard
+        final IndexShard corruptedShard = reopenIndexShard(true);
+
+        allowShardFailures();
+        // it has to fail on startup due to index.shard.check_on_startup = checksum
+        final Exception exception = expectThrows(Exception.class, () -> newStartedShard(p -> corruptedShard, true));
+        final Throwable cause = exception.getCause() instanceof EngineException ? exception.getCause().getCause() : exception.getCause();
+        assertThat(cause, instanceOf(TranslogCorruptedException.class));
+
+        closeShards(corruptedShard);
+
+        final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand();
+        final MockTerminal t = new MockTerminal();
+        final OptionParser parser = command.getParser();
+
+        final OptionSet options = parser.parse("-d", translogPath.toString());
+        // run command with dry-run
+        t.addTextInput("n"); // mean dry run
+        t.setVerbosity(Terminal.Verbosity.VERBOSE);
+        try {
+            command.execute(t, options, environment);
+            fail();
+        } catch (ElasticsearchException e) {
+            assertThat(e.getMessage(), containsString("aborted by user"));
+            assertThat(t.getOutput(), containsString("Continue and remove corrupted data from the shard ?"));
+        }
+
+        logger.info("--> output:\n{}", t.getOutput());
+
+        // run command without dry-run
+        t.reset();
+        t.addTextInput("y");
+        command.execute(t, options, environment);
+
+        final String output = t.getOutput();
+        logger.info("--> output:\n{}", output);
+
+        // reopen shard
+        failOnShardFailures();
+        final IndexShard newShard = newStartedShard(p -> reopenIndexShard(false), true);
+
+        final Set<String> shardDocUIDs = getShardDocUIDs(newShard);
+
+        assertThat(shardDocUIDs.size(), equalTo(numDocsToKeep));
+
+        closeShards(newShard);
+    }
+
+    public void testCorruptedBothIndexAndTranslog() throws Exception {
+        // index some docs in several segments
+        final int numDocsToKeep = indexDocs(indexShard, false);
+
+        // close shard
+        closeShards(indexShard);
+
+        CorruptionUtils.corruptIndex(random(), indexPath, false);
+
+        // test corrupted shard
+        final IndexShard corruptedShard = reopenIndexShard(true);
+        allowShardFailures();
+        expectThrows(IndexShardRecoveryException.class, () -> newStartedShard(p -> corruptedShard, true));
+        closeShards(corruptedShard);
+
+        TestTranslog.corruptRandomTranslogFile(logger, random(), Arrays.asList(translogPath));
+
+        final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand();
+        final MockTerminal t = new MockTerminal();
+        final OptionParser parser = command.getParser();
+
+        final OptionSet options = parser.parse("-d", translogPath.toString());
+        // run command with dry-run
+        t.addTextInput("n"); // mean dry run
+        t.addTextInput("n"); // mean dry run
+        t.setVerbosity(Terminal.Verbosity.VERBOSE);
+        try {
+            command.execute(t, options, environment);
+            fail();
+        } catch (ElasticsearchException e) {
+            assertThat(e.getMessage(), containsString("aborted by user"));
+            assertThat(t.getOutput(), containsString("Continue and remove corrupted data from the shard ?"));
+        }
+
+        logger.info("--> output:\n{}", t.getOutput());
+
+        // run command without dry-run
+        t.reset();
+        t.addTextInput("y");
+        command.execute(t, options, environment);
+
+        final String output = t.getOutput();
+        logger.info("--> output:\n{}", output);
+
+        // reopen shard
+        failOnShardFailures();
+        final IndexShard newShard = newStartedShard(p -> reopenIndexShard(false), true);
+
+        final Set<String> shardDocUIDs = getShardDocUIDs(newShard);
+
+        final Pattern pattern = Pattern.compile("Corrupted Lucene index segments found -\\s+(?<docs>\\d+) documents will be lost.");
+        final Matcher matcher = pattern.matcher(output);
+        assertThat(matcher.find(), equalTo(true));
+        final int expectedNumDocs = numDocsToKeep - Integer.parseInt(matcher.group("docs"));
+
+        assertThat(shardDocUIDs.size(), equalTo(expectedNumDocs));
+
+        closeShards(newShard);
+    }
+
+    public void testResolveIndexDirectory() throws Exception {
+        // index a single doc to have files on a disk
+        indexDoc(indexShard, "_doc", "0", "{}");
+        flushShard(indexShard, true);
+        writeIndexState();
+
+        // close shard
+        closeShards(indexShard);
+
+        final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand();
+        final OptionParser parser = command.getParser();
+
+        // `--index index_name --shard-id 0` has to be resolved to indexPath
+        final OptionSet options = parser.parse("--index", shardId.getIndex().getName(),
+            "--shard-id", Integer.toString(shardId.id()));
+
+        command.findAndProcessShardPath(options, environment,
+            shardPath -> assertThat(shardPath.resolveIndex(), equalTo(indexPath)));
+
+        final OptionSet options2 = parser.parse("--dir", indexPath.toAbsolutePath().toString());
+        command.findAndProcessShardPath(options2, environment,
+            shardPath -> assertThat(shardPath.resolveIndex(), equalTo(indexPath)));
+    }
+
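+    /** Reopens the shard from its existing store with check_on_startup=checksum; when {@code corrupted} is set, the directory wrapper skips check-index-on-close so the corruption does not trip the test framework. */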
+    private IndexShard reopenIndexShard(boolean corrupted) throws IOException {
+        // open shard with the same location
+        final ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(indexShard.routingEntry(),
+            RecoverySource.ExistingStoreRecoverySource.INSTANCE
+        );
+
+        final IndexMetaData metaData = IndexMetaData.builder(indexMetaData)
+            .settings(Settings.builder()
+                .put(indexShard.indexSettings().getSettings())
+                .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "checksum"))
+            .build();
+
+        CheckedFunction<IndexSettings, Store, IOException> storeProvider =
+            corrupted == false ? null :
+                indexSettings -> {
+                    final ShardId shardId = shardPath.getShardId();
+                    final BaseDirectoryWrapper baseDirectoryWrapper = newFSDirectory(shardPath.resolveIndex());
+                    // index is corrupted - don't even try to check index on close - it fails
+                    baseDirectoryWrapper.setCheckIndexOnClose(false);
+                    return new Store(shardId, indexSettings, baseDirectoryWrapper, new DummyShardLock(shardId));
+                };
+
+        return newShard(shardRouting, shardPath, metaData, storeProvider, null,
+            indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);
+    }
+
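+    /** Indexes random batches of docs, flushing after each batch except (unless {@code flushLast}) the final one; returns the number of docs contained in a flushed commit. */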
+    private int indexDocs(IndexShard indexShard, boolean flushLast) throws IOException {
+        // index some docs in several segments
+        int numDocs = 0;
+        int numDocsToKeep = 0;
+        for (int i = 0, attempts = randomIntBetween(5, 10); i < attempts; i++) {
+            final int numExtraDocs = between(10, 100);
+            for (long j = 0; j < numExtraDocs; j++) {
+                indexDoc(indexShard, "_doc", Long.toString(numDocs + j), "{}");
+            }
+            numDocs += numExtraDocs;
+
+            if (flushLast || i < attempts - 1) {
+                numDocsToKeep += numExtraDocs;
+                flushShard(indexShard, true);
+            }
+        }
+
+        logger.info("--> indexed {} docs, {} to keep", numDocs, numDocsToKeep);
+
+        writeIndexState();
+        return numDocsToKeep;
+    }
+
+    private void writeIndexState() throws IOException {
+        // create _state of IndexMetaData
+        try (NodeEnvironment nodeEnvironment = new NodeEnvironment(environment.settings(), environment, nId -> {})) {
+            final Path[] paths = nodeEnvironment.indexPaths(indexMetaData.getIndex());
+            IndexMetaData.FORMAT.write(indexMetaData, paths);
+            logger.info("--> index metadata persisted to {} ", Arrays.toString(paths));
+        }
+    }
+
+}

+ 1 - 1
server/src/test/java/org/elasticsearch/index/store/CorruptedTranslogIT.java

@@ -125,7 +125,7 @@ public class CorruptedTranslogIT extends ESIntegTestCase {
             }
         }
         Path translogDir = RandomPicks.randomFrom(random(), translogDirs);
-        TestTranslog.corruptRandomTranslogFile(logger, random(), translogDir, TestTranslog.minTranslogGenUsedInRecovery(translogDir));
+        TestTranslog.corruptRandomTranslogFile(logger, random(), Arrays.asList(translogDir));
     }
 
     /** Disables translog flushing for the specified index */

+ 9 - 2
server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java

@@ -34,6 +34,7 @@ import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
+import java.util.Collection;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
@@ -52,13 +53,19 @@ import static org.hamcrest.core.IsNot.not;
 public class TestTranslog {
     static final Pattern TRANSLOG_FILE_PATTERN = Pattern.compile("translog-(\\d+)\\.tlog");
 
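+    /** For each given translog directory, corrupts one random translog file that is still required for recovery. */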
+    public static void corruptRandomTranslogFile(Logger logger, Random random, Collection<Path> translogDirs) throws IOException {
+        for (Path translogDir : translogDirs) {
+            final long minTranslogGen = minTranslogGenUsedInRecovery(translogDir);
+            corruptRandomTranslogFile(logger, random, translogDir, minTranslogGen);
+        }
+    }
+
     /**
      * Corrupts random translog file (translog-N.tlog) from the given translog directory.
      *
      * @return a translog file which has been corrupted.
      */
-    public static Path corruptRandomTranslogFile(Logger logger, Random random, Path translogDir, long minGeneration) throws
-            IOException {
+    public static Path corruptRandomTranslogFile(Logger logger, Random random, Path translogDir, long minGeneration) throws IOException {
         Set<Path> candidates = new TreeSet<>(); // TreeSet makes sure iteration order is deterministic
         logger.info("--> Translog dir [{}], minUsedTranslogGen [{}]", translogDir, minGeneration);
         try (DirectoryStream<Path> stream = Files.newDirectoryStream(translogDir)) {

+ 0 - 382
server/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java

@@ -1,382 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.index.translog;
-
-import com.carrotsearch.randomizedtesting.generators.RandomPicks;
-import joptsimple.OptionParser;
-import joptsimple.OptionSet;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
-import org.apache.lucene.store.Lock;
-import org.apache.lucene.store.LockObtainFailedException;
-import org.apache.lucene.store.NativeFSLockFactory;
-import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
-import org.elasticsearch.action.admin.indices.flush.FlushRequest;
-import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
-import org.elasticsearch.action.admin.indices.stats.ShardStats;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.action.search.SearchPhaseExecutionException;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.cli.MockTerminal;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.routing.GroupShardsIterator;
-import org.elasticsearch.cluster.routing.ShardIterator;
-import org.elasticsearch.cluster.routing.ShardRouting;
-import org.elasticsearch.common.Priority;
-import org.elasticsearch.common.io.PathUtils;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.index.Index;
-import org.elasticsearch.index.IndexSettings;
-import org.elasticsearch.index.MockEngineFactoryPlugin;
-import org.elasticsearch.index.seqno.SeqNoStats;
-import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.indices.IndicesService;
-import org.elasticsearch.indices.recovery.RecoveryState;
-import org.elasticsearch.monitor.fs.FsInfo;
-import org.elasticsearch.plugins.Plugin;
-import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.test.InternalTestCluster;
-import org.elasticsearch.test.engine.MockEngineSupport;
-import org.elasticsearch.test.transport.MockTransportService;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-
-import static org.elasticsearch.common.util.CollectionUtils.iterableAsArrayList;
-import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-
-@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0)
-public class TruncateTranslogIT extends ESIntegTestCase {
-
-    @Override
-    protected Collection<Class<? extends Plugin>> nodePlugins() {
-        return Arrays.asList(MockTransportService.TestPlugin.class, MockEngineFactoryPlugin.class);
-    }
-
-    public void testCorruptTranslogTruncation() throws Exception {
-        internalCluster().startNodes(2, Settings.EMPTY);
-
-        final String replicaNode = internalCluster().getNodeNames()[1];
-
-            assertAcked(prepareCreate("test").setSettings(Settings.builder()
-            .put("index.number_of_shards", 1)
-            .put("index.number_of_replicas", 1)
-            .put("index.refresh_interval", "-1")
-            .put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true) // never flush - always recover from translog
-            .put("index.routing.allocation.exclude._name", replicaNode)
-        ));
-        ensureYellow();
-
-        assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder()
-            .put("index.routing.allocation.exclude._name", (String)null)
-        ));
-        ensureGreen();
-
-        // Index some documents
-        int numDocsToKeep = randomIntBetween(0, 100);
-        logger.info("--> indexing [{}] docs to be kept", numDocsToKeep);
-        IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocsToKeep];
-        for (int i = 0; i < builders.length; i++) {
-            builders[i] = client().prepareIndex("test", "type").setSource("foo", "bar");
-        }
-        indexRandom(false, false, false, Arrays.asList(builders));
-        flush("test");
-        disableTranslogFlush("test");
-        // having no extra docs is an interesting case for seq no based recoveries - test it more often
-        int numDocsToTruncate = randomBoolean() ? 0 : randomIntBetween(0, 100);
-        logger.info("--> indexing [{}] more doc to be truncated", numDocsToTruncate);
-        builders = new IndexRequestBuilder[numDocsToTruncate];
-        for (int i = 0; i < builders.length; i++) {
-            builders[i] = client().prepareIndex("test", "type").setSource("foo", "bar");
-        }
-        indexRandom(false, false, false, Arrays.asList(builders));
-        Set<Path> translogDirs = getTranslogDirs("test");
-
-        TruncateTranslogCommand ttc = new TruncateTranslogCommand();
-        MockTerminal t = new MockTerminal();
-        OptionParser parser = ttc.getParser();
-
-        for (Path translogDir : translogDirs) {
-            OptionSet options = parser.parse("-d", translogDir.toAbsolutePath().toString(), "-b");
-            // Try running it before the shard is closed, it should flip out because it can't acquire the lock
-            try {
-                logger.info("--> running truncate while index is open on [{}]", translogDir.toAbsolutePath());
-                ttc.execute(t, options, null /* TODO: env should be real here, and ttc should actually use it... */);
-                fail("expected the truncate command to fail not being able to acquire the lock");
-            } catch (Exception e) {
-                assertThat(e.getMessage(), containsString("Failed to lock shard's directory"));
-            }
-        }
-
-        if (randomBoolean() && numDocsToTruncate > 0) {
-            // flush the replica, so it will have more docs than what the primary will have
-            Index index = resolveIndex("test");
-            IndexShard replica = internalCluster().getInstance(IndicesService.class, replicaNode).getShardOrNull(new ShardId(index, 0));
-            replica.flush(new FlushRequest());
-            logger.info("--> performed extra flushing on replica");
-        }
-
-        // shut down the replica node to be tested later
-        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode));
-
-        // Corrupt the translog file
-        logger.info("--> corrupting translog");
-        corruptRandomTranslogFile("test");
-
-        // Restart the single node
-        logger.info("--> restarting node");
-        internalCluster().restartRandomDataNode();
-        client().admin().cluster().prepareHealth().setWaitForYellowStatus()
-            .setTimeout(new TimeValue(1000, TimeUnit.MILLISECONDS))
-            .setWaitForEvents(Priority.LANGUID)
-            .get();
-
-        try {
-            client().prepareSearch("test").setQuery(matchAllQuery()).get();
-            fail("all shards should be failed due to a corrupted translog");
-        } catch (SearchPhaseExecutionException e) {
-            // Good, all shards should be failed because there is only a
-            // single shard and its translog is corrupt
-        }
-
-        // Close the index so we can actually truncate the translog
-        logger.info("--> closing 'test' index");
-        client().admin().indices().prepareClose("test").get();
-
-        for (Path translogDir : translogDirs) {
-            final Path idxLocation = translogDir.getParent().resolve("index");
-            assertBusy(() -> {
-                logger.info("--> checking that lock has been released for {}", idxLocation);
-                try (Directory dir = FSDirectory.open(idxLocation, NativeFSLockFactory.INSTANCE);
-                     Lock writeLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
-                    // Great, do nothing, we just wanted to obtain the lock
-                }  catch (LockObtainFailedException lofe) {
-                    logger.info("--> failed acquiring lock for {}", idxLocation);
-                    fail("still waiting for lock release at [" + idxLocation + "]");
-                } catch (IOException ioe) {
-                    fail("Got an IOException: " + ioe);
-                }
-            });
-
-            OptionSet options = parser.parse("-d", translogDir.toAbsolutePath().toString(), "-b");
-            logger.info("--> running truncate translog command for [{}]", translogDir.toAbsolutePath());
-            ttc.execute(t, options, null /* TODO: env should be real here, and ttc should actually use it... */);
-            logger.info("--> output:\n{}", t.getOutput());
-        }
-
-        // Re-open index
-        logger.info("--> opening 'test' index");
-        client().admin().indices().prepareOpen("test").get();
-        ensureYellow("test");
-
-        // Run a search and make sure it succeeds
-        assertHitCount(client().prepareSearch("test").setQuery(matchAllQuery()).get(), numDocsToKeep);
-
-        logger.info("--> starting the replica node to test recovery");
-        internalCluster().startNode();
-        ensureGreen("test");
-        for (String node : internalCluster().nodesInclude("test")) {
-            SearchRequestBuilder q = client().prepareSearch("test").setPreference("_only_nodes:" + node).setQuery(matchAllQuery());
-            assertHitCount(q.get(), numDocsToKeep);
-        }
-        final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").setActiveOnly(false).get();
-        final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get("test").stream()
-            .filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get();
-        assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0));
-        // Ensure that the global checkpoint and local checkpoint are restored from the max seqno of the last commit.
-        final SeqNoStats seqNoStats = getSeqNoStats("test", 0);
-        assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
-        assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
-    }
-
-    public void testCorruptTranslogTruncationOfReplica() throws Exception {
-        internalCluster().startNodes(2, Settings.EMPTY);
-
-        final String primaryNode = internalCluster().getNodeNames()[0];
-        final String replicaNode = internalCluster().getNodeNames()[1];
-
-        assertAcked(prepareCreate("test").setSettings(Settings.builder()
-            .put("index.number_of_shards", 1)
-            .put("index.number_of_replicas", 1)
-            .put("index.refresh_interval", "-1")
-            .put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true) // never flush - always recover from translog
-            .put("index.routing.allocation.exclude._name", replicaNode)
-        ));
-        ensureYellow();
-
-        assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder()
-            .put("index.routing.allocation.exclude._name", (String)null)
-        ));
-        ensureGreen();
-
-        // Index some documents
-        int numDocsToKeep = randomIntBetween(0, 100);
-        logger.info("--> indexing [{}] docs to be kept", numDocsToKeep);
-        IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocsToKeep];
-        for (int i = 0; i < builders.length; i++) {
-            builders[i] = client().prepareIndex("test", "type").setSource("foo", "bar");
-        }
-        indexRandom(false, false, false, Arrays.asList(builders));
-        flush("test");
-        disableTranslogFlush("test");
-        // having no extra docs is an interesting case for seq no based recoveries - test it more often
-        int numDocsToTruncate = randomBoolean() ? 0 : randomIntBetween(0, 100);
-        logger.info("--> indexing [{}] more docs to be truncated", numDocsToTruncate);
-        builders = new IndexRequestBuilder[numDocsToTruncate];
-        for (int i = 0; i < builders.length; i++) {
-            builders[i] = client().prepareIndex("test", "type").setSource("foo", "bar");
-        }
-        indexRandom(false, false, false, Arrays.asList(builders));
-        final int totalDocs = numDocsToKeep + numDocsToTruncate;
-
-
-        // sample the replica node translog dirs
-        final ShardId shardId = new ShardId(resolveIndex("test"), 0);
-        Set<Path> translogDirs = getTranslogDirs(replicaNode, shardId);
-        Path tdir = randomFrom(translogDirs);
-
-        // stop the cluster nodes. we don't use full restart so the node start up order will be the same
-        // and shard roles will be maintained
-        internalCluster().stopRandomDataNode();
-        internalCluster().stopRandomDataNode();
-
-        // Corrupt the translog file
-        logger.info("--> corrupting translog");
-        TestTranslog.corruptRandomTranslogFile(logger, random(), tdir, TestTranslog.minTranslogGenUsedInRecovery(tdir));
-
-        // Restart the single node
-        logger.info("--> starting node");
-        internalCluster().startNode();
-
-        ensureYellow();
-
-        // Run a search and make sure it succeeds
-        assertHitCount(client().prepareSearch("test").setQuery(matchAllQuery()).get(), totalDocs);
-
-        TruncateTranslogCommand ttc = new TruncateTranslogCommand();
-        MockTerminal t = new MockTerminal();
-        OptionParser parser = ttc.getParser();
-
-        for (Path translogDir : translogDirs) {
-            final Path idxLocation = translogDir.getParent().resolve("index");
-            assertBusy(() -> {
-                logger.info("--> checking that lock has been released for {}", idxLocation);
-                try (Directory dir = FSDirectory.open(idxLocation, NativeFSLockFactory.INSTANCE);
-                     Lock writeLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
-                    // Great, do nothing, we just wanted to obtain the lock
-                }  catch (LockObtainFailedException lofe) {
-                    logger.info("--> failed acquiring lock for {}", idxLocation);
-                    fail("still waiting for lock release at [" + idxLocation + "]");
-                } catch (IOException ioe) {
-                    fail("Got an IOException: " + ioe);
-                }
-            });
-
-            OptionSet options = parser.parse("-d", translogDir.toAbsolutePath().toString(), "-b");
-            logger.info("--> running truncate translog command for [{}]", translogDir.toAbsolutePath());
-            ttc.execute(t, options, null /* TODO: env should be real here, and ttc should actually use it... */);
-            logger.info("--> output:\n{}", t.getOutput());
-        }
-
-        logger.info("--> starting the replica node to test recovery");
-        internalCluster().startNode();
-        ensureGreen("test");
-        for (String node : internalCluster().nodesInclude("test")) {
-            assertHitCount(client().prepareSearch("test").setPreference("_only_nodes:" + node).setQuery(matchAllQuery()).get(), totalDocs);
-        }
-
-        final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").setActiveOnly(false).get();
-        final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get("test").stream()
-            .filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get();
-        // the replica translog was disabled so it doesn't know what hte global checkpoint is and thus can't do ops based recovery
-        assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0));
-        // Ensure that the global checkpoint and local checkpoint are restored from the max seqno of the last commit.
-        final SeqNoStats seqNoStats = getSeqNoStats("test", 0);
-        assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
-        assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
-    }
-
-    private Set<Path> getTranslogDirs(String indexName) throws IOException {
-        ClusterState state = client().admin().cluster().prepareState().get().getState();
-        GroupShardsIterator shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[]{indexName}, false);
-        List<ShardIterator> iterators = iterableAsArrayList(shardIterators);
-        ShardIterator shardIterator = RandomPicks.randomFrom(random(), iterators);
-        ShardRouting shardRouting = shardIterator.nextOrNull();
-        assertNotNull(shardRouting);
-        assertTrue(shardRouting.primary());
-        assertTrue(shardRouting.assignedToNode());
-        String nodeId = shardRouting.currentNodeId();
-        ShardId shardId = shardRouting.shardId();
-        return getTranslogDirs(nodeId, shardId);
-    }
-
-    private Set<Path> getTranslogDirs(String nodeId, ShardId shardId) {
-        NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(nodeId).setFs(true).get();
-        Set<Path> translogDirs = new TreeSet<>(); // treeset makes sure iteration order is deterministic
-        for (FsInfo.Path fsPath : nodeStatses.getNodes().get(0).getFs()) {
-            String path = fsPath.getPath();
-            final String relativeDataLocationPath =  "indices/"+ shardId.getIndex().getUUID() +"/" + Integer.toString(shardId.getId())
-                + "/translog";
-            Path translogPath = PathUtils.get(path).resolve(relativeDataLocationPath);
-            if (Files.isDirectory(translogPath)) {
-                translogDirs.add(translogPath);
-            }
-        }
-        return translogDirs;
-    }
-
-    private void corruptRandomTranslogFile(String indexName) throws IOException {
-        Set<Path> translogDirs = getTranslogDirs(indexName);
-        Path translogDir = randomFrom(translogDirs);
-        TestTranslog.corruptRandomTranslogFile(logger, random(), translogDir, TestTranslog.minTranslogGenUsedInRecovery(translogDir));
-    }
-
-    /** Disables translog flushing for the specified index */
-    private static void disableTranslogFlush(String index) {
-        Settings settings = Settings.builder()
-                .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.PB))
-                .build();
-        client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
-    }
-
-    private SeqNoStats getSeqNoStats(String index, int shardId) {
-        final ShardStats[] shardStats = client().admin().indices()
-            .prepareStats(index).get()
-            .getIndices().get(index).getShards();
-        return shardStats[shardId].getSeqNoStats();
-    }
-}

+ 6 - 3
test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

@@ -133,7 +133,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
         super.setUp();
         threadPool = new TestThreadPool(getClass().getName(), threadPoolSettings());
         primaryTerm = randomIntBetween(1, 100); // use random but fixed term for creating shards
-        failOnShardFailures.set(true);
+        failOnShardFailures();
     }
 
     @Override
@@ -154,6 +154,10 @@ public abstract class IndexShardTestCase extends ESTestCase {
         failOnShardFailures.set(false);
     }
 
+    protected void failOnShardFailures() {
+        failOnShardFailures.set(true);
+    }
+
     public Settings threadPoolSettings() {
         return Settings.EMPTY;
     }
@@ -233,7 +237,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
             .settings(indexSettings)
             .primaryTerm(0, primaryTerm)
             .putMapping("_doc", "{ \"properties\": {} }");
-        return newShard(shardRouting, metaData.build(), engineFactory, listeners);
+        return newShard(shardRouting, metaData.build(), null, engineFactory, () -> {}, listeners);
     }
 
     /**
@@ -279,7 +283,6 @@ public abstract class IndexShardTestCase extends ESTestCase {
         return newShard(shardRouting, indexMetaData, searcherWrapper, new InternalEngineFactory(), globalCheckpointSyncer);
     }
 
-
     /**
      * creates a new initializing shard. The shard will will be put in its proper path under the
      * current node id the shard is assigned to.

+ 18 - 0
test/framework/src/main/java/org/elasticsearch/test/CorruptionUtils.java

@@ -21,6 +21,7 @@ package org.elasticsearch.test;
 import com.carrotsearch.randomizedtesting.generators.RandomPicks;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.store.ChecksumIndexInput;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
@@ -47,6 +48,23 @@ public final class CorruptionUtils {
     private static Logger logger = ESLoggerFactory.getLogger("test");
     private CorruptionUtils() {}
 
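+    /** Corrupts the Lucene index at {@code indexPath}: its segments files (segments_N, *.si) when {@code corruptSegments} is set, otherwise all other regular files. */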
+    public static void corruptIndex(Random random, Path indexPath, boolean corruptSegments) throws IOException {
+        // corrupt files
+        final Path[] filesToCorrupt =
+            Files.walk(indexPath)
+                .filter(p -> {
+                    final String name = p.getFileName().toString();
+                    boolean segmentFile = name.startsWith("segments_") || name.endsWith(".si");
+                    return Files.isRegularFile(p)
+                        && name.startsWith("extra") == false // Skip files added by Lucene's ExtrasFS
+                        && IndexWriter.WRITE_LOCK_NAME.equals(name) == false
+                        && (corruptSegments ? segmentFile : segmentFile == false);
+                })
+                .toArray(Path[]::new);
+        corruptFile(random, filesToCorrupt);
+    }
+
     /**
      * Corrupts a random file at a random position
      */