Browse Source

Recovery: add total operations to the `_recovery` API

This commit adds the current total number of translog operations to the recovery reporting API. We also expose the recovered / total percentage:

```
"translog": {
   "recovered": 536,
   "total": 986,
   "percent": "54.3%",
   "total_time": "2ms",
   "total_time_in_millis": 2
},
```

Closes #9368
Closes #10042
Boaz Leskes 10 years ago
parent
commit
b605184471
18 changed files with 178 additions and 45 deletions
  1. 3 0
      docs/reference/indices/recovery.asciidoc
  2. 3 0
      rest-api-spec/test/cat.recovery/10_basic.yaml
  3. 2 0
      rest-api-spec/test/indices.recovery/10_basic.yaml
  4. 3 1
      src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java
  5. 1 1
      src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java
  6. 2 0
      src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotAndRestoreService.java
  7. 10 4
      src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java
  8. 11 1
      src/main/java/org/elasticsearch/indices/recovery/RecoveryFileChunkRequest.java
  9. 6 1
      src/main/java/org/elasticsearch/indices/recovery/RecoveryFilesInfoRequest.java
  10. 9 1
      src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java
  11. 10 6
      src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
  12. 64 14
      src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java
  13. 8 1
      src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
  14. 10 3
      src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java
  15. 6 0
      src/main/java/org/elasticsearch/rest/action/cat/RestRecoveryAction.java
  16. 1 1
      src/test/java/org/elasticsearch/benchmark/recovery/ReplicaRecoveryBenchmark.java
  17. 1 3
      src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java
  18. 28 8
      src/test/java/org/elasticsearch/indices/recovery/RecoveryStateTest.java

+ 3 - 0
docs/reference/indices/recovery.asciidoc

@@ -65,6 +65,9 @@ coming[1.5.0, this syntax was change to fix inconsistencies with other API]
       },
       },
       "translog" : {
       "translog" : {
         "recovered" : 0,
         "recovered" : 0,
+        "total" : 0,
+        "percent" : "100.0%",
+        "total_on_start" : 0,
         "total_time" : "0s",
         "total_time" : "0s",
         "total_time_in_millis" : 0
         "total_time_in_millis" : 0
       },
       },

+ 3 - 0
rest-api-spec/test/cat.recovery/10_basic.yaml

@@ -42,6 +42,9 @@
                 \d+\.\d+%   \s+                                 # bytes_percent
                 \d+\.\d+%   \s+                                 # bytes_percent
                 \d+         \s+                                 # total_files
                 \d+         \s+                                 # total_files
                 \d+         \s+                                 # total_bytes
                 \d+         \s+                                 # total_bytes
+                \d+         \s+                                 # translog
+                -?\d+\.\d+% \s+                                 # translog_percent
+                -?\d+       \s+                                 # total_translog
                 \n
                 \n
               )+
               )+
               $/
               $/

+ 2 - 0
rest-api-spec/test/indices.recovery/10_basic.yaml

@@ -33,6 +33,8 @@
   - gte:   { test_1.shards.0.index.size.recovered_in_bytes:     0                       }
   - gte:   { test_1.shards.0.index.size.recovered_in_bytes:     0                       }
   - match: { test_1.shards.0.index.size.percent:                /^\d+\.\d\%$/           }
   - match: { test_1.shards.0.index.size.percent:                /^\d+\.\d\%$/           }
   - gte:   { test_1.shards.0.translog.recovered:                0                       }
   - gte:   { test_1.shards.0.translog.recovered:                0                       }
+  - gte:   { test_1.shards.0.translog.total:                    -1                      }
+  - gte:   { test_1.shards.0.translog.total_on_start:           0                       }
   - gte:   { test_1.shards.0.translog.total_time_in_millis:     0                       }
   - gte:   { test_1.shards.0.translog.total_time_in_millis:     0                       }
   - gte:   { test_1.shards.0.start.check_index_time_in_millis:  0                       }
   - gte:   { test_1.shards.0.start.check_index_time_in_millis:  0                       }
   - gte:   { test_1.shards.0.start.total_time_in_millis:        0                       }
   - gte:   { test_1.shards.0.start.total_time_in_millis:        0                       }

+ 3 - 1
src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java

@@ -191,6 +191,8 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl
 
 
             if (recoveringTranslogFile == null || Files.exists(recoveringTranslogFile) == false) {
             if (recoveringTranslogFile == null || Files.exists(recoveringTranslogFile) == false) {
                 // no translog files, bail
                 // no translog files, bail
+                recoveryState.getTranslog().totalOperations(0);
+                recoveryState.getTranslog().totalOperationsOnStart(0);
                 indexShard.finalizeRecovery();
                 indexShard.finalizeRecovery();
                 indexShard.postRecovery("post recovery from gateway, no translog");
                 indexShard.postRecovery("post recovery from gateway, no translog");
                 // no index, just start the shard and bail
                 // no index, just start the shard and bail
@@ -236,7 +238,7 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl
                                 typesToUpdate.add(potentialIndexOperation.docMapper().type());
                                 typesToUpdate.add(potentialIndexOperation.docMapper().type());
                             }
                             }
                         }
                         }
