Browse Source

Recovery: add throttle stats

This commit adds throttling statistics to the index stats API and to the recovery state.

Closes #10097
Boaz Leskes 10 years ago
parent
commit
8c69535580
29 changed files with 511 additions and 100 deletions
  1. 21 21
      rest-api-spec/test/indices.recovery/10_basic.yaml
  2. 28 0
      rest-api-spec/test/indices.stats/11_metric.yaml
  3. 28 0
      src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java
  4. 3 1
      src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStatsFlags.java
  5. 9 0
      src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequest.java
  6. 5 0
      src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java
  7. 5 2
      src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java
  8. 148 0
      src/main/java/org/elasticsearch/index/recovery/RecoveryStats.java
  9. 8 1
      src/main/java/org/elasticsearch/index/shard/IndexShard.java
  10. 14 20
      src/main/java/org/elasticsearch/indices/IndicesService.java
  11. 6 0
      src/main/java/org/elasticsearch/indices/NodeIndicesStats.java
  12. 9 1
      src/main/java/org/elasticsearch/indices/recovery/RecoveryFileChunkRequest.java
  13. 6 0
      src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java
  14. 6 5
      src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
  15. 64 24
      src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java
  16. 3 0
      src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java
  17. 8 2
      src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
  18. 1 0
      src/main/java/org/elasticsearch/rest/action/admin/indices/stats/RestIndicesStatsAction.java
  19. 1 1
      src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java
  20. 86 2
      src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryTests.java
  21. 29 1
      src/test/java/org/elasticsearch/indices/recovery/RecoveryStateTest.java
  22. 9 12
      src/test/java/org/elasticsearch/indices/stats/IndexStatsTests.java
  23. 7 0
      src/test/java/org/elasticsearch/test/rest/section/Assertion.java
  24. 1 1
      src/test/java/org/elasticsearch/test/rest/section/GreaterThanAssertion.java
  25. 1 1
      src/test/java/org/elasticsearch/test/rest/section/GreaterThanEqualToAssertion.java
  26. 1 1
      src/test/java/org/elasticsearch/test/rest/section/LengthAssertion.java
  27. 1 1
      src/test/java/org/elasticsearch/test/rest/section/LessThanAssertion.java
  28. 1 1
      src/test/java/org/elasticsearch/test/rest/section/LessThanOrEqualToAssertion.java
  29. 2 2
      src/test/java/org/elasticsearch/test/rest/section/MatchAssertion.java

+ 21 - 21
rest-api-spec/test/indices.recovery/10_basic.yaml

@@ -11,29 +11,29 @@
 
   - do:
       cluster.health:
-        wait_for_status: yellow
+        wait_for_status: green
 
   - do:
       indices.recovery:
         index: [test_1]
 
-  - match: { test_1.shards.0.type:                              "GATEWAY"               }
-  - match: { test_1.shards.0.stage:                             "DONE"                  }
-  - match: { test_1.shards.0.primary:                           true                    }
-  - match: { test_1.shards.0.target.ip:                         /^\d+\.\d+\.\d+\.\d+$/  }
-  - gte:   { test_1.shards.0.index.files.total:                 0                       }
-  - gte:   { test_1.shards.0.index.files.reused:                0                       }
-  - gte:   { test_1.shards.0.index.files.recovered:             0                       }
-  - match: { test_1.shards.0.index.files.percent:               /^\d+\.\d\%$/           }
-  - gte:   { test_1.shards.0.index.size.total_in_bytes:         0                       }
-  - gte:   { test_1.shards.0.index.size.reused_in_bytes:        0                       }
-  - gte:   { test_1.shards.0.index.size.recovered_in_bytes:     0                       }
-  - match: { test_1.shards.0.index.size.percent:                /^\d+\.\d\%$/           }
-  - gte:   { test_1.shards.0.translog.recovered:                0                       }
-  - gte:   { test_1.shards.0.translog.total:                    -1                      }
-  - gte:   { test_1.shards.0.translog.total_on_start:           0                       }
-  - gte:   { test_1.shards.0.translog.total_time_in_millis:     0                       }
-  - gte:   { test_1.shards.0.start.check_index_time_in_millis:  0                       }
-  - gte:   { test_1.shards.0.start.total_time_in_millis:        0                       }
-
-
+  - match: { test_1.shards.0.type:                                 "GATEWAY"               }
+  - match: { test_1.shards.0.stage:                                "DONE"                  }
+  - match: { test_1.shards.0.primary:                              true                    }
+  - match: { test_1.shards.0.target.ip:                            /^\d+\.\d+\.\d+\.\d+$/  }
+  - gte:   { test_1.shards.0.index.files.total:                    0                       }
+  - gte:   { test_1.shards.0.index.files.reused:                   0                       }
+  - gte:   { test_1.shards.0.index.files.recovered:                0                       }
+  - match: { test_1.shards.0.index.files.percent:                  /^\d+\.\d\%$/           }
+  - gte:   { test_1.shards.0.index.size.total_in_bytes:            0                       }
+  - gte:   { test_1.shards.0.index.size.reused_in_bytes:           0                       }
+  - gte:   { test_1.shards.0.index.size.recovered_in_bytes:        0                       }
+  - match: { test_1.shards.0.index.size.percent:                   /^\d+\.\d\%$/           }
+  - gte:   { test_1.shards.0.index.source_throttle_time_in_millis: 0                       }
+  - gte:   { test_1.shards.0.index.target_throttle_time_in_millis: 0                       }
+  - gte:   { test_1.shards.0.translog.recovered:                   0                       }
+  - gte:   { test_1.shards.0.translog.total:                       -1                      }
+  - gte:   { test_1.shards.0.translog.total_on_start:              0                       }
+  - gte:   { test_1.shards.0.translog.total_time_in_millis:        0                       }
+  - gte:   { test_1.shards.0.start.check_index_time_in_millis:     0                       }
+  - gte:   { test_1.shards.0.start.total_time_in_millis:           0                       }

+ 28 - 0
rest-api-spec/test/indices.stats/11_metric.yaml

@@ -37,6 +37,7 @@ setup:
   - is_true:  _all.total.segments
   - is_true:  _all.total.translog
   - is_true:  _all.total.suggest
+  - is_true:  _all.total.recovery
 
 ---
 "Metric - _all":
@@ -60,6 +61,7 @@ setup:
   - is_true:  _all.total.segments
   - is_true:  _all.total.translog
   - is_true:  _all.total.suggest
+  - is_true:  _all.total.recovery
 
 ---
 "Metric - one":
@@ -83,6 +85,7 @@ setup:
   - is_false:  _all.total.segments
   - is_false:  _all.total.translog
   - is_false:  _all.total.suggest
+  - is_false:  _all.total.recovery
 
 ---
 "Metric - multi":
@@ -106,5 +109,30 @@ setup:
   - is_false:  _all.total.segments
   - is_false:  _all.total.translog
   - is_false:  _all.total.suggest
+  - is_false:  _all.total.recovery
 
 
+---
+"Metric - recovery":
+  - do:
+      indices.stats: { metric: [ recovery ] }
+
+  - is_false:  _all.total.docs
+  - is_false:  _all.total.store
+  - is_false:  _all.total.indexing
+  - is_false:  _all.total.get
+  - is_false:  _all.total.search
+  - is_false:  _all.total.merges
+  - is_false:  _all.total.refresh
+  - is_false:  _all.total.flush
+  - is_false:  _all.total.warmer
+  - is_false:  _all.total.filter_cache
+  - is_false:  _all.total.id_cache
+  - is_false:  _all.total.fielddata
+  - is_false:  _all.total.percolate
+  - is_false:  _all.total.completion
+  - is_false:  _all.total.segments
+  - is_false:  _all.total.translog
+  - is_false:  _all.total.suggest
+  - is_true:   _all.total.recovery
+

