
Merge remote-tracking branch 'dakrone/translog-cli'

Lee Hinman 9 years ago
parent
commit
e538c1c6d6

+ 1 - 1
core/src/main/java/org/elasticsearch/index/translog/Checkpoint.java

@@ -61,7 +61,7 @@ class Checkpoint {
         Channels.writeToChannel(buffer, channel);
     }
 
-    private void write(DataOutput out) throws IOException {
+    void write(DataOutput out) throws IOException {
         out.writeLong(offset);
         out.writeInt(numOps);
         out.writeLong(generation);

+ 3 - 1
core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java

@@ -113,7 +113,9 @@ public class TranslogReader extends BaseTranslogReader implements Closeable {
                         headerStream.read(ref.bytes, ref.offset, ref.length);
                         BytesRef uuidBytes = new BytesRef(translogUUID);
                         if (uuidBytes.bytesEquals(ref) == false) {
-                            throw new TranslogCorruptedException("expected shard UUID [" + uuidBytes + "] but got: [" + ref + "] this translog file belongs to a different translog. path:" + path);
+                            throw new TranslogCorruptedException("expected shard UUID " + uuidBytes + "/" + uuidBytes.utf8ToString() +
+                                            " but got: " + ref + "/" + ref.utf8ToString() +
+                                            " this translog file belongs to a different translog. path:" + path);
                         }
                         return new TranslogReader(checkpoint.generation, channel, path, ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + Integer.BYTES, checkpoint.offset, checkpoint.numOps);
                     default:

+ 56 - 0
core/src/main/java/org/elasticsearch/index/translog/TranslogToolCli.java

@@ -0,0 +1,56 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.translog;
+
+import org.elasticsearch.cli.MultiCommand;
+import org.elasticsearch.cli.Terminal;
+import org.elasticsearch.common.logging.LogConfigurator;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.index.translog.TruncateTranslogCommand;
+import org.elasticsearch.node.internal.InternalSettingsPreparer;
+
+/**
+ * Class encapsulating and dispatching commands from the {@code elasticsearch-translog} command line tool
+ */
+public class TranslogToolCli extends MultiCommand {
+
+    public TranslogToolCli() {
+        super("A CLI tool for various Elasticsearch translog actions");
+        subcommands.put("truncate", new TruncateTranslogCommand());
+    }
+
+    public static void main(String[] args) throws Exception {
+        // initialize default for es.logger.level because we will not read the logging.yml
+        String loggerLevel = System.getProperty("es.logger.level", "INFO");
+        String pathHome = System.getProperty("es.path.home");
+        // Set the appender for all potential log files to terminal so that other components that use the logger print out the
+        // same terminal.
+        Environment loggingEnvironment = InternalSettingsPreparer.prepareEnvironment(Settings.builder()
+                .put("path.home", pathHome)
+                .put("appender.terminal.type", "terminal")
+                .put("rootLogger", "${logger.level}, terminal")
+                .put("logger.level", loggerLevel)
+                .build(), Terminal.DEFAULT);
+        LogConfigurator.configure(loggingEnvironment.settings(), false);
+
+        exit(new TranslogToolCli().main(args, Terminal.DEFAULT));
+    }
+}
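
TranslogToolCli is a MultiCommand, so it dispatches to the "truncate" subcommand registered above and is meant to be launched through the elasticsearch-translog wrapper scripts added later in this commit. A hedged usage sketch (the exact bin path depends on the installation layout) would be:

    bin/elasticsearch-translog truncate -d /path/to/shard/translog -b

where -d/--dir points at the shard's translog directory and -b/--batch skips the interactive confirmation, matching the options defined in TruncateTranslogCommand below.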

+ 8 - 4
core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java

@@ -76,10 +76,16 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
         return getHeaderLength(new BytesRef(translogUUID).length);
     }
 
-    private static int getHeaderLength(int uuidLength) {
+    static int getHeaderLength(int uuidLength) {
         return CodecUtil.headerLength(TRANSLOG_CODEC) + uuidLength + Integer.BYTES;
     }
 
+    static void writeHeader(OutputStreamDataOutput out, BytesRef ref) throws IOException {
+        CodecUtil.writeHeader(out, TRANSLOG_CODEC, VERSION);
+        out.writeInt(ref.length);
+        out.writeBytes(ref.bytes, ref.offset, ref.length);
+    }
+
     public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory, ByteSizeValue bufferSize) throws IOException {
         final BytesRef ref = new BytesRef(translogUUID);
         final int headerLength = getHeaderLength(ref.length);
@@ -88,9 +94,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
             // This OutputStreamDataOutput is intentionally not closed because
             // closing it will close the FileChannel
             final OutputStreamDataOutput out = new OutputStreamDataOutput(java.nio.channels.Channels.newOutputStream(channel));
-            CodecUtil.writeHeader(out, TRANSLOG_CODEC, VERSION);
-            out.writeInt(ref.length);
-            out.writeBytes(ref.bytes, ref.offset, ref.length);
+            writeHeader(out, ref);
             channel.force(true);
             writeCheckpoint(channelFactory, headerLength, 0, file.getParent(), fileGeneration);
             final TranslogWriter writer = new TranslogWriter(channelFactory, shardId, fileGeneration, channel, file, bufferSize);

+ 224 - 0
core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java

@@ -0,0 +1,224 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.translog;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockObtainFailedException;
+import org.apache.lucene.store.NativeFSLockFactory;
+import org.apache.lucene.store.OutputStreamDataOutput;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.IOUtils;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.cli.SettingCommand;
+import org.elasticsearch.cli.Terminal;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.io.PathUtils;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.translog.Checkpoint;
+
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TruncateTranslogCommand extends SettingCommand {
+
+    private final OptionSpec<String> translogFolder;
+    private final OptionSpec<Void> batchMode;
+
+    public TruncateTranslogCommand() {
+        super("Truncates a translog to create a new, empty translog");
+        this.translogFolder = parser.acceptsAll(Arrays.asList("d", "dir"),
+                "Translog Directory location on disk")
+                .withRequiredArg()
+                .required();
+        this.batchMode = parser.acceptsAll(Arrays.asList("b", "batch"),
+                "Enable batch mode explicitly, automatic confirmation of warnings");
+    }
+
+    // Visible for testing
+    public OptionParser getParser() {
+        return this.parser;
+    }
+
+    @Override
+    protected void printAdditionalHelp(Terminal terminal) {
+        terminal.println("This tool truncates the translog and translog");
+        terminal.println("checkpoint files to create a new translog");
+    }
+
+    @SuppressForbidden(reason = "Necessary to use the path passed in")
+    private Path getTranslogPath(OptionSet options) {
+        return PathUtils.get(translogFolder.value(options), "", "");
+    }
+
+    @Override
+    protected void execute(Terminal terminal, OptionSet options, Map<String, String> settings) throws Exception {
+        boolean batch = options.has(batchMode);
+
+        Path translogPath = getTranslogPath(options);
+        Path idxLocation = translogPath.getParent().resolve("index");
+
+        if (Files.exists(translogPath) == false || Files.isDirectory(translogPath) == false) {
+            throw new ElasticsearchException("translog directory [" + translogPath + "], must exist and be a directory");
+        }
+
+        if (Files.exists(idxLocation) == false || Files.isDirectory(idxLocation) == false) {
+            throw new ElasticsearchException("unable to find a shard at [" + idxLocation + "], which must exist and be a directory");
+        }
+
+        // Hold the lock open for the duration of the tool running
+        try (Directory dir = FSDirectory.open(idxLocation, NativeFSLockFactory.INSTANCE);
+                Lock writeLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
+            Set<Path> translogFiles;
+            try {
+                terminal.println("Checking existing translog files");
+                translogFiles = filesInDirectory(translogPath);
+            } catch (IOException e) {
+                terminal.println("encountered IOException while listing directory, aborting...");
+                throw new ElasticsearchException("failed to find existing translog files", e);
+            }
+
+            // Warn about ES being stopped and files being deleted
+            warnAboutDeletingFiles(terminal, translogFiles, batch);
+
+            List<IndexCommit> commits;
+            try {
+                terminal.println("Reading translog UUID information from Lucene commit from shard at [" + idxLocation + "]");
+                commits = DirectoryReader.listCommits(dir);
+            } catch (IndexNotFoundException infe) {
+                throw new ElasticsearchException("unable to find a valid shard at [" + idxLocation + "]", infe);
+            }
+
+            // Retrieve the generation and UUID from the existing data
+            Map<String, String> commitData = commits.get(commits.size() - 1).getUserData();
+            String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY);
+            String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY);
+            if (translogGeneration == null || translogUUID == null) {
+                throw new ElasticsearchException("shard must have a valid translog generation and UUID but got: [{}] and: [{}]",
+                        translogGeneration, translogUUID);
+            }
+            terminal.println("Translog Generation: " + translogGeneration);
+            terminal.println("Translog UUID      : " + translogUUID);
+
+            Path tempEmptyCheckpoint = translogPath.resolve("temp-" + Translog.CHECKPOINT_FILE_NAME);
+            Path realEmptyCheckpoint = translogPath.resolve(Translog.CHECKPOINT_FILE_NAME);
+            Path tempEmptyTranslog = translogPath.resolve("temp-" + Translog.TRANSLOG_FILE_PREFIX +
+                            translogGeneration + Translog.TRANSLOG_FILE_SUFFIX);
+            Path realEmptyTranslog = translogPath.resolve(Translog.TRANSLOG_FILE_PREFIX +
+                            translogGeneration + Translog.TRANSLOG_FILE_SUFFIX);
+
+            // Write empty checkpoint and translog to empty files
+            long gen = Long.parseLong(translogGeneration);
+            int translogLen = writeEmptyTranslog(tempEmptyTranslog, translogUUID);
+            writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen);
+
+            terminal.println("Removing existing translog files");
+            IOUtils.rm(translogFiles.toArray(new Path[]{}));
+
+            terminal.println("Creating new empty checkpoint at [" + realEmptyCheckpoint + "]");
+            Files.move(tempEmptyCheckpoint, realEmptyCheckpoint, StandardCopyOption.ATOMIC_MOVE);
+            terminal.println("Creating new empty translog at [" + realEmptyTranslog + "]");
+            Files.move(tempEmptyTranslog, realEmptyTranslog, StandardCopyOption.ATOMIC_MOVE);
+
+            // Fsync the translog directory after rename
+            IOUtils.fsync(translogPath, true);
+
+        } catch (LockObtainFailedException lofe) {
+            throw new ElasticsearchException("Failed to lock shard's directory at [" + idxLocation + "], is Elasticsearch still running?");
+        }
+
+        terminal.println("Done.");
+    }
+
+    /** Write a checkpoint file to the given location with the given generation */
+    public static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration) throws IOException {
+        try (FileChannel fc = FileChannel.open(filename, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
+                OutputStreamDataOutput out = new OutputStreamDataOutput(Channels.newOutputStream(fc))) {
+            Checkpoint emptyCheckpoint = new Checkpoint(translogLength, 0, translogGeneration);
+            emptyCheckpoint.write(out);
+            fc.force(true);
+        }
+    }
+
+    /**
+     * Write a translog containing the given translog UUID to the given location. Returns the number of bytes written.
+     */
+    public static int writeEmptyTranslog(Path filename, String translogUUID) throws IOException {
+        final BytesRef translogRef = new BytesRef(translogUUID);
+        try (FileChannel fc = FileChannel.open(filename, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
+                OutputStreamDataOutput out = new OutputStreamDataOutput(Channels.newOutputStream(fc))) {
+            TranslogWriter.writeHeader(out, translogRef);
+            fc.force(true);
+        }
+        return TranslogWriter.getHeaderLength(translogRef.length);
+    }
+
+    /** Show a warning about deleting files, asking for a confirmation if {@code batchMode} is false */
+    public static void warnAboutDeletingFiles(Terminal terminal, Set<Path> files, boolean batchMode) {
+        terminal.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
+        terminal.println("!   WARNING: Elasticsearch MUST be stopped before running this tool   !");
+        terminal.println("!                                                                     !");
+        terminal.println("!   WARNING:    Documents inside of translog files will be lost       !");
+        terminal.println("!                                                                     !");
+        terminal.println("!   WARNING:          The following files will be DELETED!            !");
+        terminal.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
+        for (Path file : files) {
+            terminal.println("--> " + file);
+        }
+        terminal.println("");
+        if (batchMode == false) {
+            String text = terminal.readText("Continue and DELETE files? [y/N] ");
+            if (!text.equalsIgnoreCase("y")) {
+                throw new ElasticsearchException("aborted by user");
+            }
+        }
+    }
+
+    /** Return a Set of all files in a given directory */
+    public static Set<Path> filesInDirectory(Path directory) throws IOException {
+        Set<Path> files = new HashSet<>();
+        try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory)) {
+            for (Path file : stream) {
+                files.add(file);
+            }
+        }
+        return files;
+    }
+
+}
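
Because writeEmptyTranslog and writeEmptyCheckpoint are public static, the core sequence the command performs (write a header-only translog, then a checkpoint pointing at its length) can be sketched in isolation. This is a minimal, hypothetical sketch, not part of the change: the directory, UUID, and generation values are made up, and error handling is omitted.

    import java.nio.file.Path;
    import java.nio.file.Paths;

    import org.elasticsearch.index.translog.Translog;
    import org.elasticsearch.index.translog.TruncateTranslogCommand;

    class EmptyTranslogSketch {
        static void writeEmptyGeneration(Path translogDir, String translogUUID, long generation) throws Exception {
            // name the files the way Translog expects them, e.g. translog-1.tlog and translog.ckp
            Path translogFile = translogDir.resolve(Translog.TRANSLOG_FILE_PREFIX + generation + Translog.TRANSLOG_FILE_SUFFIX);
            Path checkpointFile = translogDir.resolve(Translog.CHECKPOINT_FILE_NAME);
            // write the header-only translog first, then point the checkpoint at its length
            int headerLength = TruncateTranslogCommand.writeEmptyTranslog(translogFile, translogUUID);
            TruncateTranslogCommand.writeEmptyCheckpoint(checkpointFile, headerLength, generation);
        }

        public static void main(String[] args) throws Exception {
            // hypothetical values for illustration only; the directory must already exist
            writeEmptyGeneration(Paths.get("/tmp/translog-sketch"), "some-translog-uuid", 1);
        }
    }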

+ 236 - 0
core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java

@@ -0,0 +1,236 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.translog;
+
+import com.carrotsearch.randomizedtesting.generators.RandomPicks;
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.search.SearchPhaseExecutionException;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.cli.MockTerminal;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.routing.GroupShardsIterator;
+import org.elasticsearch.cluster.routing.ShardIterator;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.io.PathUtils;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.MockEngineFactoryPlugin;
+import org.elasticsearch.index.translog.TruncateTranslogCommand;
+import org.elasticsearch.monitor.fs.FsInfo;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.engine.MockEngineSupport;
+import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
+import org.elasticsearch.test.transport.MockTransportService;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+import static org.elasticsearch.common.util.CollectionUtils.iterableAsArrayList;
+import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.notNullValue;
+
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0)
+public class TruncateTranslogIT extends ESIntegTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return pluginList(MockTransportService.TestPlugin.class, MockEngineFactoryPlugin.class);
+    }
+
+    public void testCorruptTranslogTruncation() throws Exception {
+        internalCluster().startNodesAsync(1, Settings.EMPTY).get();
+
+        assertAcked(prepareCreate("test").setSettings(Settings.builder()
+                        .put("index.number_of_shards", 1)
+                        .put("index.number_of_replicas", 0)
+                        .put("index.refresh_interval", "-1")
+                        .put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true) // never flush - always recover from translog
+                ));
+        ensureYellow();
+
+        // Index some documents
+        int numDocs = scaledRandomIntBetween(100, 1000);
+        IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
+        for (int i = 0; i < builders.length; i++) {
+            builders[i] = client().prepareIndex("test", "type").setSource("foo", "bar");
+        }
+        disableTranslogFlush("test");
+        indexRandom(false, false, false, Arrays.asList(builders));
+        Set<Path> translogDirs = getTranslogDirs("test");
+
+        TruncateTranslogCommand ttc = new TruncateTranslogCommand();
+        MockTerminal t = new MockTerminal();
+        OptionParser parser = ttc.getParser();
+
+        for (Path translogDir : translogDirs) {
+            OptionSet options = parser.parse("-d", translogDir.toAbsolutePath().toString(), "-b");
+            // Try running it before the shard is closed, it should flip out because it can't acquire the lock
+            try {
+                logger.info("--> running truncate while index is open on [{}]", translogDir.toAbsolutePath());
+                ttc.execute(t, options, new HashMap<String, String>());
+                fail("expected the truncate command to fail not being able to acquire the lock");
+            } catch (Exception e) {
+                assertThat(e.getMessage(), containsString("Failed to lock shard's directory"));
+            }
+        }
+
+        // Corrupt the translog file(s)
+        logger.info("--> corrupting translog");
+        corruptRandomTranslogFiles("test");
+
+        // Restart the single node
+        logger.info("--> restarting node");
+        internalCluster().fullRestart();
+        client().admin().cluster().prepareHealth().setWaitForYellowStatus()
+                .setTimeout(new TimeValue(1000, TimeUnit.MILLISECONDS))
+                .setWaitForEvents(Priority.LANGUID)
+                .get();
+
+        try {
+            client().prepareSearch("test").setQuery(matchAllQuery()).get();
+            fail("all shards should be failed due to a corrupted translog");
+        } catch (SearchPhaseExecutionException e) {
+            // Good, all shards should be failed because there is only a
+            // single shard and its translog is corrupt
+        }
+
+        // Close the index so we can actually truncate the translog
+        logger.info("--> closing 'test' index");
+        client().admin().indices().prepareClose("test").get();
+
+        for (Path translogDir : translogDirs) {
+            OptionSet options = parser.parse("-d", translogDir.toAbsolutePath().toString(), "-b");
+            logger.info("--> running truncate translog command for [{}]", translogDir.toAbsolutePath());
+            ttc.execute(t, options, new HashMap<String, String>());
+            logger.info("--> output:\n{}", t.getOutput());
+        }
+
+        // Re-open index
+        logger.info("--> opening 'test' index");
+        client().admin().indices().prepareOpen("test").get();
+        ensureYellow("test");
+
+        // Run a search and make sure it succeeds
+        SearchResponse resp = client().prepareSearch("test").setQuery(matchAllQuery()).get();
+        ElasticsearchAssertions.assertNoFailures(resp);
+    }
+
+    private Set<Path> getTranslogDirs(String indexName) throws IOException {
+        ClusterState state = client().admin().cluster().prepareState().get().getState();
+        GroupShardsIterator shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[]{indexName}, false);
+        final Index idx = state.metaData().index(indexName).getIndex();
+        List<ShardIterator> iterators = iterableAsArrayList(shardIterators);
+        ShardIterator shardIterator = RandomPicks.randomFrom(random(), iterators);
+        ShardRouting shardRouting = shardIterator.nextOrNull();
+        assertNotNull(shardRouting);
+        assertTrue(shardRouting.primary());
+        assertTrue(shardRouting.assignedToNode());
+        String nodeId = shardRouting.currentNodeId();
+        NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(nodeId).setFs(true).get();
+        Set<Path> translogDirs = new TreeSet<>(); // treeset makes sure iteration order is deterministic
+        for (FsInfo.Path fsPath : nodeStatses.getNodes().get(0).getFs()) {
+            String path = fsPath.getPath();
+            final String relativeDataLocationPath =  "indices/"+ idx.getUUID() +"/" + Integer.toString(shardRouting.getId()) + "/translog";
+            Path translogPath = PathUtils.get(path).resolve(relativeDataLocationPath);
+            if (Files.isDirectory(translogPath)) {
+                translogDirs.add(translogPath);
+            }
+        }
+        return translogDirs;
+    }
+
+    private void corruptRandomTranslogFiles(String indexName) throws IOException {
+        Set<Path> translogDirs = getTranslogDirs(indexName);
+        Set<Path> files = new TreeSet<>(); // treeset makes sure iteration order is deterministic
+        for (Path translogDir : translogDirs) {
+            if (Files.isDirectory(translogDir)) {
+                logger.info("--> path: {}", translogDir);
+                try (DirectoryStream<Path> stream = Files.newDirectoryStream(translogDir)) {
+                    for (Path item : stream) {
+                        logger.info("--> File: {}", item);
+                        if (Files.isRegularFile(item) && item.getFileName().toString().startsWith("translog-")) {
+                            files.add(item);
+                        }
+                    }
+                }
+            }
+        }
+        Path fileToCorrupt = null;
+        if (!files.isEmpty()) {
+            int corruptions = randomIntBetween(5, 20);
+            for (int i = 0; i < corruptions; i++) {
+                fileToCorrupt = RandomPicks.randomFrom(random(), files);
+                try (FileChannel raf = FileChannel.open(fileToCorrupt, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
+                    // read
+                    raf.position(randomIntBetween(0, (int) Math.min(Integer.MAX_VALUE, raf.size() - 1)));
+                    long filePointer = raf.position();
+                    ByteBuffer bb = ByteBuffer.wrap(new byte[1]);
+                    raf.read(bb);
+                    bb.flip();
+
+                    // corrupt
+                    byte oldValue = bb.get(0);
+                    byte newValue = (byte) (oldValue + 1);
+                    bb.put(0, newValue);
+
+                    // rewrite
+                    raf.position(filePointer);
+                    raf.write(bb);
+                    logger.info("--> corrupting file {} --  flipping at position {} from {} to {} file: {}",
+                            fileToCorrupt, filePointer, Integer.toHexString(oldValue),
+                            Integer.toHexString(newValue), fileToCorrupt);
+                }
+            }
+        }
+        assertThat("no file corrupted", fileToCorrupt, notNullValue());
+    }
+
+    /** Disables translog flushing for the specified index */
+    private static void disableTranslogFlush(String index) {
+        Settings settings = Settings.builder()
+                .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.PB))
+                .build();
+        client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
+    }
+
+}

+ 90 - 0
distribution/src/main/resources/bin/elasticsearch-translog

@@ -0,0 +1,90 @@
+#!/bin/bash
+
+CDPATH=""
+SCRIPT="$0"
+
+# SCRIPT may be an arbitrarily deep series of symlinks. Loop until we have the concrete path.
+while [ -h "$SCRIPT" ] ; do
+  ls=`ls -ld "$SCRIPT"`
+  # Drop everything prior to ->
+  link=`expr "$ls" : '.*-> \(.*\)$'`
+  if expr "$link" : '/.*' > /dev/null; then
+    SCRIPT="$link"
+  else
+    SCRIPT=`dirname "$SCRIPT"`/"$link"
+  fi
+done
+
+# determine elasticsearch home
+ES_HOME=`dirname "$SCRIPT"`/..
+
+# make ELASTICSEARCH_HOME absolute
+ES_HOME=`cd "$ES_HOME"; pwd`
+
+
+# Sets the default values for elasticsearch variables used in this script
+if [ -z "$CONF_DIR" ]; then
+  CONF_DIR="${path.conf}"
+fi
+
+# The default env file is defined at building/packaging time.
+# For a ${project.name} package, the value is "${path.env}".
+ES_ENV_FILE="${path.env}"
+
+# If an include is specified with the ES_INCLUDE environment variable, use it
+if [ -n "$ES_INCLUDE" ]; then
+    ES_ENV_FILE="$ES_INCLUDE"
+fi
+
+# Source the environment file
+if [ -n "$ES_ENV_FILE" ]; then
+
+  # If the ES_ENV_FILE is not found, try to resolve the path
+  # against the ES_HOME directory
+  if [ ! -f "$ES_ENV_FILE" ]; then
+      ES_ENV_FILE="$ELASTIC_HOME/$ES_ENV_FILE"
+  fi
+
+  . "$ES_ENV_FILE"
+  if [ $? -ne 0 ]; then
+      echo "Unable to source environment file: $ES_ENV_FILE" >&2
+      exit 1
+  fi
+fi
+
+# don't let JAVA_TOOL_OPTIONS slip in (e.g. crazy agents in ubuntu)
+# works around https://bugs.launchpad.net/ubuntu/+source/jayatana/+bug/1441487
+if [ "x$JAVA_TOOL_OPTIONS" != "x" ]; then
+    echo "Warning: Ignoring JAVA_TOOL_OPTIONS=$JAVA_TOOL_OPTIONS"
+    unset JAVA_TOOL_OPTIONS
+fi
+
+# CONF_FILE setting was removed
+if [ ! -z "$CONF_FILE" ]; then
+    echo "CONF_FILE setting is no longer supported. elasticsearch.yml must be placed in the config directory and cannot be renamed."
+    exit 1
+fi
+
+if [ -x "$JAVA_HOME/bin/java" ]; then
+    JAVA=$JAVA_HOME/bin/java
+else
+    JAVA=`which java`
+fi
+
+if [ ! -x "$JAVA" ]; then
+    echo "Could not find any executable java binary. Please install java in your PATH or set JAVA_HOME"
+    exit 1
+fi
+
+# full hostname passed through cut for portability on systems that do not support hostname -s
+# export on separate line for shells that do not support combining definition and export
+HOSTNAME=`hostname | cut -d. -f1`
+export HOSTNAME
+
+declare -a args=("$@")
+
+if [ -e "$CONF_DIR" ]; then
+  args=("${args[@]}" -Edefault.path.conf="$CONF_DIR")
+fi
+
+exec "$JAVA" $ES_JAVA_OPTS -Delasticsearch -Des.path.home="$ES_HOME" -cp "$ES_HOME/lib/*" org.elasticsearch.index.translog.TranslogToolCli "${args[@]}"

+ 61 - 0
distribution/src/main/resources/bin/elasticsearch-translog.bat

@@ -0,0 +1,61 @@
+@echo off
+
+SETLOCAL enabledelayedexpansion
+
+IF DEFINED JAVA_HOME (
+  set JAVA=%JAVA_HOME%\bin\java.exe
+) ELSE (
+  FOR %%I IN (java.exe) DO set JAVA=%%~$PATH:I
+)
+IF NOT EXIST "%JAVA%" (
+  ECHO Could not find any executable java binary. Please install java in your PATH or set JAVA_HOME 1>&2
+  EXIT /B 1
+)
+
+set SCRIPT_DIR=%~dp0
+for %%I in ("%SCRIPT_DIR%..") do set ES_HOME=%%~dpfI
+
+TITLE Elasticsearch Plugin Manager ${project.version}
+
+SET properties=
+SET args=
+
+:loop
+SET "current=%~1"
+SHIFT
+IF "x!current!" == "x" GOTO breakloop
+
+IF "!current:~0,2%!" == "-D" (
+    ECHO "!current!" | FINDSTR /C:"=">nul && (
+         :: current matches -D*=*
+         IF "x!properties!" NEQ "x" (
+             SET properties=!properties! "!current!"
+         ) ELSE (
+             SET properties="!current!"
+         )
+    ) || (
+         :: current matches -D*
+         IF "x!properties!" NEQ "x" (
+            SET properties=!properties! "!current!=%~1"
+         ) ELSE (
+            SET properties="!current!=%~1"
+         )
+         SHIFT
+    )
+) ELSE (
+    :: current matches *
+    IF "x!args!" NEQ "x" (
+        SET args=!args! "!current!"
+    ) ELSE (
+        SET args="!current!"
+    )
+)
+
+GOTO loop
+:breakloop
+
+SET HOSTNAME=%COMPUTERNAME%
+
+"%JAVA%" %ES_JAVA_OPTS% -Des.path.home="%ES_HOME%" !properties! -cp "%ES_HOME%/lib/*;" "org.elasticsearch.index.translog.TranslogToolCli" !args!
+
+ENDLOCAL