-                        recoveryState.getTranslog().addTranslogOperations(1);
+                        recoveryState.getTranslog().incrementRecoveredOperations();
                     } catch (ElasticsearchException e) {
                     } catch (ElasticsearchException e) {
                         if (e.status() == RestStatus.BAD_REQUEST) {
                         if (e.status() == RestStatus.BAD_REQUEST) {
                             // mainly for MapperParsingException and Failure to detect xcontent
                             // mainly for MapperParsingException and Failure to detect xcontent

+ 1 - 1
src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java

@@ -132,7 +132,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
                                 .append(new ByteSizeValue(index.reusedBytes())).append("]\n");
                                 .append(new ByteSizeValue(index.reusedBytes())).append("]\n");
                         sb.append("    start    : took [").append(TimeValue.timeValueMillis(recoveryState.getStart().time())).append("], check_index [")
                         sb.append("    start    : took [").append(TimeValue.timeValueMillis(recoveryState.getStart().time())).append("], check_index [")
                                 .append(timeValueMillis(recoveryState.getStart().checkIndexTime())).append("]\n");
                                 .append(timeValueMillis(recoveryState.getStart().checkIndexTime())).append("]\n");
-                        sb.append("    translog : number_of_operations [").append(recoveryState.getTranslog().currentTranslogOperations())
+                        sb.append("    translog : number_of_operations [").append(recoveryState.getTranslog().recoveredOperations())
                                 .append("], took [").append(TimeValue.timeValueMillis(recoveryState.getTranslog().time())).append("]");
                                 .append("], took [").append(TimeValue.timeValueMillis(recoveryState.getTranslog().time())).append("]");
                         logger.trace(sb.toString());
                         logger.trace(sb.toString());
                     } else if (logger.isDebugEnabled()) {
                     } else if (logger.isDebugEnabled()) {

+ 2 - 0
src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotAndRestoreService.java

@@ -115,6 +115,8 @@ public class IndexShardSnapshotAndRestoreService extends AbstractIndexShardCompo
             logger.trace("[{}] restoring shard  [{}]", restoreSource.snapshotId(), shardId);
             logger.trace("[{}] restoring shard  [{}]", restoreSource.snapshotId(), shardId);
         }
         }
         try {
         try {
+            recoveryState.getTranslog().totalOperations(0);
+            recoveryState.getTranslog().totalOperationsOnStart(0);
             indexShard.prepareForIndexRecovery();
             indexShard.prepareForIndexRecovery();
             IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository());
             IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository());
             ShardId snapshotShardId = shardId;
             ShardId snapshotShardId = shardId;

+ 10 - 4
src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java

@@ -19,7 +19,6 @@
 
 
 package org.elasticsearch.indices.recovery;
 package org.elasticsearch.indices.recovery;
 
 
-import com.google.common.collect.Sets;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardId;
@@ -27,7 +26,6 @@ import org.elasticsearch.index.store.Store;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequest;
 
 
 import java.io.IOException;
 import java.io.IOException;
-import java.util.Set;
 
 
 /**
 /**
  *
  *
@@ -37,15 +35,17 @@ class RecoveryCleanFilesRequest extends TransportRequest {
     private long recoveryId;
     private long recoveryId;
     private ShardId shardId;
     private ShardId shardId;
 
 
-    private Store.MetadataSnapshot  snapshotFiles;
+    private Store.MetadataSnapshot snapshotFiles;
+    private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
 
 
     RecoveryCleanFilesRequest() {
     RecoveryCleanFilesRequest() {
     }
     }
 
 
-    RecoveryCleanFilesRequest(long recoveryId, ShardId shardId, Store.MetadataSnapshot snapshotFiles) {
+    RecoveryCleanFilesRequest(long recoveryId, ShardId shardId, Store.MetadataSnapshot snapshotFiles, int totalTranslogOps) {
         this.recoveryId = recoveryId;
         this.recoveryId = recoveryId;
         this.shardId = shardId;
         this.shardId = shardId;
         this.snapshotFiles = snapshotFiles;
         this.snapshotFiles = snapshotFiles;
+        this.totalTranslogOps = totalTranslogOps;
     }
     }
 
 
     public long recoveryId() {
     public long recoveryId() {
@@ -62,6 +62,7 @@ class RecoveryCleanFilesRequest extends TransportRequest {
         recoveryId = in.readLong();
         recoveryId = in.readLong();
         shardId = ShardId.readShardId(in);
         shardId = ShardId.readShardId(in);
         snapshotFiles = Store.MetadataSnapshot.read(in);
         snapshotFiles = Store.MetadataSnapshot.read(in);
+        totalTranslogOps = in.readVInt();
     }
     }
 
 
     @Override
     @Override
@@ -70,9 +71,14 @@ class RecoveryCleanFilesRequest extends TransportRequest {
         out.writeLong(recoveryId);
         out.writeLong(recoveryId);
         shardId.writeTo(out);
         shardId.writeTo(out);
         snapshotFiles.writeTo(out);
         snapshotFiles.writeTo(out);
+        out.writeVInt(totalTranslogOps);
     }
     }
 
 
     public Store.MetadataSnapshot sourceMetaSnapshot() {
     public Store.MetadataSnapshot sourceMetaSnapshot() {
         return snapshotFiles;
         return snapshotFiles;
     }
     }
+
+    public int totalTranslogOps() {
+        return totalTranslogOps;
+    }
 }
 }

+ 11 - 1
src/main/java/org/elasticsearch/indices/recovery/RecoveryFileChunkRequest.java

@@ -42,16 +42,20 @@ public final class RecoveryFileChunkRequest extends TransportRequest {  // publi
     private BytesReference content;
     private BytesReference content;
     private StoreFileMetaData metaData;
     private StoreFileMetaData metaData;
 
 
+    private int totalTranslogOps;
+
     RecoveryFileChunkRequest() {
     RecoveryFileChunkRequest() {
     }
     }
 
 
-    public RecoveryFileChunkRequest(long recoveryId, ShardId shardId, StoreFileMetaData metaData, long position, BytesReference content, boolean lastChunk) {
+    public RecoveryFileChunkRequest(long recoveryId, ShardId shardId, StoreFileMetaData metaData, long position, BytesReference content,
+                                    boolean lastChunk, int totalTranslogOps) {
         this.recoveryId = recoveryId;
         this.recoveryId = recoveryId;
         this.shardId = shardId;
         this.shardId = shardId;
         this.metaData = metaData;
         this.metaData = metaData;
         this.position = position;
         this.position = position;
         this.content = content;
         this.content = content;
         this.lastChunk = lastChunk;
         this.lastChunk = lastChunk;
+        this.totalTranslogOps = totalTranslogOps;
     }
     }
 
 
     public long recoveryId() {
     public long recoveryId() {
@@ -83,6 +87,10 @@ public final class RecoveryFileChunkRequest extends TransportRequest {  // publi
         return content;
         return content;
     }
     }
 
 
+    public int totalTranslogOps() {
+        return totalTranslogOps;
+    }
+
     @Override
     @Override
     public void readFrom(StreamInput in) throws IOException {
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);
         super.readFrom(in);
@@ -98,6 +106,7 @@ public final class RecoveryFileChunkRequest extends TransportRequest {  // publi
         writtenBy = Lucene.parseVersionLenient(versionString, null);
         writtenBy = Lucene.parseVersionLenient(versionString, null);
         metaData = new StoreFileMetaData(name, length, checksum, writtenBy);
         metaData = new StoreFileMetaData(name, length, checksum, writtenBy);
         lastChunk = in.readBoolean();
         lastChunk = in.readBoolean();
+        totalTranslogOps = in.readVInt();
     }
     }
 
 
     @Override
     @Override
@@ -112,6 +121,7 @@ public final class RecoveryFileChunkRequest extends TransportRequest {  // publi
         out.writeBytesReference(content);
         out.writeBytesReference(content);
         out.writeOptionalString(metaData.writtenBy() == null ? null : metaData.writtenBy().toString());
         out.writeOptionalString(metaData.writtenBy() == null ? null : metaData.writtenBy().toString());
         out.writeBoolean(lastChunk);
         out.writeBoolean(lastChunk);
+        out.writeVInt(totalTranslogOps);
     }
     }
 
 
     @Override
     @Override

+ 6 - 1
src/main/java/org/elasticsearch/indices/recovery/RecoveryFilesInfoRequest.java

@@ -41,17 +41,20 @@ class RecoveryFilesInfoRequest extends TransportRequest {
     List<String> phase1ExistingFileNames;
     List<String> phase1ExistingFileNames;
     List<Long> phase1ExistingFileSizes;
     List<Long> phase1ExistingFileSizes;
 
 
+    int totalTranslogOps;
+
     RecoveryFilesInfoRequest() {
     RecoveryFilesInfoRequest() {
     }
     }
 
 
     RecoveryFilesInfoRequest(long recoveryId, ShardId shardId, List<String> phase1FileNames, List<Long> phase1FileSizes,
     RecoveryFilesInfoRequest(long recoveryId, ShardId shardId, List<String> phase1FileNames, List<Long> phase1FileSizes,
-                             List<String> phase1ExistingFileNames, List<Long> phase1ExistingFileSizes) {
+                             List<String> phase1ExistingFileNames, List<Long> phase1ExistingFileSizes, int totalTranslogOps) {
         this.recoveryId = recoveryId;
         this.recoveryId = recoveryId;
         this.shardId = shardId;
         this.shardId = shardId;
         this.phase1FileNames = phase1FileNames;
         this.phase1FileNames = phase1FileNames;
         this.phase1FileSizes = phase1FileSizes;
         this.phase1FileSizes = phase1FileSizes;
         this.phase1ExistingFileNames = phase1ExistingFileNames;
         this.phase1ExistingFileNames = phase1ExistingFileNames;
         this.phase1ExistingFileSizes = phase1ExistingFileSizes;
         this.phase1ExistingFileSizes = phase1ExistingFileSizes;
+        this.totalTranslogOps = totalTranslogOps;
     }
     }
 
 
     public long recoveryId() {
     public long recoveryId() {
@@ -90,6 +93,7 @@ class RecoveryFilesInfoRequest extends TransportRequest {
         for (int i = 0; i < size; i++) {
         for (int i = 0; i < size; i++) {
             phase1ExistingFileSizes.add(in.readVLong());
             phase1ExistingFileSizes.add(in.readVLong());
         }
         }
+        totalTranslogOps = in.readVInt();
     }
     }
 
 
     @Override
     @Override
@@ -117,5 +121,6 @@ class RecoveryFilesInfoRequest extends TransportRequest {
         for (Long phase1ExistingFileSize : phase1ExistingFileSizes) {
         for (Long phase1ExistingFileSize : phase1ExistingFileSizes) {
             out.writeVLong(phase1ExistingFileSize);
             out.writeVLong(phase1ExistingFileSize);
         }
         }
+        out.writeVInt(totalTranslogOps);
     }
     }
 }
 }

+ 9 - 1
src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java

@@ -33,13 +33,15 @@ class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
 
 
     private long recoveryId;
     private long recoveryId;
     private ShardId shardId;
     private ShardId shardId;
+    private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
 
 
     RecoveryPrepareForTranslogOperationsRequest() {
     RecoveryPrepareForTranslogOperationsRequest() {
     }
     }
 
 
-    RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId) {
+    RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps) {
         this.recoveryId = recoveryId;
         this.recoveryId = recoveryId;
         this.shardId = shardId;
         this.shardId = shardId;
+        this.totalTranslogOps = totalTranslogOps;
     }
     }
 
 
     public long recoveryId() {
     public long recoveryId() {
@@ -50,11 +52,16 @@ class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
         return shardId;
         return shardId;
     }
     }
 
 
+    public int totalTranslogOps() {
+        return totalTranslogOps;
+    }
+
     @Override
     @Override
     public void readFrom(StreamInput in) throws IOException {
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);
         super.readFrom(in);
         recoveryId = in.readLong();
         recoveryId = in.readLong();
         shardId = ShardId.readShardId(in);
         shardId = ShardId.readShardId(in);
+        totalTranslogOps = in.readVInt();
     }
     }
 
 
     @Override
     @Override
@@ -62,5 +69,6 @@ class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
         super.writeTo(out);
         super.writeTo(out);
         out.writeLong(recoveryId);
         out.writeLong(recoveryId);
         shardId.writeTo(out);
         shardId.writeTo(out);
+        out.writeVInt(totalTranslogOps);
     }
     }
 }
 }

+ 10 - 6
src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

@@ -199,7 +199,8 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler {
                 @Override
                 @Override
                 public void run() throws InterruptedException {
                 public void run() throws InterruptedException {
                     RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.recoveryId(), request.shardId(),
                     RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.recoveryId(), request.shardId(),
-                            response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes);
+                            response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes,
+                            shard.translog().estimatedNumberOfOperations());
                     transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest,
                     transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest,
                             TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
                             TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
                             EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
                             EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
@@ -288,7 +289,8 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler {
                                     public void run() throws InterruptedException {
                                     public void run() throws InterruptedException {
                                         // Actually send the file chunk to the target node, waiting for it to complete
                                         // Actually send the file chunk to the target node, waiting for it to complete
                                         transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK,
                                         transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK,
-                                                new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), md, position, content, lastChunk),
+                                                new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), md, position, content,
+                                                        lastChunk, shard.translog().estimatedNumberOfOperations()),
                                                 requestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
                                                 requestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
                                     }
                                     }
                                 });
                                 });
@@ -350,7 +352,7 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler {
                     // are deleted
                     // are deleted
                     try {
                     try {
                         transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
                         transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
-                                new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata),
+                                new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata, shard.translog().estimatedNumberOfOperations()),
                                 TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
                                 TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
                                 EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
                                 EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
                     } catch (RemoteTransportException remoteException) {
                     } catch (RemoteTransportException remoteException) {
@@ -427,7 +429,7 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler {
                 // operations. This ensures the shard engine is started and disables
                 // operations. This ensures the shard engine is started and disables
                 // garbage collection (not the JVM's GC!) of tombstone deletes
                 // garbage collection (not the JVM's GC!) of tombstone deletes
                 transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG,
                 transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG,
-                        new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId()),
+                        new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId(), shard.translog().estimatedNumberOfOperations()),
                         TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
                         TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
             }
             }
         });
         });
@@ -616,7 +618,8 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler {
                 cancellableThreads.execute(new Interruptable() {
                 cancellableThreads.execute(new Interruptable() {
                     @Override
                     @Override
                     public void run() throws InterruptedException {
                     public void run() throws InterruptedException {
-                        final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations);
+                        final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
+                                request.recoveryId(), request.shardId(), operations, shard.translog().estimatedNumberOfOperations());
                         transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
                         transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
                                 recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
                                 recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
                     }
                     }
@@ -633,7 +636,8 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler {
             cancellableThreads.execute(new Interruptable() {
             cancellableThreads.execute(new Interruptable() {
                 @Override
                 @Override
                 public void run() throws InterruptedException {
                 public void run() throws InterruptedException {
-                    RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations);
+                    RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
+                            request.recoveryId(), request.shardId(), operations, shard.translog().estimatedNumberOfOperations());
                     transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
                     transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
                             recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
                             recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
                 }
                 }

+ 64 - 14
src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java

@@ -39,7 +39,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.List;
 import java.util.Locale;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
 
 
 /**
 /**
  * Keeps track of state related to shard recovery.
  * Keeps track of state related to shard recovery.
@@ -359,6 +358,7 @@ public class RecoveryState implements ToXContent, Streamable {
         static final XContentBuilderString TARGET = new XContentBuilderString("target");
         static final XContentBuilderString TARGET = new XContentBuilderString("target");
         static final XContentBuilderString INDEX = new XContentBuilderString("index");
         static final XContentBuilderString INDEX = new XContentBuilderString("index");
         static final XContentBuilderString TRANSLOG = new XContentBuilderString("translog");
         static final XContentBuilderString TRANSLOG = new XContentBuilderString("translog");
+        static final XContentBuilderString TOTAL_ON_START = new XContentBuilderString("total_on_start");
         static final XContentBuilderString START = new XContentBuilderString("start");
         static final XContentBuilderString START = new XContentBuilderString("start");
         static final XContentBuilderString RECOVERED = new XContentBuilderString("recovered");
         static final XContentBuilderString RECOVERED = new XContentBuilderString("recovered");
         static final XContentBuilderString RECOVERED_IN_BYTES = new XContentBuilderString("recovered_in_bytes");
         static final XContentBuilderString RECOVERED_IN_BYTES = new XContentBuilderString("recovered_in_bytes");
@@ -473,40 +473,90 @@ public class RecoveryState implements ToXContent, Streamable {
     }
     }
 
 
     public static class Translog extends Timer implements ToXContent, Streamable {
     public static class Translog extends Timer implements ToXContent, Streamable {
-        private final AtomicInteger currentTranslogOperations = new AtomicInteger();
+        public static final int UNKNOWN = -1;
 
 
-        public void reset() {
+        private int recovered;
+        private int total = UNKNOWN;
+        private int totalOnStart = UNKNOWN;
+
+        public synchronized void reset() {
             super.reset();
             super.reset();
-            currentTranslogOperations.set(0);
+            recovered = 0;
+            total = UNKNOWN;
+            totalOnStart = UNKNOWN;
+        }
+
+        public synchronized void incrementRecoveredOperations() {
+            recovered++;
+            assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + total + "], recovered [" + recovered + "]";
+        }
+
+        /** returns the total number of translog operations recovered so far */
+        public synchronized int recoveredOperations() {
+            return recovered;
         }
         }
 
 