+ 28 - 0
src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java

@@ -36,6 +36,7 @@ import org.elasticsearch.index.get.GetStats;
 import org.elasticsearch.index.indexing.IndexingStats;
 import org.elasticsearch.index.merge.MergeStats;
 import org.elasticsearch.index.percolator.stats.PercolateStats;
+import org.elasticsearch.index.recovery.RecoveryStats;
 import org.elasticsearch.index.refresh.RefreshStats;
 import org.elasticsearch.index.search.stats.SearchStats;
 import org.elasticsearch.index.shard.DocsStats;
@@ -115,6 +116,9 @@ public class CommonStats implements Streamable, ToXContent {
                 case QueryCache:
                     queryCache = new QueryCacheStats();
                     break;
+                case Recovery:
+                    recoveryStats = new RecoveryStats();
+                    break;
                 default:
                     throw new IllegalStateException("Unknown Flag: " + flag);
             }
@@ -181,6 +185,9 @@ public class CommonStats implements Streamable, ToXContent {
                 case QueryCache:
                     queryCache = indexShard.queryCache().stats();
                     break;
+                case Recovery:
+                    recoveryStats = indexShard.recoveryStats();
+                    break;
                 default:
                     throw new IllegalStateException("Unknown Flag: " + flag);
             }
@@ -241,6 +248,9 @@ public class CommonStats implements Streamable, ToXContent {
     @Nullable
     public QueryCacheStats queryCache;
 
+    @Nullable
+    public RecoveryStats recoveryStats;
+
     public void add(CommonStats stats) {
         if (docs == null) {
             if (stats.getDocs() != null) {
@@ -388,6 +398,14 @@ public class CommonStats implements Streamable, ToXContent {
         } else {
             queryCache.add(stats.getQueryCache());
         }
+        if (recoveryStats == null) {
+            if (stats.getRecoveryStats() != null) {
+                recoveryStats = new RecoveryStats();
+                recoveryStats.add(stats.getRecoveryStats());
+            }
+        } else {
+            recoveryStats.add(stats.getRecoveryStats());
+        }
     }
 
     @Nullable
@@ -480,6 +498,11 @@ public class CommonStats implements Streamable, ToXContent {
         return queryCache;
     }
 
+    @Nullable
+    public RecoveryStats getRecoveryStats() {
+        return recoveryStats;
+    }
+
     public static CommonStats readCommonStats(StreamInput in) throws IOException {
         CommonStats stats = new CommonStats();
         stats.readFrom(in);
@@ -563,6 +586,7 @@ public class CommonStats implements Streamable, ToXContent {
         translog = in.readOptionalStreamable(new TranslogStats());
         suggest = in.readOptionalStreamable(new SuggestStats());
         queryCache = in.readOptionalStreamable(new QueryCacheStats());
+        recoveryStats = in.readOptionalStreamable(new RecoveryStats());
     }
 
     @Override
@@ -660,6 +684,7 @@ public class CommonStats implements Streamable, ToXContent {
         out.writeOptionalStreamable(translog);
         out.writeOptionalStreamable(suggest);
         out.writeOptionalStreamable(queryCache);
+        out.writeOptionalStreamable(recoveryStats);
     }
 
     // note, requires a wrapping object
@@ -719,6 +744,9 @@ public class CommonStats implements Streamable, ToXContent {
         if (queryCache != null) {
             queryCache.toXContent(builder, params);
         }
+        if (recoveryStats != null) {
+            recoveryStats.toXContent(builder, params);
+        }
         return builder;
     }
 }

+ 3 - 1
src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStatsFlags.java

@@ -225,7 +225,9 @@ public class CommonStatsFlags implements Streamable, Cloneable {
         Segments("segments"),
         Translog("translog"),
         Suggest("suggest"),
-        QueryCache("query_cache");
+        QueryCache("query_cache"),
+        Recovery("recovery");
+
 
         private final String restName;
 

+ 9 - 0
src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequest.java

@@ -265,6 +265,15 @@ public class IndicesStatsRequest extends BroadcastOperationRequest<IndicesStatsR
         return flags.isSet(Flag.QueryCache);
     }
 
+    public IndicesStatsRequest recovery(boolean recovery) {
+        flags.set(Flag.Recovery, recovery);
+        return this;
+    }
+
+    public boolean recovery() {
+        return flags.isSet(Flag.Recovery);
+    }
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);

+ 5 - 0
src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java

@@ -168,6 +168,11 @@ public class IndicesStatsRequestBuilder extends BroadcastOperationRequestBuilder
         return this;
     }
 
+    public IndicesStatsRequestBuilder setRecovery(boolean recovery) {
+        request.recovery(recovery);
+        return this;
+    }
+
     @Override
     protected void doExecute(ActionListener<IndicesStatsResponse> listener) {
         client.stats(request, listener);

+ 5 - 2
src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java

@@ -37,10 +37,10 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.IndexShardMissingException;
 import org.elasticsearch.index.IndexService;
-import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.IndexShardMissingException;
 import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -201,6 +201,9 @@ public class TransportIndicesStatsAction extends TransportBroadcastOperationActi
         if (request.request.queryCache()) {
             flags.set(CommonStatsFlags.Flag.QueryCache);
         }
+        if (request.request.recovery()) {
+            flags.set(CommonStatsFlags.Flag.Recovery);
+        }
 
         return new ShardStats(indexShard, indexShard.routingEntry(), flags);
     }

+ 148 - 0
src/main/java/org/elasticsearch/index/recovery/RecoveryStats.java

@@ -0,0 +1,148 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.index.recovery;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Streamable;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilderString;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Recovery related statistics, starting at the shard level and allowing aggregation to
+ * indices and node level
+ */
+public class RecoveryStats implements ToXContent, Streamable {
+
+    private final AtomicInteger currentAsSource = new AtomicInteger();
+    private final AtomicInteger currentAsTarget = new AtomicInteger();
+    private final AtomicLong throttleTimeInNanos = new AtomicLong();
+
+    public RecoveryStats() {
+    }
+
+    public void add(RecoveryStats recoveryStats) {
+        if (recoveryStats != null) {
+            this.currentAsSource.addAndGet(recoveryStats.currentAsSource());
+            this.currentAsTarget.addAndGet(recoveryStats.currentAsTarget());
+            this.throttleTimeInNanos.addAndGet(recoveryStats.throttleTime().nanos());
+        }
+    }
+
+    /**
+     * add statistics that should be accumulated about old shards after they have been
+     * deleted or relocated
+     */
+    public void addAsOld(RecoveryStats recoveryStats) {
+        if (recoveryStats != null) {
+            this.throttleTimeInNanos.addAndGet(recoveryStats.throttleTime().nanos());
+        }
+    }
+
+    /**
+     * Number of ongoing recoveries for which a shard serves as a source
+     */
+    public int currentAsSource() {
+        return currentAsSource.get();
+    }
+
+    /**
+     * Number of ongoing recoveries for which a shard serves as a source
+     */
+    public int currentAsTarget() {
+        return currentAsTarget.get();
+    }
+
+    /**
+     * Total time recoveries waited due to throttling
+     */
+    public TimeValue throttleTime() {
+        return TimeValue.timeValueNanos(throttleTimeInNanos.get());
+    }
+
+    public void incCurrentAsTarget() {
+        currentAsTarget.incrementAndGet();
+    }
+
+    public void decCurrentAsTarget() {
+        currentAsTarget.decrementAndGet();
+    }
+
+    public void incCurrentAsSource() {
+        currentAsSource.incrementAndGet();
+    }
+
+    public void decCurrentAsSource() {
+        currentAsSource.decrementAndGet();
+    }
+
+    public void addThrottleTime(long nanos) {
+        throttleTimeInNanos.addAndGet(nanos);
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject(Fields.RECOVERY);
+        builder.field(Fields.CURRENT_AS_SOURCE, currentAsSource());
+        builder.field(Fields.CURRENT_AS_TARGET, currentAsTarget());
+        builder.timeValueField(Fields.THROTTLE_TIME_IN_MILLIS, Fields.THROTTLE_TIME, throttleTime());
+        builder.endObject();
+        return builder;
+    }
+
+    public static RecoveryStats readRecoveryStats(StreamInput in) throws IOException {
+        RecoveryStats stats = new RecoveryStats();
+        stats.readFrom(in);
+        return stats;
+    }
+
+    static final class Fields {
+        static final XContentBuilderString RECOVERY = new XContentBuilderString("recovery");
+        static final XContentBuilderString CURRENT_AS_SOURCE = new XContentBuilderString("current_as_source");
+        static final XContentBuilderString CURRENT_AS_TARGET = new XContentBuilderString("current_as_target");
+        static final XContentBuilderString THROTTLE_TIME = new XContentBuilderString("throttle_time");
+        static final XContentBuilderString THROTTLE_TIME_IN_MILLIS = new XContentBuilderString("throttle_time_in_millis");
+    }
+
+    @Override
+    public void readFrom(StreamInput in) throws IOException {
+        currentAsSource.set(in.readVInt());
+        currentAsTarget.set(in.readVInt());
+        throttleTimeInNanos.set(in.readLong());
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeVInt(currentAsSource.get());
+        out.writeVInt(currentAsTarget.get());
+        out.writeLong(throttleTimeInNanos.get());
+    }
+
+    @Override
+    public String toString() {
+        return "recoveryStats, currentAsSource [" + currentAsSource() + "],currentAsTarget ["
+                + currentAsTarget() + "], throttle [" + throttleTime() + "]";
+    }
+}

+ 8 - 1
src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -37,7 +37,6 @@ import org.elasticsearch.action.WriteFailureException;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest;
 import org.elasticsearch.cluster.ClusterService;
-import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RestoreSource;
@@ -87,6 +86,7 @@ import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
 import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
 import org.elasticsearch.index.percolator.stats.ShardPercolateService;
 import org.elasticsearch.index.query.IndexQueryParserService;
+import org.elasticsearch.index.recovery.RecoveryStats;
 import org.elasticsearch.index.refresh.RefreshStats;
 import org.elasticsearch.index.search.nested.NonNestedDocsFilter;
 import org.elasticsearch.index.search.stats.SearchStats;
@@ -172,6 +172,8 @@ public class IndexShard extends AbstractIndexShardComponent {
     @Nullable
     private RecoveryState recoveryState;
 
+    private final RecoveryStats recoveryStats = new RecoveryStats();
+
     private ApplyRefreshSettings applyRefreshSettings = new ApplyRefreshSettings();
 
     private final MeanMetric refreshMetric = new MeanMetric();
@@ -794,6 +796,11 @@ public class IndexShard extends AbstractIndexShardComponent {
         }
     }
 
+    /** returns stats about ongoing recoveries, both source and target */
+    public RecoveryStats recoveryStats() {
+        return recoveryStats;
+    }
+
     /**
      * Returns the current {@link RecoveryState} if this shard is recovering or has been recovering.
      * Returns null if the recovery has not yet started or shard was not recovered (created via an API).

+ 14 - 20
src/main/java/org/elasticsearch/indices/IndicesService.java

@@ -20,13 +20,7 @@
 package org.elasticsearch.indices;
 
 import com.google.common.base.Function;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
+import com.google.common.collect.*;
 import org.apache.lucene.store.LockObtainFailedException;
 import org.apache.lucene.util.CollectionUtil;
 import org.apache.lucene.util.IOUtils;
@@ -38,18 +32,12 @@ import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
 import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag;
 import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
-import org.elasticsearch.bootstrap.Elasticsearch;
-import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
-import org.elasticsearch.common.inject.CreationException;
-import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.inject.Injector;
-import org.elasticsearch.common.inject.Injectors;
-import org.elasticsearch.common.inject.ModulesBuilder;
+import org.elasticsearch.common.inject.*;
 import org.elasticsearch.common.io.FileSystemUtils;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
@@ -58,11 +46,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.env.ShardLock;
 import org.elasticsearch.gateway.MetaDataStateFormat;
-import org.elasticsearch.index.Index;
-import org.elasticsearch.index.IndexModule;
-import org.elasticsearch.index.IndexNameModule;
-import org.elasticsearch.index.IndexService;
-import org.elasticsearch.index.LocalNodeIdModule;
+import org.elasticsearch.index.*;
 import org.elasticsearch.index.aliases.IndexAliasesServiceModule;
 import org.elasticsearch.index.analysis.AnalysisModule;
 import org.elasticsearch.index.analysis.AnalysisService;
@@ -78,6 +62,7 @@ import org.elasticsearch.index.mapper.MapperServiceModule;
 import org.elasticsearch.index.merge.MergeStats;
 import org.elasticsearch.index.query.IndexQueryParserModule;
 import org.elasticsearch.index.query.IndexQueryParserService;
+import org.elasticsearch.index.recovery.RecoveryStats;
 import org.elasticsearch.index.refresh.RefreshStats;
 import org.elasticsearch.index.search.stats.SearchStats;
 import org.elasticsearch.index.settings.IndexSettings;
@@ -97,7 +82,10 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import static com.google.common.collect.Maps.newHashMap;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
@@ -212,6 +200,9 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
                     case Refresh:
                         oldStats.refresh.add(oldShardsStats.refreshStats);
                         break;
+                    case Recovery:
+                        oldStats.recoveryStats.add(oldShardsStats.recoveryStats);
+                        break;
                     case Flush:
                         oldStats.flush.add(oldShardsStats.flushStats);
                         break;
@@ -235,6 +226,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
                     }
                 } catch (IllegalIndexShardStateException e) {
                     // we can safely ignore illegal state on ones that are closing for example
+                    logger.trace("{} ignoring shard stats", e, indexShard.shardId());
                 }
             }
         }
@@ -417,6 +409,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
         final MergeStats mergeStats = new MergeStats();
         final RefreshStats refreshStats = new RefreshStats();
         final FlushStats flushStats = new FlushStats();
+        final RecoveryStats recoveryStats = new RecoveryStats();
 
         @Override
         public synchronized void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard,
@@ -428,6 +421,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
                 mergeStats.add(indexShard.mergeStats());
                 refreshStats.add(indexShard.refreshStats());
                 flushStats.add(indexShard.flushStats());
+                recoveryStats.addAsOld(indexShard.recoveryStats());
             }
         }
     }

+ 6 - 0
src/main/java/org/elasticsearch/indices/NodeIndicesStats.java

@@ -42,6 +42,7 @@ import org.elasticsearch.index.get.GetStats;
 import org.elasticsearch.index.indexing.IndexingStats;
 import org.elasticsearch.index.merge.MergeStats;
 import org.elasticsearch.index.percolator.stats.PercolateStats;
+import org.elasticsearch.index.recovery.RecoveryStats;
 import org.elasticsearch.index.refresh.RefreshStats;
 import org.elasticsearch.index.search.stats.SearchStats;
 import org.elasticsearch.index.shard.DocsStats;
@@ -160,6 +161,11 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
         return stats.getSuggest();
     }
 
+    @Nullable
+    public RecoveryStats getRecoveryStats() {
+        return stats.getRecoveryStats();
+    }
+
     public static NodeIndicesStats readIndicesStats(StreamInput in) throws IOException {
         NodeIndicesStats stats = new NodeIndicesStats();
         stats.readFrom(in);

+ 9 - 1
src/main/java/org/elasticsearch/indices/recovery/RecoveryFileChunkRequest.java

@@ -41,6 +41,7 @@ public final class RecoveryFileChunkRequest extends TransportRequest {  // publi
     private long position;
     private BytesReference content;
     private StoreFileMetaData metaData;
+    private long sourceThrottleTimeInNanos;
 
     private int totalTranslogOps;
 
@@ -48,7 +49,7 @@ public final class RecoveryFileChunkRequest extends TransportRequest {  // publi
     }
 
     public RecoveryFileChunkRequest(long recoveryId, ShardId shardId, StoreFileMetaData metaData, long position, BytesReference content,
-                                    boolean lastChunk, int totalTranslogOps) {
+                                    boolean lastChunk, int totalTranslogOps, long sourceThrottleTimeInNanos) {
         this.recoveryId = recoveryId;
         this.shardId = shardId;
         this.metaData = metaData;
@@ -56,6 +57,7 @@ public final class RecoveryFileChunkRequest extends TransportRequest {  // publi
         this.content = content;
         this.lastChunk = lastChunk;
         this.totalTranslogOps = totalTranslogOps;
+        this.sourceThrottleTimeInNanos = sourceThrottleTimeInNanos;
     }
 
     public long recoveryId() {
@@ -91,6 +93,10 @@ public final class RecoveryFileChunkRequest extends TransportRequest {  // publi
         return totalTranslogOps;
     }
 
+    public long sourceThrottleTimeInNanos() {
+        return sourceThrottleTimeInNanos;
+    }
+
     @Override
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);
@@ -107,6 +113,7 @@ public final class RecoveryFileChunkRequest extends TransportRequest {  // publi
         metaData = new StoreFileMetaData(name, length, checksum, writtenBy);
         lastChunk = in.readBoolean();
         totalTranslogOps = in.readVInt();
+        sourceThrottleTimeInNanos = in.readLong();
     }
 
     @Override
@@ -122,6 +129,7 @@ public final class RecoveryFileChunkRequest extends TransportRequest {  // publi
         out.writeOptionalString(metaData.writtenBy() == null ? null : metaData.writtenBy().toString());
         out.writeBoolean(lastChunk);
         out.writeVInt(totalTranslogOps);
+        out.writeLong(sourceThrottleTimeInNanos);
     }
 
     @Override

+ 6 - 0
src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java

@@ -160,6 +160,7 @@ public class RecoverySource extends AbstractComponent {
             }
             assert shardRecoveryHandlers.contains(handler) == false : "Handler was already registered [" + handler + "]";
             shardRecoveryHandlers.add(handler);
+            shard.recoveryStats().incCurrentAsSource();
         }
 
         synchronized void remove(IndexShard shard, RecoverySourceHandler handler) {
@@ -167,6 +168,9 @@ public class RecoverySource extends AbstractComponent {
             assert shardRecoveryHandlers != null : "Shard was not registered [" + shard + "]";
             boolean remove = shardRecoveryHandlers.remove(handler);
             assert remove : "Handler was not registered [" + handler + "]";
+            if (remove) {
+                shard.recoveryStats().decCurrentAsSource();
+            }
             if (shardRecoveryHandlers.isEmpty()) {
                 ongoingRecoveries.remove(shard);
             }
@@ -181,6 +185,8 @@ public class RecoverySource extends AbstractComponent {
                         handlers.cancel(reason);
                     } catch (Exception ex) {
                         failures.add(ex);
+                    } finally {
+                        shard.recoveryStats().decCurrentAsSource();
                     }
                 }
                 ExceptionsHelper.maybeThrowRuntimeAndSuppress(failures);

+ 6 - 5
src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

@@ -276,22 +276,23 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler {
                                 final long position = indexInput.getFilePointer();
 
                                 // Pause using the rate limiter, if desired, to throttle the recovery
+                                long throttleTimeInNanos = 0;
                                 if (recoverySettings.rateLimiter() != null) {
-                                    recoverySettings.rateLimiter().pause(toRead);
+                                    throttleTimeInNanos = recoverySettings.rateLimiter().pause(toRead);
                                 }
-
+                                shard.recoveryStats().addThrottleTime(throttleTimeInNanos);
                                 indexInput.readBytes(buf, 0, toRead, false);
                                 final BytesArray content = new BytesArray(buf, 0, toRead);
                                 readCount += toRead;
                                 final boolean lastChunk = readCount == len;
+                                final RecoveryFileChunkRequest fileChunkRequest = new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), md, position,
+                                        content, lastChunk, shard.translog().estimatedNumberOfOperations(), throttleTimeInNanos);
                                 cancellableThreads.execute(new Interruptable() {
                                     @Override
                                     public void run() throws InterruptedException {
                                         // Actually send the file chunk to the target node, waiting for it to complete
                                         transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK,
-                                                new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), md, position, content,
-                                                        lastChunk, shard.translog().estimatedNumberOfOperations()),
-                                                requestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
+                                                fileChunkRequest, requestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
                                     }
                                 });
 

+ 64 - 24
src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java

@@ -28,7 +28,7 @@ import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Streamable;
-import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentBuilderString;
@@ -36,6 +36,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.index.shard.ShardId;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -374,6 +375,10 @@ public class RecoveryState implements ToXContent, Streamable {
         static final XContentBuilderString PERCENT = new XContentBuilderString("percent");
         static final XContentBuilderString DETAILS = new XContentBuilderString("details");
         static final XContentBuilderString SIZE = new XContentBuilderString("size");
+        static final XContentBuilderString SOURCE_THROTTLE_TIME = new XContentBuilderString("source_throttle_time");
+        static final XContentBuilderString SOURCE_THROTTLE_TIME_IN_MILLIS = new XContentBuilderString("source_throttle_time_in_millis");
+        static final XContentBuilderString TARGET_THROTTLE_TIME = new XContentBuilderString("target_throttle_time");
+        static final XContentBuilderString TARGET_THROTTLE_TIME_IN_MILLIS = new XContentBuilderString("target_throttle_time_in_millis");
     }
 
     public static class Timer implements Streamable {
@@ -658,43 +663,72 @@ public class RecoveryState implements ToXContent, Streamable {
 
     public static class Index extends Timer implements ToXContent, Streamable {
 
-        private Map<String, File> fileDetails = ConcurrentCollections.newConcurrentMap();
+        private Map<String, File> fileDetails = new HashMap<>();
 
-        private volatile long version = -1;
+        public final static long UNKNOWN = -1L;
 
-        public List<File> fileDetails() {
+        private long version = UNKNOWN;
+        private long sourceThrottlingInNanos = UNKNOWN;
+        private long targetThrottleTimeInNanos = UNKNOWN;
+
+        public synchronized List<File> fileDetails() {
             return ImmutableList.copyOf(fileDetails.values());
         }
 
-        public void reset() {
+        public synchronized void reset() {
             super.reset();
-            version = -1;
+            version = UNKNOWN;
             fileDetails.clear();
+            sourceThrottlingInNanos = UNKNOWN;
+            targetThrottleTimeInNanos = UNKNOWN;
         }
 
-
-        public void addFileDetail(String name, long length, boolean reused) {
+        public synchronized void addFileDetail(String name, long length, boolean reused) {
             File file = new File(name, length, reused);
             File existing = fileDetails.put(name, file);
             assert existing == null : "file [" + name + "] is already reported";
         }
 
-        public void addRecoveredBytesToFile(String name, long bytes) {
+        public synchronized void addRecoveredBytesToFile(String name, long bytes) {
             File file = fileDetails.get(name);
             file.addRecoveredBytes(bytes);
         }
 
-        public long version() {
+        public synchronized long version() {
             return this.version;
         }
 
+        public synchronized void addSourceThrottling(long timeInNanos) {
+            if (sourceThrottlingInNanos == UNKNOWN) {
+                sourceThrottlingInNanos = timeInNanos;
+            } else {
+                sourceThrottlingInNanos += timeInNanos;
+            }
+        }
+
+        public synchronized void addTargetThrottling(long timeInNanos) {
+            if (targetThrottleTimeInNanos == UNKNOWN) {
+                targetThrottleTimeInNanos = timeInNanos;
+            } else {
+                targetThrottleTimeInNanos += timeInNanos;
+            }
+        }
+
+        public synchronized TimeValue sourceThrottling() {
+            return TimeValue.timeValueNanos(sourceThrottlingInNanos);
+        }
+
+        public synchronized TimeValue targetThrottling() {
+            return TimeValue.timeValueNanos(targetThrottleTimeInNanos);
+        }
+
         /** total number of files that are part of this recovery, both re-used and recovered */
-        public int totalFileCount() {
+        public synchronized int totalFileCount() {
             return fileDetails.size();
         }
 
         /** total number of files to be recovered (potentially not yet done) */
-        public int totalRecoverFiles() {
+        public synchronized int totalRecoverFiles() {
             int total = 0;
             for (File file : fileDetails.values()) {
                 if (file.reused() == false) {
@@ -706,7 +740,7 @@ public class RecoveryState implements ToXContent, Streamable {
 
 
         /** number of file that were recovered (excluding on ongoing files) */
-        public int recoveredFileCount() {
+        public synchronized int recoveredFileCount() {
             int count = 0;
             for (File file : fileDetails.values()) {
                 if (file.fullyRecovered()) {
@@ -717,7 +751,7 @@ public class RecoveryState implements ToXContent, Streamable {
         }
 
         /** percent of recovered (i.e., not reused) files out of the total files to be recovered */
-        public float recoveredFilesPercent() {
+        public synchronized float recoveredFilesPercent() {
             int total = 0;
             int recovered = 0;
             for (File file : fileDetails.values()) {
@@ -740,7 +774,7 @@ public class RecoveryState implements ToXContent, Streamable {
         }
 
         /** total number of bytes in th shard */
-        public long totalBytes() {
+        public synchronized long totalBytes() {
             long total = 0;
             for (File file : fileDetails.values()) {
                 total += file.length();
@@ -749,7 +783,7 @@ public class RecoveryState implements ToXContent, Streamable {
         }
 
         /** total number of bytes recovered so far, including both existing and reused */
-        public long recoveredBytes() {
+        public synchronized long recoveredBytes() {
             long recovered = 0;
             for (File file : fileDetails.values()) {
                 recovered += file.recovered();
@@ -758,7 +792,7 @@ public class RecoveryState implements ToXContent, Streamable {
         }
 
         /** total bytes of files to be recovered (potentially not yet done) */
-        public long totalRecoverBytes() {
+        public synchronized long totalRecoverBytes() {
             long total = 0;
             for (File file : fileDetails.values()) {
                 if (file.reused() == false) {
@@ -768,7 +802,7 @@ public class RecoveryState implements ToXContent, Streamable {
             return total;
         }
 
-        public long totalReuseBytes() {
+        public synchronized long totalReuseBytes() {
             long total = 0;
             for (File file : fileDetails.values()) {
                 if (file.reused()) {
@@ -779,7 +813,7 @@ public class RecoveryState implements ToXContent, Streamable {
         }
 
         /** percent of bytes recovered out of total files bytes *to be* recovered */
-        public float recoveredBytesPercent() {
+        public synchronized float recoveredBytesPercent() {
             long total = 0;
             long recovered = 0;
             for (File file : fileDetails.values()) {
@@ -799,7 +833,7 @@ public class RecoveryState implements ToXContent, Streamable {
             }
         }
 
-        public int reusedFileCount() {
+        public synchronized int reusedFileCount() {
             int reused = 0;
             for (File file : fileDetails.values()) {
                 if (file.reused()) {
@@ -809,7 +843,7 @@ public class RecoveryState implements ToXContent, Streamable {
             return reused;
         }
 
-        public long reusedBytes() {
+        public synchronized long reusedBytes() {
             long reused = 0;
             for (File file : fileDetails.values()) {
                 if (file.reused()) {
@@ -819,7 +853,7 @@ public class RecoveryState implements ToXContent, Streamable {
             return reused;
         }
 
-        public void updateVersion(long version) {
+        public synchronized void updateVersion(long version) {
             this.version = version;
         }
 
@@ -831,6 +865,8 @@ public class RecoveryState implements ToXContent, Streamable {
                 File file = File.readFile(in);
                 fileDetails.put(file.name, file);
             }
+            sourceThrottlingInNanos = in.readLong();
+            targetThrottleTimeInNanos = in.readLong();
         }
 
         @Override
@@ -841,10 +877,12 @@ public class RecoveryState implements ToXContent, Streamable {
             for (File file : files) {
                 file.writeTo(out);
             }
+            out.writeLong(sourceThrottlingInNanos);
+            out.writeLong(targetThrottleTimeInNanos);
         }
 
         @Override
-        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        public synchronized XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             // stream size first, as it matters more and the files section can be long
             builder.startObject(Fields.SIZE);
             builder.byteSizeField(Fields.TOTAL_IN_BYTES, Fields.TOTAL, totalBytes());
@@ -867,11 +905,13 @@ public class RecoveryState implements ToXContent, Streamable {
             }
             builder.endObject();
             builder.timeValueField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, time());
+            builder.timeValueField(Fields.SOURCE_THROTTLE_TIME_IN_MILLIS, Fields.SOURCE_THROTTLE_TIME, sourceThrottling());
+            builder.timeValueField(Fields.TARGET_THROTTLE_TIME_IN_MILLIS, Fields.TARGET_THROTTLE_TIME, targetThrottling());
             return builder;
         }
 
         @Override
-        public String toString() {
+        public synchronized String toString() {
             try {
                 XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
                 builder.startObject();

+ 3 - 0
src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java

@@ -85,6 +85,8 @@ public class RecoveryStatus extends AbstractRefCounted {
         this.store = indexShard.store();
         // make sure the store is not released until we are done.
         store.incRef();
+        indexShard.recoveryStats().incCurrentAsTarget();
+        logger.info("--> incremented recoveries {}", indexShard.recoveryStats());
     }
 
     private final Map<String, String> tempFileNames = ConcurrentCollections.newConcurrentMap();
@@ -239,6 +241,7 @@ public class RecoveryStatus extends AbstractRefCounted {
         } finally {
             // free store. increment happens in constructor
             store.decRef();
+            indexShard.recoveryStats().decCurrentAsTarget();
         }
     }
 

+ 8 - 2
src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

@@ -432,6 +432,10 @@ public class RecoveryTarget extends AbstractComponent {
                 final RecoveryStatus recoveryStatus = statusRef.status();
                 final Store store = recoveryStatus.store();
                 recoveryStatus.state().getTranslog().totalOperations(request.totalTranslogOps());
+                final RecoveryState.Index indexState = recoveryStatus.state().getIndex();
+                if (request.sourceThrottleTimeInNanos() != RecoveryState.Index.UNKNOWN) {
+                    indexState.addSourceThrottling(request.sourceThrottleTimeInNanos());
+                }
                 IndexOutput indexOutput;
                 if (request.position() == 0) {
                     indexOutput = recoveryStatus.openAndPutIndexOutput(request.name(), request.metadata(), store);
@@ -439,14 +443,16 @@ public class RecoveryTarget extends AbstractComponent {
                     indexOutput = recoveryStatus.getOpenIndexOutput(request.name());
                 }
                 if (recoverySettings.rateLimiter() != null) {
-                    recoverySettings.rateLimiter().pause(request.content().length());
+                    long targetThrottling = recoverySettings.rateLimiter().pause(request.content().length());
+                    indexState.addTargetThrottling(targetThrottling);
+                    recoveryStatus.indexShard().recoveryStats().addThrottleTime(targetThrottling);
                 }
                 BytesReference content = request.content();
                 if (!content.hasArray()) {
                     content = content.toBytesArray();
                 }
                 indexOutput.writeBytes(content.array(), content.arrayOffset(), content.length());
-                recoveryStatus.state().getIndex().addRecoveredBytesToFile(request.name(), content.length());
+                indexState.addRecoveredBytesToFile(request.name(), content.length());
                 if (indexOutput.getFilePointer() >= request.length() || request.lastChunk()) {
                     try {
                         Store.verify(indexOutput);

+ 1 - 0
src/main/java/org/elasticsearch/rest/action/admin/indices/stats/RestIndicesStatsAction.java

@@ -81,6 +81,7 @@ public class RestIndicesStatsAction extends BaseRestHandler {
             indicesStatsRequest.completion(metrics.contains("completion"));
             indicesStatsRequest.suggest(metrics.contains("suggest"));
             indicesStatsRequest.queryCache(metrics.contains("query_cache"));
+            indicesStatsRequest.recovery(metrics.contains("recovery"));
         }
 
         if (request.hasParam("groups")) {

+ 1 - 1
src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java

@@ -404,7 +404,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
                         RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
                         if (truncate && req.length() > 1) {
                             BytesArray array = new BytesArray(req.content().array(), req.content().arrayOffset(), (int) req.length() - 1);
-                            request = new RecoveryFileChunkRequest(req.recoveryId(), req.shardId(), req.metadata(), req.position(), array, req.lastChunk(), req.totalTranslogOps());
+                            request = new RecoveryFileChunkRequest(req.recoveryId(), req.shardId(), req.metadata(), req.position(), array, req.lastChunk(), req.totalTranslogOps(), req.sourceThrottleTimeInNanos());
                         } else {
                             byte[] array = req.content().array();
                             int i = randomIntBetween(0, req.content().length() - 1);

+ 86 - 2
src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryTests.java

@@ -22,11 +22,14 @@ package org.elasticsearch.indices.recovery;
 import com.carrotsearch.randomizedtesting.LifecycleScope;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
+import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
 import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
 import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse;
+import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
@@ -39,8 +42,10 @@ import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.discovery.DiscoveryService;
+import org.elasticsearch.index.recovery.RecoveryStats;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.store.Store;
+import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.recovery.RecoveryState.Stage;
 import org.elasticsearch.indices.recovery.RecoveryState.Type;
 import org.elasticsearch.snapshots.SnapshotState;
@@ -228,13 +233,13 @@ public class IndexRecoveryTests extends ElasticsearchIntegrationTest {
     @TestLogging("indices.recovery:TRACE")
     public void rerouteRecoveryTest() throws Exception {
         logger.info("--> start node A");
-        String nodeA = internalCluster().startNode();
+        final String nodeA = internalCluster().startNode();
 
         logger.info("--> create index on node: {}", nodeA);
         ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT).getShards()[0].getStats().getStore().size();
 
         logger.info("--> start node B");
-        String nodeB = internalCluster().startNode();
+        final String nodeB = internalCluster().startNode();
 
         ensureGreen();
 
@@ -246,6 +251,18 @@ public class IndexRecoveryTests extends ElasticsearchIntegrationTest {
                 .add(new MoveAllocationCommand(new ShardId(INDEX_NAME, 0), nodeA, nodeB))
                 .execute().actionGet().getState();
 
+        logger.info("--> waiting for recovery to start both on source and target");
+        assertBusy(new Runnable() {
+            @Override
+            public void run() {
+                IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeA);
+                assertThat(indicesService.indexServiceSafe(INDEX_NAME).shardSafe(0).recoveryStats().currentAsSource(),
+                        equalTo(1));
+                indicesService = internalCluster().getInstance(IndicesService.class, nodeB);
+                assertThat(indicesService.indexServiceSafe(INDEX_NAME).shardSafe(0).recoveryStats().currentAsTarget(),
+                        equalTo(1));
+            }
+        });
 
         logger.info("--> request recoveries");
         RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet();
@@ -262,6 +279,45 @@ public class IndexRecoveryTests extends ElasticsearchIntegrationTest {
         assertOnGoingRecoveryState(nodeBResponses.get(0).recoveryState(), 0, Type.RELOCATION, nodeA, nodeB, false);
         validateIndexRecoveryState(nodeBResponses.get(0).recoveryState().getIndex());
 
+        logger.info("--> request node recovery stats");
+        NodesStatsResponse statsResponse = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get();
+        long nodeAThrottling = Long.MAX_VALUE;
+        long nodeBThrottling = Long.MAX_VALUE;
+        for (NodeStats nodeStats : statsResponse.getNodes()) {
+            final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
+            if (nodeStats.getNode().name().equals(nodeA)) {
+                assertThat("node A should have ongoing recovery as source", recoveryStats.currentAsSource(), equalTo(1));
+                assertThat("node A should not have ongoing recovery as target", recoveryStats.currentAsTarget(), equalTo(0));
+                nodeAThrottling = recoveryStats.throttleTime().millis();
+            }
+            if (nodeStats.getNode().name().equals(nodeB)) {
+                assertThat("node B should not have ongoing recovery as source", recoveryStats.currentAsSource(), equalTo(0));
+                assertThat("node B should have ongoing recovery as target", recoveryStats.currentAsTarget(), equalTo(1));
+                nodeBThrottling = recoveryStats.throttleTime().millis();
+            }
+        }
+
+        logger.info("--> checking throttling increases");
+        final long finalNodeAThrottling = nodeAThrottling;
+        final long finalNodeBThrottling = nodeBThrottling;
+        assertBusy(new Runnable() {
+            @Override
+            public void run() {
+                NodesStatsResponse statsResponse = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get();
+                assertThat(statsResponse.getNodes(), arrayWithSize(2));
+                for (NodeStats nodeStats : statsResponse.getNodes()) {
+                    final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
+                    if (nodeStats.getNode().name().equals(nodeA)) {
+                        assertThat("node A throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(finalNodeAThrottling));
+                    }
+                    if (nodeStats.getNode().name().equals(nodeB)) {
+                        assertThat("node B throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(finalNodeBThrottling));
+                    }
+                }
+            }
+        });
+
+
         logger.info("--> speeding up recoveries");
         restoreRecoverySpeed();
 
@@ -276,11 +332,39 @@ public class IndexRecoveryTests extends ElasticsearchIntegrationTest {
         assertRecoveryState(shardResponses.get(0).recoveryState(), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false);
         validateIndexRecoveryState(shardResponses.get(0).recoveryState().getIndex());
 
+        statsResponse = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get();
+        assertThat(statsResponse.getNodes(), arrayWithSize(2));
+        for (NodeStats nodeStats : statsResponse.getNodes()) {
+            final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
+            assertThat(recoveryStats.currentAsSource(), equalTo(0));
+            assertThat(recoveryStats.currentAsTarget(), equalTo(0));
+            if (nodeStats.getNode().name().equals(nodeA)) {
+                assertThat("node A throttling should be >0", recoveryStats.throttleTime().millis(), greaterThan(0l));
+            }
+            if (nodeStats.getNode().name().equals(nodeB)) {
+                assertThat("node B throttling should be >0 ", recoveryStats.throttleTime().millis(), greaterThan(0l));
+            }
+        }
+
         logger.info("--> bump replica count");
         client().admin().indices().prepareUpdateSettings(INDEX_NAME)
                 .setSettings(settingsBuilder().put("number_of_replicas", 1)).execute().actionGet();
         ensureGreen();
 
+        statsResponse = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get();
+        assertThat(statsResponse.getNodes(), arrayWithSize(2));
+        for (NodeStats nodeStats : statsResponse.getNodes()) {
+            final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
+            assertThat(recoveryStats.currentAsSource(), equalTo(0));
+            assertThat(recoveryStats.currentAsTarget(), equalTo(0));
+            if (nodeStats.getNode().name().equals(nodeA)) {
+                assertThat("node A throttling should be >0", recoveryStats.throttleTime().millis(), greaterThan(0l));
+            }
+            if (nodeStats.getNode().name().equals(nodeB)) {
+                assertThat("node B throttling should be >0 ", recoveryStats.throttleTime().millis(), greaterThan(0l));
+            }
+        }
+
         logger.info("--> start node C");
         String nodeC = internalCluster().startNode();
         assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").get().isTimedOut());

+ 29 - 1
src/test/java/org/elasticsearch/indices/recovery/RecoveryStateTest.java

@@ -199,6 +199,12 @@ public class RecoveryStateTest extends ElasticsearchTestCase {
             index.start();
             for (int i = randomIntBetween(0, 10); i > 0; i--) {
                 index.addFileDetail("t_" + i, randomIntBetween(1, 100), randomBoolean());
+                if (randomBoolean()) {
+                    index.addSourceThrottling(randomIntBetween(0, 20));
+                }
+                if (randomBoolean()) {
+                    index.addTargetThrottling(randomIntBetween(0, 20));
+                }
             }
             if (randomBoolean()) {
                 index.stop();
@@ -210,6 +216,8 @@ public class RecoveryStateTest extends ElasticsearchTestCase {
         // before we start we must report 0
         assertThat(index.recoveredFilesPercent(), equalTo((float) 0.0));
         assertThat(index.recoveredBytesPercent(), equalTo((float) 0.0));
+        assertThat(index.sourceThrottling().nanos(), equalTo(Index.UNKNOWN));
+        assertThat(index.targetThrottling().nanos(), equalTo(Index.UNKNOWN));
 
         index.start();
         for (File file : files) {
@@ -247,11 +255,27 @@ public class RecoveryStateTest extends ElasticsearchTestCase {
         backgroundReader.start();
 
         long recoveredBytes = 0;
+        long sourceThrottling = Index.UNKNOWN;
+        long targetThrottling = Index.UNKNOWN;
         while (bytesToRecover > 0) {
             File file = randomFrom(filesToRecover);
-            long toRecover = Math.min(bytesToRecover, randomIntBetween(1, (int) (file.length() - file.recovered())));
+            final long toRecover = Math.min(bytesToRecover, randomIntBetween(1, (int) (file.length() - file.recovered())));
+            final long throttledOnSource = rarely() ? randomIntBetween(10, 200) : 0;
+            index.addSourceThrottling(throttledOnSource);
+            if (sourceThrottling == Index.UNKNOWN) {
+                sourceThrottling = throttledOnSource;
+            } else {
+                sourceThrottling += throttledOnSource;
+            }
             index.addRecoveredBytesToFile(file.name(), toRecover);
             file.addRecoveredBytes(toRecover);
+            final long throttledOnTarget = rarely() ? randomIntBetween(10, 200) : 0;
+            if (targetThrottling == Index.UNKNOWN) {
+                targetThrottling = throttledOnTarget;
+            } else {
+                targetThrottling += throttledOnTarget;
+            }
+            index.addTargetThrottling(throttledOnTarget);
             bytesToRecover -= toRecover;
             recoveredBytes += toRecover;
             if (file.reused() || file.fullyRecovered()) {
@@ -278,6 +302,8 @@ public class RecoveryStateTest extends ElasticsearchTestCase {
             assertThat(lastRead.time(), lessThanOrEqualTo(index.time()));
         }
         assertThat(lastRead.stopTime(), equalTo(index.stopTime()));
+        assertThat(lastRead.targetThrottling(), equalTo(index.targetThrottling()));
+        assertThat(lastRead.sourceThrottling(), equalTo(index.sourceThrottling()));
 
         logger.info("testing post recovery");
         assertThat(index.totalBytes(), equalTo(totalFileBytes));
@@ -288,6 +314,8 @@ public class RecoveryStateTest extends ElasticsearchTestCase {
         assertThat(index.totalRecoverFiles(), equalTo(files.length - totalReused));
         assertThat(index.recoveredFileCount(), equalTo(index.totalRecoverFiles() - filesToRecover.size()));
         assertThat(index.recoveredBytes(), equalTo(recoveredBytes));
+        assertThat(index.targetThrottling().nanos(), equalTo(targetThrottling));
+        assertThat(index.sourceThrottling().nanos(), equalTo(sourceThrottling));
         if (index.totalRecoverFiles() == 0) {
             assertThat((double) index.recoveredFilesPercent(), equalTo(100.0));
             assertThat((double) index.recoveredBytesPercent(), equalTo(100.0));

+ 9 - 12
src/test/java/org/elasticsearch/indices/stats/IndexStatsTests.java

@@ -21,12 +21,8 @@ package org.elasticsearch.indices.stats;
 
 import org.apache.lucene.util.Version;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
-import org.elasticsearch.action.admin.indices.stats.CommonStats;
-import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
+import org.elasticsearch.action.admin.indices.stats.*;
 import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag;
-import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
-import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
-import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
@@ -61,12 +57,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.index.query.QueryBuilders.filteredQuery;
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.lessThan;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.*;
 
 @ClusterScope(scope = Scope.SUITE, numDataNodes = 2, numClientNodes = 0, randomDynamicTemplates = false)
 public class IndexStatsTests extends ElasticsearchIntegrationTest {
@@ -662,7 +653,8 @@ public class IndexStatsTests extends ElasticsearchIntegrationTest {
     @Test
     public void testFlagOrdinalOrder() {
         Flag[] flags = new Flag[]{Flag.Store, Flag.Indexing, Flag.Get, Flag.Search, Flag.Merge, Flag.Flush, Flag.Refresh,
-                Flag.FilterCache, Flag.IdCache, Flag.FieldData, Flag.Docs, Flag.Warmer, Flag.Percolate, Flag.Completion, Flag.Segments, Flag.Translog, Flag.Suggest, Flag.QueryCache};
+                Flag.FilterCache, Flag.IdCache, Flag.FieldData, Flag.Docs, Flag.Warmer, Flag.Percolate, Flag.Completion, Flag.Segments,
+                Flag.Translog, Flag.Suggest, Flag.QueryCache, Flag.Recovery};
 
         assertThat(flags.length, equalTo(Flag.values().length));
         for (int i = 0; i < flags.length; i++) {
@@ -933,6 +925,9 @@ public class IndexStatsTests extends ElasticsearchIntegrationTest {
             case QueryCache:
                 builder.setQueryCache(set);
                 break;
+            case Recovery:
+                builder.setRecovery(set);
+                break;
             default:
                 fail("new flag? " + flag);
                 break;
@@ -977,6 +972,8 @@ public class IndexStatsTests extends ElasticsearchIntegrationTest {
                 return response.getSuggest() != null;
             case QueryCache:
                 return response.getQueryCache() != null;
+            case Recovery:
+                return response.getRecoveryStats() != null;
             default:
                 fail("new flag? " + flag);
                 return false;

+ 7 - 0
src/test/java/org/elasticsearch/test/rest/section/Assertion.java

@@ -73,4 +73,11 @@ public abstract class Assertion implements ExecutableSection {
      * Executes the assertion comparing the actual value (parsed from the response) with the expected one
      */
     protected abstract void doAssert(Object actualValue, Object expectedValue);
+
+    /**
+     * a utility to get the class of an object, protecting for null (i.e., returning null if the input is null)
+     */
+    protected Class<?> safeClass(Object o) {
+        return o == null ? null : o.getClass();
+    }
 }

+ 1 - 1
src/test/java/org/elasticsearch/test/rest/section/GreaterThanAssertion.java

@@ -43,7 +43,7 @@ public class GreaterThanAssertion extends Assertion {
     @SuppressWarnings("unchecked")
     protected void doAssert(Object actualValue, Object expectedValue) {
         logger.trace("assert that [{}] is greater than [{}] (field: [{}])", actualValue, expectedValue, getField());
-        assertThat("value of [" + getField() + "] is not comparable (got [" + actualValue.getClass() + "])", actualValue, instanceOf(Comparable.class));
+        assertThat("value of [" + getField() + "] is not comparable (got [" + safeClass(actualValue) + "])", actualValue, instanceOf(Comparable.class));
         assertThat("expected value of [" + getField() + "] is not comparable (got [" + expectedValue.getClass() + "])", expectedValue, instanceOf(Comparable.class));
         try {
             assertThat(errorMessage(), (Comparable) actualValue, greaterThan((Comparable) expectedValue));

+ 1 - 1
src/test/java/org/elasticsearch/test/rest/section/GreaterThanEqualToAssertion.java

@@ -43,7 +43,7 @@ public class GreaterThanEqualToAssertion extends Assertion {
     @Override
     protected void doAssert(Object actualValue, Object expectedValue) {
         logger.trace("assert that [{}] is greater than or equal to [{}] (field: [{}])", actualValue, expectedValue, getField());
-        assertThat("value of [" + getField() + "] is not comparable (got [" + actualValue.getClass() + "])", actualValue, instanceOf(Comparable.class));
+        assertThat("value of [" + getField() + "] is not comparable (got [" + safeClass(actualValue) + "])", actualValue, instanceOf(Comparable.class));
         assertThat("expected value of [" + getField() + "] is not comparable (got [" + expectedValue.getClass() + "])", expectedValue, instanceOf(Comparable.class));
         try {
             assertThat(errorMessage(), (Comparable) actualValue, greaterThanOrEqualTo((Comparable) expectedValue));

+ 1 - 1
src/test/java/org/elasticsearch/test/rest/section/LengthAssertion.java

@@ -53,7 +53,7 @@ public class LengthAssertion extends Assertion {
         } else if (actualValue instanceof Map) {
             assertThat(errorMessage(), ((Map) actualValue).keySet().size(), equalTo(length));
         } else {
-            throw new UnsupportedOperationException("value is of unsupported type [" + actualValue.getClass().getSimpleName() + "]");
+            throw new UnsupportedOperationException("value is of unsupported type [" + safeClass(actualValue) + "]");
         }
     }
 

+ 1 - 1
src/test/java/org/elasticsearch/test/rest/section/LessThanAssertion.java

@@ -44,7 +44,7 @@ public class LessThanAssertion extends Assertion {
     @SuppressWarnings("unchecked")
     protected void doAssert(Object actualValue, Object expectedValue) {
         logger.trace("assert that [{}] is less than [{}] (field: [{}])", actualValue, expectedValue, getField());
-        assertThat("value of [" + getField() + "] is not comparable (got [" + actualValue.getClass() + "])", actualValue, instanceOf(Comparable.class));
+        assertThat("value of [" + getField() + "] is not comparable (got [" + safeClass(actualValue) + "])", actualValue, instanceOf(Comparable.class));
         assertThat("expected value of [" + getField() + "] is not comparable (got [" + expectedValue.getClass() + "])", expectedValue, instanceOf(Comparable.class));
         try {
             assertThat(errorMessage(), (Comparable) actualValue, lessThan((Comparable) expectedValue));

+ 1 - 1
src/test/java/org/elasticsearch/test/rest/section/LessThanOrEqualToAssertion.java

@@ -43,7 +43,7 @@ public class LessThanOrEqualToAssertion  extends Assertion {
     @Override
     protected void doAssert(Object actualValue, Object expectedValue) {
         logger.trace("assert that [{}] is less than or equal to [{}] (field: [{}])", actualValue, expectedValue, getField());
-        assertThat("value of [" + getField() + "] is not comparable (got [" + actualValue.getClass() + "])", actualValue, instanceOf(Comparable.class));
+        assertThat("value of [" + getField() + "] is not comparable (got [" + safeClass(actualValue) + "])", actualValue, instanceOf(Comparable.class));
         assertThat("expected value of [" + getField() + "] is not comparable (got [" + expectedValue.getClass() + "])", expectedValue, instanceOf(Comparable.class));
         try {
             assertThat(errorMessage(), (Comparable) actualValue, lessThanOrEqualTo((Comparable) expectedValue));

+ 2 - 2
src/test/java/org/elasticsearch/test/rest/section/MatchAssertion.java

@@ -48,7 +48,7 @@ public class MatchAssertion extends Assertion {
         if (expectedValue instanceof String) {
             String expValue = ((String) expectedValue).trim();
             if (expValue.length() > 2 && expValue.startsWith("/") && expValue.endsWith("/")) {
-                assertThat("field [" + getField() + "] was expected to be of type String but is an instanceof [" + actualValue.getClass() + "]", actualValue, instanceOf(String.class));
+                assertThat("field [" + getField() + "] was expected to be of type String but is an instanceof [" + safeClass(actualValue) + "]", actualValue, instanceOf(String.class));
                 String stringValue = (String) actualValue;
                 String regex = expValue.substring(1, expValue.length() - 1);
                 logger.trace("assert that [{}] matches [{}]", stringValue, regex);
@@ -60,7 +60,7 @@ public class MatchAssertion extends Assertion {
 
         assertThat(errorMessage(), actualValue, notNullValue());
         logger.trace("assert that [{}] matches [{}] (field [{}])", actualValue, expectedValue, getField());
-        if (!actualValue.getClass().equals(expectedValue.getClass())) {
+        if (!actualValue.getClass().equals(safeClass(expectedValue))) {
             if (actualValue instanceof Number && expectedValue instanceof Number) {
                 //Double 1.0 is equal to Integer 1
                 assertThat(errorMessage(), ((Number) actualValue).doubleValue(), equalTo(((Number) expectedValue).doubleValue()));