Browse Source

Merge pull request #11536 from bleskes/recovery_translog_op_count_on_mapping_retry

Fix recovered translog ops stat counting when retrying a batch
Boaz Leskes 10 years ago
parent
commit
16d9480d78

+ 1 - 1
core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -1327,7 +1327,7 @@ public class IndexShard extends AbstractIndexShardComponent {
     }
 
     private final EngineConfig newEngineConfig(TranslogConfig translogConfig) {
-        final TranslogRecoveryPerformer translogRecoveryPerformer = new TranslogRecoveryPerformer(mapperService, mapperAnalyzer, queryParserService, indexAliasesService, indexCache) {
+        final TranslogRecoveryPerformer translogRecoveryPerformer = new TranslogRecoveryPerformer(shardId, mapperService, mapperAnalyzer, queryParserService, indexAliasesService, indexCache) {
             @Override
             protected void operationProcessed() {
                 assert recoveryState != null;

+ 26 - 4
core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java

@@ -55,8 +55,10 @@ public class TranslogRecoveryPerformer {
     private final IndexCache indexCache;
     private final MapperAnalyzer mapperAnalyzer;
     private final Map<String, Mapping> recoveredTypes = new HashMap<>();
+    private final ShardId shardId;
 
-    protected TranslogRecoveryPerformer(MapperService mapperService, MapperAnalyzer mapperAnalyzer, IndexQueryParserService queryParserService, IndexAliasesService indexAliasesService, IndexCache indexCache) {
+    protected TranslogRecoveryPerformer(ShardId shardId, MapperService mapperService, MapperAnalyzer mapperAnalyzer, IndexQueryParserService queryParserService, IndexAliasesService indexAliasesService, IndexCache indexCache) {
+        this.shardId = shardId;
         this.mapperService = mapperService;
         this.queryParserService = queryParserService;
         this.indexAliasesService = indexAliasesService;
@@ -76,13 +78,33 @@ public class TranslogRecoveryPerformer {
      */
     int performBatchRecovery(Engine engine, Iterable<Translog.Operation> operations) {
         int numOps = 0;
-        for (Translog.Operation operation : operations) {
-            performRecoveryOperation(engine, operation, false);
-            numOps++;
+        try {
+            for (Translog.Operation operation : operations) {
+                performRecoveryOperation(engine, operation, false);
+                numOps++;
+            }
+        } catch (Throwable t) {
+            throw new BatchOperationException(shardId, "failed to apply batch translog operation [" + t.getMessage() + "]", numOps, t);
         }
         return numOps;
     }
 
+    public static class BatchOperationException extends IndexShardException {
+
+        private final int completedOperations;
+
+        public BatchOperationException(ShardId shardId, String msg, int completedOperations, Throwable cause) {
+            super(shardId, msg, cause);
+            this.completedOperations = completedOperations;
+        }
+
+
+        /** the number of succesful operations performed before the exception was thrown */
+        public int completedOperations() {
+            return completedOperations;
+        }
+    }
+
     private void maybeAddMappingUpdate(String type, Mapping update, String docId, boolean allowMappingUpdates) {
         if (update == null) {
             return;

+ 7 - 1
core/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java

@@ -26,7 +26,6 @@ import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Streamable;
-import org.elasticsearch.common.logging.ESLoggerFactory;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -506,6 +505,13 @@ public class RecoveryState implements ToXContent, Streamable {
             assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + total + "], recovered [" + recovered + "]";
         }
 
+        public synchronized void decrementRecoveredOperations(int ops) {
+            recovered -= ops;
+            assert recovered >= 0 : "recovered operations must be non-negative. Because [" + recovered + "] after decrementing [" + ops + "]";
+            assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + total + "], recovered [" + recovered + "]";
+        }
+
+
         /**
          * returns the total number of translog operations recovered so far
          */

+ 7 - 6
core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

@@ -47,10 +47,7 @@ import org.elasticsearch.index.IndexShardMissingException;
 import org.elasticsearch.index.engine.RecoveryEngineException;
 import org.elasticsearch.index.mapper.MapperException;
 import org.elasticsearch.index.settings.IndexSettings;
-import org.elasticsearch.index.shard.IllegalIndexShardStateException;
-import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.index.shard.IndexShardClosedException;
-import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.*;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.indices.IndexMissingException;
 import org.elasticsearch.indices.IndicesLifecycle;
@@ -308,10 +305,14 @@ public class RecoveryTarget extends AbstractComponent {
                 assert recoveryStatus.indexShard().recoveryState() == recoveryStatus.state();
                 try {
                     recoveryStatus.indexShard().performBatchRecovery(request.operations());
-                } catch (MapperException mapperException) {
+                } catch (TranslogRecoveryPerformer.BatchOperationException exception) {
+                    if (ExceptionsHelper.unwrapCause(exception) instanceof MapperException == false) {
+                        throw exception;
+                    }
                     // in very rare cases a translog replay from primary is processed before a mapping update on this node
                     // which causes local mapping changes. we want to wait until these mappings are processed.
-                    logger.trace("delaying recovery due to missing mapping changes", mapperException);
+                    logger.trace("delaying recovery due to missing mapping changes (rolling back stats for [{}] ops)", exception, exception.completedOperations());
+                    translog.decrementRecoveredOperations(exception.completedOperations());
                     // we do not need to use a timeout here since the entire recovery mechanism has an inactivity protection (it will be
                     // canceled)
                     observer.waitForNextChange(new ClusterStateObserver.Listener() {

+ 1 - 1
core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -1820,7 +1820,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
         public final AtomicInteger recoveredOps = new AtomicInteger(0);
 
         public TranslogHandler(String indexName) {
-            super(null, new MapperAnalyzer(null), null, null, null);
+            super(new ShardId("test", 0), null, new MapperAnalyzer(null), null, null, null);
             Settings settings = Settings.settingsBuilder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
             RootObjectMapper.Builder rootBuilder = new RootObjectMapper.Builder("test");
             Index index = new Index(indexName);

+ 4 - 0
core/src/test/java/org/elasticsearch/indices/recovery/RecoveryStateTest.java

@@ -389,6 +389,10 @@ public class RecoveryStateTest extends ElasticsearchTestCase {
             for (int j = iterationOps; j > 0; j--) {
                 ops++;
                 translog.incrementRecoveredOperations();
+                if (randomBoolean()) {
+                    translog.decrementRecoveredOperations(1);
+                    translog.incrementRecoveredOperations();
+                }
             }
             assertThat(translog.recoveredOperations(), equalTo(ops));
             assertThat(translog.totalOperations(), equalTo(totalOps));