-        public void addTranslogOperations(int count) {
-            this.currentTranslogOperations.addAndGet(count);
+        /**
+         * returns the total number of translog operations needed to be recovered at this moment.
+         * Note that this can change as the number of operations grows during recovery.
+         * <p/>
+         * A value of -1 ({@link RecoveryState.Translog#UNKNOWN} is return if this is unknown (typically a gateway recovery)
+         */
+        public synchronized int totalOperations() {
+            return total;
+        }
+
+        public synchronized void totalOperations(int total) {
+            this.total = total;
+            assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + total + "], recovered [" + recovered + "]";
+        }
+
+        /**
+         * returns the total number of translog operations to recovered, on the start of the recovery. Unlike {@link #totalOperations}
+         * this does change during recovery.
+         * <p/>
+         * A value of -1 ({@link RecoveryState.Translog#UNKNOWN} is return if this is unknown (typically a gateway recovery)
+         */
+        public synchronized int totalOperationsOnStart() {
+            return this.totalOnStart;
         }
         }
 
 
-        public void incrementTranslogOperations() {
-            this.currentTranslogOperations.incrementAndGet();
+        public synchronized void totalOperationsOnStart(int total) {
+            this.totalOnStart = total;
         }
         }
 
 
-        public int currentTranslogOperations() {
-            return this.currentTranslogOperations.get();
+        public synchronized float recoveredPercent() {
+            if (total == UNKNOWN) {
+                return -1.f;
+            }
+            if (total == 0) {
+                return 100.f;
+            }
+            return recovered * 100.0f / total;
         }
         }
 
 
         @Override
         @Override
         public void readFrom(StreamInput in) throws IOException {
         public void readFrom(StreamInput in) throws IOException {
             super.readFrom(in);
             super.readFrom(in);
-            currentTranslogOperations.set(in.readVInt());
+            recovered = in.readVInt();
+            total = in.readVInt();
+            totalOnStart = in.readVInt();
         }
         }
 
 
         @Override
         @Override
         public void writeTo(StreamOutput out) throws IOException {
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
             super.writeTo(out);
-            out.writeVInt(currentTranslogOperations.get());
+            out.writeVInt(recovered);
+            out.writeVInt(total);
+            out.writeVInt(totalOnStart);
         }
         }
 
 
         @Override
         @Override
-        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
-            builder.field(Fields.RECOVERED, currentTranslogOperations.get());
+        public synchronized XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.field(Fields.RECOVERED, recovered);
+            builder.field(Fields.TOTAL, total);
+            builder.field(Fields.PERCENT, String.format(Locale.ROOT, "%1.1f%%", recoveredPercent()));
+            builder.field(Fields.TOTAL_ON_START, totalOnStart);
             builder.timeValueField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, time());
             builder.timeValueField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, time());
             return builder;
             return builder;
         }
         }

+ 8 - 1
src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

@@ -277,6 +277,7 @@ public class RecoveryTarget extends AbstractComponent {
         public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
         public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
             try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
             try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
                 final RecoveryStatus recoveryStatus = statusRef.status();
                 final RecoveryStatus recoveryStatus = statusRef.status();
+                recoveryStatus.state().getTranslog().totalOperations(request.totalTranslogOps());
                 recoveryStatus.indexShard().prepareForTranslogRecovery();
                 recoveryStatus.indexShard().prepareForTranslogRecovery();
             }
             }
             channel.sendResponse(TransportResponse.Empty.INSTANCE);
             channel.sendResponse(TransportResponse.Empty.INSTANCE);
@@ -322,9 +323,11 @@ public class RecoveryTarget extends AbstractComponent {
         public void messageReceived(RecoveryTranslogOperationsRequest request, TransportChannel channel) throws Exception {
         public void messageReceived(RecoveryTranslogOperationsRequest request, TransportChannel channel) throws Exception {
             try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
             try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
                 final RecoveryStatus recoveryStatus = statusRef.status();
                 final RecoveryStatus recoveryStatus = statusRef.status();
+                final RecoveryState.Translog translog = recoveryStatus.state().getTranslog();
+                translog.totalOperations(request.totalTranslogOps());
                 for (Translog.Operation operation : request.operations()) {
                 for (Translog.Operation operation : request.operations()) {
                     recoveryStatus.indexShard().performRecoveryOperation(operation);
                     recoveryStatus.indexShard().performRecoveryOperation(operation);
-                    recoveryStatus.state().getTranslog().incrementTranslogOperations();
+                    translog.incrementRecoveredOperations();
                 }
                 }
             }
             }
             channel.sendResponse(TransportResponse.Empty.INSTANCE);
             channel.sendResponse(TransportResponse.Empty.INSTANCE);
@@ -355,6 +358,8 @@ public class RecoveryTarget extends AbstractComponent {
                 for (int i = 0; i < request.phase1FileNames.size(); i++) {
                 for (int i = 0; i < request.phase1FileNames.size(); i++) {
                     index.addFileDetail(request.phase1FileNames.get(i), request.phase1FileSizes.get(i), false);
                     index.addFileDetail(request.phase1FileNames.get(i), request.phase1FileSizes.get(i), false);
                 }
                 }
+                recoveryStatus.state().getTranslog().totalOperations(request.totalTranslogOps);
+                recoveryStatus.state().getTranslog().totalOperationsOnStart(request.totalTranslogOps);
                 // recoveryBytesCount / recoveryFileCount will be set as we go...
                 // recoveryBytesCount / recoveryFileCount will be set as we go...
                 channel.sendResponse(TransportResponse.Empty.INSTANCE);
                 channel.sendResponse(TransportResponse.Empty.INSTANCE);
             }
             }
@@ -377,6 +382,7 @@ public class RecoveryTarget extends AbstractComponent {
         public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel) throws Exception {
         public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel) throws Exception {
             try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
             try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
                 final RecoveryStatus recoveryStatus = statusRef.status();
                 final RecoveryStatus recoveryStatus = statusRef.status();
+                recoveryStatus.state().getTranslog().totalOperations(request.totalTranslogOps());
                 // first, we go and move files that were created with the recovery id suffix to
                 // first, we go and move files that were created with the recovery id suffix to
                 // the actual names, its ok if we have a corrupted index here, since we have replicas
                 // the actual names, its ok if we have a corrupted index here, since we have replicas
                 // to recover from in case of a full cluster shutdown just when this code executes...
                 // to recover from in case of a full cluster shutdown just when this code executes...
@@ -425,6 +431,7 @@ public class RecoveryTarget extends AbstractComponent {
             try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
             try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
                 final RecoveryStatus recoveryStatus = statusRef.status();
                 final RecoveryStatus recoveryStatus = statusRef.status();
                 final Store store = recoveryStatus.store();
                 final Store store = recoveryStatus.store();
+                recoveryStatus.state().getTranslog().totalOperations(request.totalTranslogOps());
                 IndexOutput indexOutput;
                 IndexOutput indexOutput;
                 if (request.position() == 0) {
                 if (request.position() == 0) {
                     indexOutput = recoveryStatus.openAndPutIndexOutput(request.name(), request.metadata(), store);
                     indexOutput = recoveryStatus.openAndPutIndexOutput(request.name(), request.metadata(), store);

+ 10 - 3
src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java

@@ -20,12 +20,11 @@
 package org.elasticsearch.indices.recovery;
 package org.elasticsearch.indices.recovery;
 
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Lists;
-import org.elasticsearch.Version;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.translog.TranslogStreams;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.index.translog.TranslogStreams;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequest;
 
 
 import java.io.IOException;
 import java.io.IOException;
@@ -39,14 +38,16 @@ class RecoveryTranslogOperationsRequest extends TransportRequest {
     private long recoveryId;
     private long recoveryId;
     private ShardId shardId;
     private ShardId shardId;
     private List<Translog.Operation> operations;
     private List<Translog.Operation> operations;
+    private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
 
 
     RecoveryTranslogOperationsRequest() {
     RecoveryTranslogOperationsRequest() {
     }
     }
 
 
-    RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List<Translog.Operation> operations) {
+    RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List<Translog.Operation> operations, int totalTranslogOps) {
         this.recoveryId = recoveryId;
         this.recoveryId = recoveryId;
         this.shardId = shardId;
         this.shardId = shardId;
         this.operations = operations;
         this.operations = operations;
+        this.totalTranslogOps = totalTranslogOps;
     }
     }
 
 
     public long recoveryId() {
     public long recoveryId() {
@@ -61,6 +62,10 @@ class RecoveryTranslogOperationsRequest extends TransportRequest {
         return operations;
         return operations;
     }
     }
 
 
+    public int totalTranslogOps() {
+        return totalTranslogOps;
+    }
+
     @Override
     @Override
     public void readFrom(StreamInput in) throws IOException {
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);
         super.readFrom(in);
@@ -71,6 +76,7 @@ class RecoveryTranslogOperationsRequest extends TransportRequest {
         for (int i = 0; i < size; i++) {
         for (int i = 0; i < size; i++) {
             operations.add(TranslogStreams.CHECKSUMMED_TRANSLOG_STREAM.read(in));
             operations.add(TranslogStreams.CHECKSUMMED_TRANSLOG_STREAM.read(in));
         }
         }
+        totalTranslogOps = in.readVInt();
     }
     }
 
 
     @Override
     @Override
@@ -82,5 +88,6 @@ class RecoveryTranslogOperationsRequest extends TransportRequest {
         for (Translog.Operation operation : operations) {
         for (Translog.Operation operation : operations) {
             TranslogStreams.CHECKSUMMED_TRANSLOG_STREAM.write(out, operation);
             TranslogStreams.CHECKSUMMED_TRANSLOG_STREAM.write(out, operation);
         }
         }
+        out.writeVInt(totalTranslogOps);
     }
     }
 }
 }

+ 6 - 0
src/main/java/org/elasticsearch/rest/action/cat/RestRecoveryAction.java

@@ -98,6 +98,9 @@ public class RestRecoveryAction extends AbstractCatAction {
                 .addCell("bytes_percent", "alias:bp;desc:percent of bytes recovered")
                 .addCell("bytes_percent", "alias:bp;desc:percent of bytes recovered")
                 .addCell("total_files", "alias:tf;desc:total number of files")
                 .addCell("total_files", "alias:tf;desc:total number of files")
                 .addCell("total_bytes", "alias:tb;desc:total number of bytes")
                 .addCell("total_bytes", "alias:tb;desc:total number of bytes")
+                .addCell("translog", "alias:tr;desc:translog operations recovered")
+                .addCell("translog_percent", "alias:trp;desc:percent of translog recovery")
+                .addCell("total_translog", "alias:trt;desc:current total translog operations")
                 .endHeaders();
                 .endHeaders();
         return t;
         return t;
     }
     }
@@ -156,6 +159,9 @@ public class RestRecoveryAction extends AbstractCatAction {
                 t.addCell(String.format(Locale.ROOT, "%1.1f%%", state.getIndex().recoveredBytesPercent()));
                 t.addCell(String.format(Locale.ROOT, "%1.1f%%", state.getIndex().recoveredBytesPercent()));
                 t.addCell(state.getIndex().totalFileCount());
                 t.addCell(state.getIndex().totalFileCount());
                 t.addCell(state.getIndex().totalBytes());
                 t.addCell(state.getIndex().totalBytes());
+                t.addCell(state.getTranslog().recoveredOperations());
+                t.addCell(String.format(Locale.ROOT, "%1.1f%%", state.getTranslog().recoveredPercent()));
+                t.addCell(state.getTranslog().totalOperations());
                 t.endRow();
                 t.endRow();
             }
             }
         }
         }

+ 1 - 1
src/test/java/org/elasticsearch/benchmark/recovery/ReplicaRecoveryBenchmark.java

@@ -132,7 +132,7 @@ public class ReplicaRecoveryBenchmark {
                     long translogOps;
                     long translogOps;
                     long bytes;
                     long bytes;
                     if (indexRecoveries.size() > 0) {
                     if (indexRecoveries.size() > 0) {
-                        translogOps = indexRecoveries.get(0).recoveryState().getTranslog().currentTranslogOperations();
+                        translogOps = indexRecoveries.get(0).recoveryState().getTranslog().recoveredOperations();
                         bytes = recoveryResponse.shardResponses().get(INDEX_NAME).get(0).recoveryState().getIndex().recoveredBytes();
                         bytes = recoveryResponse.shardResponses().get(INDEX_NAME).get(0).recoveryState().getIndex().recoveredBytes();
                     } else {
                     } else {
                         bytes = lastBytes = 0;
                         bytes = lastBytes = 0;

+ 1 - 3
src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java

@@ -20,7 +20,6 @@ package org.elasticsearch.index.store;
 
 
 import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.Lists;
 import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.Lists;
 import com.carrotsearch.randomizedtesting.LifecycleScope;
 import com.carrotsearch.randomizedtesting.LifecycleScope;
-import com.carrotsearch.randomizedtesting.annotations.Repeat;
 import com.carrotsearch.randomizedtesting.generators.RandomPicks;
 import com.carrotsearch.randomizedtesting.generators.RandomPicks;
 import com.google.common.base.Charsets;
 import com.google.common.base.Charsets;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicate;
@@ -51,7 +50,6 @@ import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.discovery.Discovery;
 import org.elasticsearch.discovery.Discovery;
-import org.elasticsearch.index.engine.EngineConfig;
 import org.elasticsearch.index.merge.policy.MergePolicyModule;
 import org.elasticsearch.index.merge.policy.MergePolicyModule;
 import org.elasticsearch.index.settings.IndexSettings;
 import org.elasticsearch.index.settings.IndexSettings;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShard;
@@ -406,7 +404,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
                         RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
                         RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
                         if (truncate && req.length() > 1) {
                         if (truncate && req.length() > 1) {
                             BytesArray array = new BytesArray(req.content().array(), req.content().arrayOffset(), (int) req.length() - 1);
                             BytesArray array = new BytesArray(req.content().array(), req.content().arrayOffset(), (int) req.length() - 1);
-                            request = new RecoveryFileChunkRequest(req.recoveryId(), req.shardId(), req.metadata(), req.position(), array, req.lastChunk());
+                            request = new RecoveryFileChunkRequest(req.recoveryId(), req.shardId(), req.metadata(), req.position(), array, req.lastChunk(), req.totalTranslogOps());
                         } else {
                         } else {
                             byte[] array = req.content().array();
                             byte[] array = req.content().array();
                             int i = randomIntBetween(0, req.content().length() - 1);
                             int i = randomIntBetween(0, req.content().length() - 1);

+ 28 - 8
src/test/java/org/elasticsearch/indices/recovery/RecoveryStateTest.java

@@ -343,19 +343,33 @@ public class RecoveryStateTest extends ElasticsearchTestCase {
 
 
         // we don't need to test the time aspect, it's done in the timer test
         // we don't need to test the time aspect, it's done in the timer test
         translog.start();
         translog.start();
-        assertThat(translog.currentTranslogOperations(), equalTo(0));
+        assertThat(translog.recoveredOperations(), equalTo(0));
+        assertThat(translog.totalOperations(), equalTo(Translog.UNKNOWN));
+        assertThat(translog.totalOperationsOnStart(), equalTo(Translog.UNKNOWN));
         streamer.start();
         streamer.start();
         // force one
         // force one
         streamer.serializeDeserialize();
         streamer.serializeDeserialize();
         int ops = 0;
         int ops = 0;
+        int totalOps = 0;
+        int totalOpsOnStart = randomIntBetween(10, 200);
+        translog.totalOperationsOnStart(totalOpsOnStart);
         for (int i = scaledRandomIntBetween(10, 200); i > 0; i--) {
         for (int i = scaledRandomIntBetween(10, 200); i > 0; i--) {
-            for (int j = randomIntBetween(1, 10); j > 0; j--) {
+            final int iterationOps = randomIntBetween(1, 10);
+            totalOps += iterationOps;
+            translog.totalOperations(totalOps);
+            assertThat((double) translog.recoveredPercent(), closeTo(100.0 * ops / totalOps, 0.1));
+            for (int j = iterationOps; j > 0; j--) {
                 ops++;
                 ops++;
-                translog.incrementTranslogOperations();
+                translog.incrementRecoveredOperations();
             }
             }
-            assertThat(translog.currentTranslogOperations(), equalTo(ops));
-            assertThat(streamer.lastRead().currentTranslogOperations(), greaterThanOrEqualTo(0));
-            assertThat(streamer.lastRead().currentTranslogOperations(), lessThanOrEqualTo(ops));
+            assertThat(translog.recoveredOperations(), equalTo(ops));
+            assertThat(translog.totalOperations(), equalTo(totalOps));
+            assertThat(translog.recoveredPercent(), equalTo(100.f));
+            assertThat(streamer.lastRead().recoveredOperations(), greaterThanOrEqualTo(0));
+            assertThat(streamer.lastRead().recoveredOperations(), lessThanOrEqualTo(ops));
+            assertThat(streamer.lastRead().totalOperations(), lessThanOrEqualTo(totalOps));
+            assertThat(streamer.lastRead().totalOperationsOnStart(), lessThanOrEqualTo(totalOpsOnStart));
+            assertThat(streamer.lastRead().recoveredPercent(), either(greaterThanOrEqualTo(0.f)).or(equalTo(-1.f)));
         }
         }
 
 
         boolean stopped = false;
         boolean stopped = false;
@@ -367,13 +381,19 @@ public class RecoveryStateTest extends ElasticsearchTestCase {
         if (randomBoolean()) {
         if (randomBoolean()) {
             translog.reset();
             translog.reset();
             ops = 0;
             ops = 0;
-            assertThat(translog.currentTranslogOperations(), equalTo(0));
+            totalOps = Translog.UNKNOWN;
+            totalOpsOnStart = Translog.UNKNOWN;
+            assertThat(translog.recoveredOperations(), equalTo(0));
+            assertThat(translog.totalOperationsOnStart(), equalTo(Translog.UNKNOWN));
+            assertThat(translog.totalOperations(), equalTo(Translog.UNKNOWN));
         }
         }
 
 
         stop.set(true);
         stop.set(true);
         streamer.join();
         streamer.join();
         final Translog lastRead = streamer.lastRead();
         final Translog lastRead = streamer.lastRead();
-        assertThat(lastRead.currentTranslogOperations(), equalTo(ops));
+        assertThat(lastRead.recoveredOperations(), equalTo(ops));
+        assertThat(lastRead.totalOperations(), equalTo(totalOps));
+        assertThat(lastRead.totalOperationsOnStart(), equalTo(totalOpsOnStart));
         assertThat(lastRead.startTime(), equalTo(translog.startTime()));
         assertThat(lastRead.startTime(), equalTo(translog.startTime()));
         assertThat(lastRead.stopTime(), equalTo(translog.stopTime()));
         assertThat(lastRead.stopTime(), equalTo(translog.stopTime()));