Browse Source

move assertBusy to use CheckException (#25246)

We use assertBusy in many places where the underlying code throw exceptions. Currently we need to wrap those exceptions in a RuntimeException which is ugly.
Boaz Leskes 8 years ago
parent
commit
648b4717a4
23 changed files with 213 additions and 340 deletions
  1. 4 7
      core/src/test/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java
  2. 1 6
      core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java
  3. 4 12
      core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java
  4. 2 12
      core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java
  5. 29 41
      core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java
  6. 3 6
      core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java
  7. 1 6
      core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java
  8. 16 22
      core/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java
  9. 15 18
      core/src/test/java/org/elasticsearch/document/ShardInfoIT.java
  10. 2 2
      core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java
  11. 5 8
      core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java
  12. 7 10
      core/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java
  13. 17 24
      core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
  14. 1 6
      core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java
  15. 38 55
      core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java
  16. 1 6
      core/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java
  17. 1 9
      core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java
  18. 3 6
      core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java
  19. 15 18
      core/src/test/java/org/elasticsearch/search/child/ParentFieldLoadingIT.java
  20. 39 51
      core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
  21. 3 6
      modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java
  22. 3 3
      test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java
  23. 3 6
      test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

+ 4 - 7
core/src/test/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java

@@ -34,13 +34,10 @@ public class BulkIntegrationIT extends ESIntegTestCase {
         BulkRequestBuilder bulkBuilder = client().prepareBulk();
         bulkBuilder.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, null, XContentType.JSON);
         bulkBuilder.get();
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings().get();
-                assertTrue(mappingsResponse.getMappings().containsKey("logstash-2014.03.30"));
-                assertTrue(mappingsResponse.getMappings().get("logstash-2014.03.30").containsKey("logs"));
-            }
+        assertBusy(() -> {
+            GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings().get();
+            assertTrue(mappingsResponse.getMappings().containsKey("logstash-2014.03.30"));
+            assertTrue(mappingsResponse.getMappings().get("logstash-2014.03.30").containsKey("logs"));
         });
     }
 }

+ 1 - 6
core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java

@@ -275,12 +275,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
         transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception"));
 
         // wait until the timeout was triggered and we actually tried to send for the second time
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                assertThat(transport.capturedRequests().length, equalTo(1));
-            }
-        });
+        assertBusy(() -> assertThat(transport.capturedRequests().length, equalTo(1)));
 
         // let it fail the second time too
         requestId = transport.capturedRequests()[0].requestId;

+ 4 - 12
core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java

@@ -158,12 +158,9 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
         }
 
         internalCluster().stopRandomNonMasterNode();
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
-                assertThat(state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(true));
-            }
+        assertBusy(() -> {
+            ClusterState state1 = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
+            assertThat(state1.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(true));
         });
 
         logger.info("--> starting the previous master node again...");
@@ -405,12 +402,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
         latch.await();
 
         assertThat(failure.get(), instanceOf(Discovery.FailedToCommitClusterStateException.class));
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                assertThat(masterClusterService.state().nodes().getMasterNode(), nullValue());
-            }
-        });
+        assertBusy(() -> assertThat(masterClusterService.state().nodes().getMasterNode(), nullValue()));
 
         partition.stopDisrupting();
 

+ 2 - 12
core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java

@@ -68,12 +68,7 @@ public class DelayedAllocationIT extends ESIntegTestCase {
         ensureGreen("test");
         indexRandomData();
         internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodeWithShard()));
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true));
-            }
-        });
+        assertBusy(() -> assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true)));
         assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1));
         internalCluster().startNode(); // this will use the same data location as the stopped node
         ensureGreen("test");
@@ -114,12 +109,7 @@ public class DelayedAllocationIT extends ESIntegTestCase {
         ensureGreen("test");
         indexRandomData();
         internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodeWithShard()));
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true));
-            }
-        });
+        assertBusy(() -> assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true)));
         assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1));
         assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueMillis(100))).get());
         ensureGreen("test");

+ 29 - 41
core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java

@@ -57,12 +57,9 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
         List<String> nodes = internalCluster().startNodes(3);
 
         // Wait for all 3 nodes to be up
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().get();
-                assertThat(resp.getNodes().size(), equalTo(3));
-            }
+        assertBusy(() -> {
+            NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().get();
+            assertThat(resp.getNodes().size(), equalTo(3));
         });
 
         // Start with all nodes at 50% usage
@@ -86,13 +83,10 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
         ensureGreen("test");
 
         // Block until the "fake" cluster info is retrieved at least once
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                ClusterInfo info = cis.getClusterInfo();
-                logger.info("--> got: {} nodes", info.getNodeLeastAvailableDiskUsages().size());
-                assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThan(0));
-            }
+        assertBusy(() -> {
+            ClusterInfo info = cis.getClusterInfo();
+            logger.info("--> got: {} nodes", info.getNodeLeastAvailableDiskUsages().size());
+            assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThan(0));
         });
 
         final List<String> realNodeNames = new ArrayList<>();
@@ -113,21 +107,18 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
         // Retrieve the count of shards on each node
         final Map<String, Integer> nodesToShardCount = new HashMap<>();
 
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                ClusterStateResponse resp = client().admin().cluster().prepareState().get();
-                Iterator<RoutingNode> iter = resp.getState().getRoutingNodes().iterator();
-                while (iter.hasNext()) {
-                    RoutingNode node = iter.next();
-                    logger.info("--> node {} has {} shards",
-                            node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
-                    nodesToShardCount.put(node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
-                }
-                assertThat("node1 has 5 shards", nodesToShardCount.get(realNodeNames.get(0)), equalTo(5));
-                assertThat("node2 has 5 shards", nodesToShardCount.get(realNodeNames.get(1)), equalTo(5));
-                assertThat("node3 has 0 shards", nodesToShardCount.get(realNodeNames.get(2)), equalTo(0));
+        assertBusy(() -> {
+            ClusterStateResponse resp12 = client().admin().cluster().prepareState().get();
+            Iterator<RoutingNode> iter12 = resp12.getState().getRoutingNodes().iterator();
+            while (iter12.hasNext()) {
+                RoutingNode node = iter12.next();
+                logger.info("--> node {} has {} shards",
+                        node.nodeId(), resp12.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
+                nodesToShardCount.put(node.nodeId(), resp12.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
             }
+            assertThat("node1 has 5 shards", nodesToShardCount.get(realNodeNames.get(0)), equalTo(5));
+            assertThat("node2 has 5 shards", nodesToShardCount.get(realNodeNames.get(1)), equalTo(5));
+            assertThat("node3 has 0 shards", nodesToShardCount.get(realNodeNames.get(2)), equalTo(0));
         });
 
         // Update the disk usages so one node is now back under the high watermark
@@ -138,21 +129,18 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
         // Retrieve the count of shards on each node
         nodesToShardCount.clear();
 
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                ClusterStateResponse resp = client().admin().cluster().prepareState().get();
-                Iterator<RoutingNode> iter = resp.getState().getRoutingNodes().iterator();
-                while (iter.hasNext()) {
-                    RoutingNode node = iter.next();
-                    logger.info("--> node {} has {} shards",
-                            node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
-                    nodesToShardCount.put(node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
-                }
-                assertThat("node1 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(0)), greaterThanOrEqualTo(3));
-                assertThat("node2 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(1)), greaterThanOrEqualTo(3));
-                assertThat("node3 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(2)), greaterThanOrEqualTo(3));
+        assertBusy(() -> {
+            ClusterStateResponse resp1 = client().admin().cluster().prepareState().get();
+            Iterator<RoutingNode> iter1 = resp1.getState().getRoutingNodes().iterator();
+            while (iter1.hasNext()) {
+                RoutingNode node = iter1.next();
+                logger.info("--> node {} has {} shards",
+                        node.nodeId(), resp1.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
+                nodesToShardCount.put(node.nodeId(), resp1.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
             }
+            assertThat("node1 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(0)), greaterThanOrEqualTo(3));
+            assertThat("node2 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(1)), greaterThanOrEqualTo(3));
+            assertThat("node3 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(2)), greaterThanOrEqualTo(3));
         });
     }
 }

+ 3 - 6
core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java

@@ -229,12 +229,9 @@ public class EsExecutorsTests extends ESTestCase {
         assertThat("wrong pool size", pool.getPoolSize(), equalTo(max));
         assertThat("wrong active size", pool.getActiveCount(), equalTo(max));
         barrier.await();
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                assertThat("wrong active count", pool.getActiveCount(), equalTo(0));
-                assertThat("idle threads didn't shrink below max. (" + pool.getPoolSize() + ")", pool.getPoolSize(), lessThan(max));
-            }
+        assertBusy(() -> {
+            assertThat("wrong active count", pool.getActiveCount(), equalTo(0));
+            assertThat("idle threads didn't shrink below max. (" + pool.getPoolSize() + ")", pool.getPoolSize(), lessThan(max));
         });
         terminate(pool);
     }

+ 1 - 6
core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java

@@ -264,12 +264,7 @@ public class PrioritizedExecutorsTests extends ESTestCase {
 
         // the timeout handler is added post execution (and quickly cancelled). We have allow for this
         // and use assert busy
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                assertThat(timer.getQueue().size(), equalTo(0));
-            }
-        }, 5, TimeUnit.SECONDS);
+        assertBusy(() -> assertThat(timer.getQueue().size(), equalTo(0)), 5, TimeUnit.SECONDS);
         assertThat(timeoutCalled.get(), equalTo(false));
         assertTrue(terminate(executor));
         assertTrue(terminate(threadPool));

+ 16 - 22
core/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java

@@ -197,35 +197,29 @@ public abstract class AbstractDisruptionTestCase extends ESIntegTestCase {
     }
 
     void assertNoMaster(final String node, @Nullable final ClusterBlock expectedBlocks, TimeValue maxWaitTime) throws Exception {
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                ClusterState state = getNodeClusterState(node);
-                final DiscoveryNodes nodes = state.nodes();
-                assertNull("node [" + node + "] still has [" + nodes.getMasterNode() + "] as master", nodes.getMasterNode());
-                if (expectedBlocks != null) {
-                    for (ClusterBlockLevel level : expectedBlocks.levels()) {
-                        assertTrue("node [" + node + "] does have level [" + level + "] in it's blocks", state.getBlocks().hasGlobalBlock
-                                (level));
-                    }
+        assertBusy(() -> {
+            ClusterState state = getNodeClusterState(node);
+            final DiscoveryNodes nodes = state.nodes();
+            assertNull("node [" + node + "] still has [" + nodes.getMasterNode() + "] as master", nodes.getMasterNode());
+            if (expectedBlocks != null) {
+                for (ClusterBlockLevel level : expectedBlocks.levels()) {
+                    assertTrue("node [" + node + "] does have level [" + level + "] in it's blocks", state.getBlocks().hasGlobalBlock
+                            (level));
                 }
             }
         }, maxWaitTime.getMillis(), TimeUnit.MILLISECONDS);
     }
 
     void assertDifferentMaster(final String node, final String oldMasterNode) throws Exception {
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                ClusterState state = getNodeClusterState(node);
-                String masterNode = null;
-                if (state.nodes().getMasterNode() != null) {
-                    masterNode = state.nodes().getMasterNode().getName();
-                }
-                logger.trace("[{}] master is [{}]", node, state.nodes().getMasterNode());
-                assertThat("node [" + node + "] still has [" + masterNode + "] as master",
-                        oldMasterNode, not(equalTo(masterNode)));
+        assertBusy(() -> {
+            ClusterState state = getNodeClusterState(node);
+            String masterNode = null;
+            if (state.nodes().getMasterNode() != null) {
+                masterNode = state.nodes().getMasterNode().getName();
             }
+            logger.trace("[{}] master is [{}]", node, state.nodes().getMasterNode());
+            assertThat("node [" + node + "] still has [" + masterNode + "] as master",
+                    oldMasterNode, not(equalTo(masterNode)));
         }, 10, TimeUnit.SECONDS);
     }
 

+ 15 - 18
core/src/test/java/org/elasticsearch/document/ShardInfoIT.java

@@ -128,24 +128,21 @@ public class ShardInfoIT extends ESIntegTestCase {
     }
 
     private void ensureActiveShardCopies(final int shardId, final int copyCount) throws Exception {
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                ClusterState state = client().admin().cluster().prepareState().get().getState();
-                assertThat(state.routingTable().index("idx"), not(nullValue()));
-                assertThat(state.routingTable().index("idx").shard(shardId), not(nullValue()));
-                assertThat(state.routingTable().index("idx").shard(shardId).activeShards().size(), equalTo(copyCount));
-
-                ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth("idx")
-                        .setWaitForNoRelocatingShards(true)
-                        .get();
-                assertThat(healthResponse.isTimedOut(), equalTo(false));
-
-                RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("idx")
-                        .setActiveOnly(true)
-                        .get();
-                assertThat(recoveryResponse.shardRecoveryStates().get("idx").size(), equalTo(0));
-            }
+        assertBusy(() -> {
+            ClusterState state = client().admin().cluster().prepareState().get().getState();
+            assertThat(state.routingTable().index("idx"), not(nullValue()));
+            assertThat(state.routingTable().index("idx").shard(shardId), not(nullValue()));
+            assertThat(state.routingTable().index("idx").shard(shardId).activeShards().size(), equalTo(copyCount));
+
+            ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth("idx")
+                    .setWaitForNoRelocatingShards(true)
+                    .get();
+            assertThat(healthResponse.isTimedOut(), equalTo(false));
+
+            RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("idx")
+                    .setActiveOnly(true)
+                    .get();
+            assertThat(recoveryResponse.shardRecoveryStates().get("idx").size(), equalTo(0));
         });
     }
 }

+ 2 - 2
core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java

@@ -39,6 +39,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.TestShardRouting;
 import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.CheckedRunnable;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
@@ -83,7 +84,6 @@ import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
-import java.util.function.Supplier;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
@@ -458,7 +458,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
             threads[i].start();
         }
         barrier.await();
-        final Runnable check;
+        final CheckedRunnable<Exception> check;
         if (flush) {
             final FlushStats flushStats = shard.flushStats();
             final long total = flushStats.getTotal();

+ 5 - 8
core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java

@@ -405,14 +405,11 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
         imc.forceCheck();
 
         // We must assertBusy because the writeIndexingBufferAsync is done in background (REFRESH) thread pool:
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                try (Engine.Searcher s2 = shard.acquireSearcher("index")) {
-                    // 100 buffered deletes will easily exceed our 1 KB indexing buffer so it should trigger a write:
-                    final long indexingBufferBytes2 = shard.getIndexBufferRAMBytesUsed();
-                    assertTrue(indexingBufferBytes2 < indexingBufferBytes1);
-                }
+        assertBusy(() -> {
+            try (Engine.Searcher s2 = shard.acquireSearcher("index")) {
+                // 100 buffered deletes will easily exceed our 1 KB indexing buffer so it should trigger a write:
+                final long indexingBufferBytes2 = shard.getIndexBufferRAMBytesUsed();
+                assertTrue(indexingBufferBytes2 < indexingBufferBytes1);
             }
         });
     }

+ 7 - 10
core/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java

@@ -381,16 +381,13 @@ public class CircuitBreakerServiceIT extends ESIntegTestCase {
     /** Issues a cache clear and waits 30 seconds for the field data breaker to be cleared */
     public void clearFieldData() throws Exception {
         client().admin().indices().prepareClearCache().setFieldDataCache(true).execute().actionGet();
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                NodesStatsResponse resp = client().admin().cluster().prepareNodesStats()
-                        .clear().setBreaker(true).get(new TimeValue(15, TimeUnit.SECONDS));
-                for (NodeStats nStats : resp.getNodes()) {
-                    assertThat("fielddata breaker never reset back to 0",
-                            nStats.getBreaker().getStats(CircuitBreaker.FIELDDATA).getEstimated(),
-                            equalTo(0L));
-                }
+        assertBusy(() -> {
+            NodesStatsResponse resp = client().admin().cluster().prepareNodesStats()
+                    .clear().setBreaker(true).get(new TimeValue(15, TimeUnit.SECONDS));
+            for (NodeStats nStats : resp.getNodes()) {
+                assertThat("fielddata breaker never reset back to 0",
+                        nStats.getBreaker().getStats(CircuitBreaker.FIELDDATA).getEstimated(),
+                        equalTo(0L));
             }
         }, 30, TimeUnit.SECONDS);
     }

+ 17 - 24
core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java

@@ -269,17 +269,13 @@ public class IndexRecoveryIT extends ESIntegTestCase {
 
         logger.info("--> waiting for recovery to start both on source and target");
         final Index index = resolveIndex(INDEX_NAME);
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-
-                IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeA);
-                assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsSource(),
-                        equalTo(1));
-                indicesService = internalCluster().getInstance(IndicesService.class, nodeB);
-                assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsTarget(),
-                        equalTo(1));
-            }
+        assertBusy(() -> {
+            IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeA);
+            assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsSource(),
+                    equalTo(1));
+            indicesService = internalCluster().getInstance(IndicesService.class, nodeB);
+            assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsTarget(),
+                    equalTo(1));
         });
 
         logger.info("--> request recoveries");
@@ -318,19 +314,16 @@ public class IndexRecoveryIT extends ESIntegTestCase {
         logger.info("--> checking throttling increases");
         final long finalNodeAThrottling = nodeAThrottling;
         final long finalNodeBThrottling = nodeBThrottling;
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                NodesStatsResponse statsResponse = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get();
-                assertThat(statsResponse.getNodes(), hasSize(2));
-                for (NodeStats nodeStats : statsResponse.getNodes()) {
-                    final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
-                    if (nodeStats.getNode().getName().equals(nodeA)) {
-                        assertThat("node A throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(finalNodeAThrottling));
-                    }
-                    if (nodeStats.getNode().getName().equals(nodeB)) {
-                        assertThat("node B throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(finalNodeBThrottling));
-                    }
+        assertBusy(() -> {
+            NodesStatsResponse statsResponse1 = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get();
+            assertThat(statsResponse1.getNodes(), hasSize(2));
+            for (NodeStats nodeStats : statsResponse1.getNodes()) {
+                final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
+                if (nodeStats.getNode().getName().equals(nodeA)) {
+                    assertThat("node A throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(finalNodeAThrottling));
+                }
+                if (nodeStats.getNode().getName().equals(nodeB)) {
+                    assertThat("node B throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(finalNodeBThrottling));
                 }
             }
         });

+ 1 - 6
core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java

@@ -157,12 +157,7 @@ public class RecoveryTargetTests extends ESTestCase {
         Timer lastRead = streamer.serializeDeserialize();
         final long time = lastRead.time();
         assertThat(time, lessThanOrEqualTo(timer.time()));
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                assertThat("timer timer should progress compared to captured one ", time, lessThan(timer.time()));
-            }
-        });
+        assertBusy(() -> assertThat("timer timer should progress compared to captured one ", time, lessThan(timer.time())));
         assertThat("captured time shouldn't change", lastRead.time(), equalTo(time));
 
         if (randomBoolean()) {

+ 38 - 55
core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java

@@ -266,23 +266,20 @@ public class RareClusterStateIT extends ESIntegTestCase {
             }
         });
         // ...and wait for mappings to be available on master
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                ImmutableOpenMap<String, MappingMetaData> indexMappings = client().admin().indices().prepareGetMappings("index").get().getMappings().get("index");
-                assertNotNull(indexMappings);
-                MappingMetaData typeMappings = indexMappings.get("type");
-                assertNotNull(typeMappings);
-                Object properties;
-                try {
-                    properties = typeMappings.getSourceAsMap().get("properties");
-                } catch (IOException e) {
-                    throw new AssertionError(e);
-                }
-                assertNotNull(properties);
-                Object fieldMapping = ((Map<String, Object>) properties).get("field");
-                assertNotNull(fieldMapping);
+        assertBusy(() -> {
+            ImmutableOpenMap<String, MappingMetaData> indexMappings = client().admin().indices().prepareGetMappings("index").get().getMappings().get("index");
+            assertNotNull(indexMappings);
+            MappingMetaData typeMappings = indexMappings.get("type");
+            assertNotNull(typeMappings);
+            Object properties;
+            try {
+                properties = typeMappings.getSourceAsMap().get("properties");
+            } catch (IOException e) {
+                throw new AssertionError(e);
             }
+            assertNotNull(properties);
+            Object fieldMapping = ((Map<String, Object>) properties).get("field");
+            assertNotNull(fieldMapping);
         });
 
         final AtomicReference<Object> docIndexResponse = new AtomicReference<>();
@@ -307,17 +304,14 @@ public class RareClusterStateIT extends ESIntegTestCase {
 
         // Now make sure the indexing request finishes successfully
         disruption.stopDisrupting();
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                assertThat(putMappingResponse.get(), instanceOf(PutMappingResponse.class));
-                PutMappingResponse resp = (PutMappingResponse) putMappingResponse.get();
-                assertTrue(resp.isAcknowledged());
-                assertThat(docIndexResponse.get(), instanceOf(IndexResponse.class));
-                IndexResponse docResp = (IndexResponse) docIndexResponse.get();
-                assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()),
-                        1, docResp.getShardInfo().getTotal());
-            }
+        assertBusy(() -> {
+            assertThat(putMappingResponse.get(), instanceOf(PutMappingResponse.class));
+            PutMappingResponse resp = (PutMappingResponse) putMappingResponse.get();
+            assertTrue(resp.isAcknowledged());
+            assertThat(docIndexResponse.get(), instanceOf(IndexResponse.class));
+            IndexResponse docResp = (IndexResponse) docIndexResponse.get();
+            assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()),
+                    1, docResp.getShardInfo().getTotal());
         });
     }
 
@@ -387,17 +381,14 @@ public class RareClusterStateIT extends ESIntegTestCase {
         });
         final Index index = resolveIndex("index");
         // Wait for mappings to be available on master
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, master);
-                final IndexService indexService = indicesService.indexServiceSafe(index);
-                assertNotNull(indexService);
-                final MapperService mapperService = indexService.mapperService();
-                DocumentMapper mapper = mapperService.documentMapper("type");
-                assertNotNull(mapper);
-                assertNotNull(mapper.mappers().getMapper("field"));
-            }
+        assertBusy(() -> {
+            final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, master);
+            final IndexService indexService = indicesService.indexServiceSafe(index);
+            assertNotNull(indexService);
+            final MapperService mapperService = indexService.mapperService();
+            DocumentMapper mapper = mapperService.documentMapper("type");
+            assertNotNull(mapper);
+            assertNotNull(mapper.mappers().getMapper("field"));
         });
 
         final AtomicReference<Object> docIndexResponse = new AtomicReference<>();
@@ -414,12 +405,7 @@ public class RareClusterStateIT extends ESIntegTestCase {
         });
 
         // Wait for document to be indexed on primary
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                assertTrue(client().prepareGet("index", "type", "1").setPreference("_primary").get().isExists());
-            }
-        });
+        assertBusy(() -> assertTrue(client().prepareGet("index", "type", "1").setPreference("_primary").get().isExists()));
 
         // The mappings have not been propagated to the replica yet as a consequence the document count not be indexed
         // We wait on purpose to make sure that the document is not indexed because the shard operation is stalled
@@ -430,17 +416,14 @@ public class RareClusterStateIT extends ESIntegTestCase {
 
         // Now make sure the indexing request finishes successfully
         disruption.stopDisrupting();
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                assertThat(putMappingResponse.get(), instanceOf(PutMappingResponse.class));
-                PutMappingResponse resp = (PutMappingResponse) putMappingResponse.get();
-                assertTrue(resp.isAcknowledged());
-                assertThat(docIndexResponse.get(), instanceOf(IndexResponse.class));
-                IndexResponse docResp = (IndexResponse) docIndexResponse.get();
-                assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()),
-                        2, docResp.getShardInfo().getTotal()); // both shards should have succeeded
-            }
+        assertBusy(() -> {
+            assertThat(putMappingResponse.get(), instanceOf(PutMappingResponse.class));
+            PutMappingResponse resp = (PutMappingResponse) putMappingResponse.get();
+            assertTrue(resp.isAcknowledged());
+            assertThat(docIndexResponse.get(), instanceOf(IndexResponse.class));
+            IndexResponse docResp = (IndexResponse) docIndexResponse.get();
+            assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()),
+                    2, docResp.getShardInfo().getTotal()); // both shards should have succeeded
         });
     }
 

+ 1 - 6
core/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java

@@ -269,12 +269,7 @@ public class IndexStatsIT extends ESIntegTestCase {
         }
         indexRandom(true, builders);
         refresh();
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(), equalTo(0L));
-            }
-        });
+        assertBusy(() -> assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(), equalTo(0L)));
 
         for (int i = 0; i < 10; i++) {
             assertThat(client().prepareSearch("idx").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0).get().getHits().getTotalHits(), equalTo((long) numDocs));

+ 1 - 9
core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java

@@ -21,12 +21,10 @@ package org.elasticsearch.indices.store;
 
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ExceptionsHelper;
-import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateTaskListener;
-import org.elasticsearch.cluster.LocalClusterUpdateTask;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
@@ -40,7 +38,6 @@ import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationComman
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
 import org.elasticsearch.cluster.service.ClusterApplierService;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.env.Environment;
@@ -378,12 +375,7 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
         // allocation filtering may not have immediate effect
         // TODO: we should add an easier to do this. It's too much of a song and dance..
         Index index = resolveIndex("test");
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                assertTrue(internalCluster().getInstance(IndicesService.class, node4).hasIndex(index));
-            }
-        });
+        assertBusy(() -> assertTrue(internalCluster().getInstance(IndicesService.class, node4).hasIndex(index)));
 
         // wait for 4 active shards - we should have lost one shard
         assertFalse(client().admin().cluster().prepareHealth().setWaitForActiveShards(4).get().isTimedOut());

+ 3 - 6
core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java

@@ -341,12 +341,9 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
     }
 
     private void refreshAndAssert() throws Exception {
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                RefreshResponse actionGet = client().admin().indices().prepareRefresh().get();
-                assertAllSuccessful(actionGet);
-            }
+        assertBusy(() -> {
+            RefreshResponse actionGet = client().admin().indices().prepareRefresh().get();
+            assertAllSuccessful(actionGet);
         }, 5, TimeUnit.MINUTES);
     }
 }

+ 15 - 18
core/src/test/java/org/elasticsearch/search/child/ParentFieldLoadingIT.java

@@ -30,9 +30,9 @@ import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.MergePolicyConfig;
 import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.MapperService;
-import org.elasticsearch.index.MergePolicyConfig;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
@@ -132,25 +132,22 @@ public class ParentFieldLoadingIT extends ESIntegTestCase {
                 .get();
         assertAcked(putMappingResponse);
         Index test = resolveIndex("test");
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                ClusterState clusterState = internalCluster().clusterService().state();
-                ShardRouting shardRouting = clusterState.routingTable().index("test").shard(0).getShards().get(0);
-                String nodeName = clusterState.getNodes().get(shardRouting.currentNodeId()).getName();
-
-                boolean verified = false;
-                IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
-                IndexService indexService = indicesService.indexService(test);
-                if (indexService != null) {
-                    MapperService mapperService = indexService.mapperService();
-                    DocumentMapper documentMapper = mapperService.documentMapper("child");
-                    if (documentMapper != null) {
-                        verified = documentMapper.parentFieldMapper().fieldType().eagerGlobalOrdinals();
-                    }
+        assertBusy(() -> {
+            ClusterState clusterState = internalCluster().clusterService().state();
+            ShardRouting shardRouting = clusterState.routingTable().index("test").shard(0).getShards().get(0);
+            String nodeName = clusterState.getNodes().get(shardRouting.currentNodeId()).getName();
+
+            boolean verified = false;
+            IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
+            IndexService indexService = indicesService.indexService(test);
+            if (indexService != null) {
+                MapperService mapperService = indexService.mapperService();
+                DocumentMapper documentMapper = mapperService.documentMapper("child");
+                if (documentMapper != null) {
+                    verified = documentMapper.parentFieldMapper().fieldType().eagerGlobalOrdinals();
                 }
-                assertTrue(verified);
             }
+            assertTrue(verified);
         });
 
         // Need to add a new doc otherwise the refresh doesn't trigger a new searcher

+ 39 - 51
core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java

@@ -208,19 +208,16 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
         Client client = client();
         createIndex("test-idx");
         logger.info("--> add custom persistent metadata");
-        updateClusterState(new ClusterStateUpdater() {
-            @Override
-            public ClusterState execute(ClusterState currentState) throws Exception {
-                ClusterState.Builder builder = ClusterState.builder(currentState);
-                MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData());
-                metadataBuilder.putCustom(SnapshottableMetadata.TYPE, new SnapshottableMetadata("before_snapshot_s"));
-                metadataBuilder.putCustom(NonSnapshottableMetadata.TYPE, new NonSnapshottableMetadata("before_snapshot_ns"));
-                metadataBuilder.putCustom(SnapshottableGatewayMetadata.TYPE, new SnapshottableGatewayMetadata("before_snapshot_s_gw"));
-                metadataBuilder.putCustom(NonSnapshottableGatewayMetadata.TYPE, new NonSnapshottableGatewayMetadata("before_snapshot_ns_gw"));
-                metadataBuilder.putCustom(SnapshotableGatewayNoApiMetadata.TYPE, new SnapshotableGatewayNoApiMetadata("before_snapshot_s_gw_noapi"));
-                builder.metaData(metadataBuilder);
-                return builder.build();
-            }
+        updateClusterState(currentState -> {
+            ClusterState.Builder builder = ClusterState.builder(currentState);
+            MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData());
+            metadataBuilder.putCustom(SnapshottableMetadata.TYPE, new SnapshottableMetadata("before_snapshot_s"));
+            metadataBuilder.putCustom(NonSnapshottableMetadata.TYPE, new NonSnapshottableMetadata("before_snapshot_ns"));
+            metadataBuilder.putCustom(SnapshottableGatewayMetadata.TYPE, new SnapshottableGatewayMetadata("before_snapshot_s_gw"));
+            metadataBuilder.putCustom(NonSnapshottableGatewayMetadata.TYPE, new NonSnapshottableGatewayMetadata("before_snapshot_ns_gw"));
+            metadataBuilder.putCustom(SnapshotableGatewayNoApiMetadata.TYPE, new SnapshotableGatewayNoApiMetadata("before_snapshot_s_gw_noapi"));
+            builder.metaData(metadataBuilder);
+            return builder.build();
         });
 
         logger.info("--> create repository");
@@ -235,27 +232,24 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
         assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").execute().actionGet().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
 
         logger.info("--> change custom persistent metadata");
-        updateClusterState(new ClusterStateUpdater() {
-            @Override
-            public ClusterState execute(ClusterState currentState) throws Exception {
-                ClusterState.Builder builder = ClusterState.builder(currentState);
-                MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData());
-                if (randomBoolean()) {
-                    metadataBuilder.putCustom(SnapshottableMetadata.TYPE, new SnapshottableMetadata("after_snapshot_s"));
-                } else {
-                    metadataBuilder.removeCustom(SnapshottableMetadata.TYPE);
-                }
-                metadataBuilder.putCustom(NonSnapshottableMetadata.TYPE, new NonSnapshottableMetadata("after_snapshot_ns"));
-                if (randomBoolean()) {
-                    metadataBuilder.putCustom(SnapshottableGatewayMetadata.TYPE, new SnapshottableGatewayMetadata("after_snapshot_s_gw"));
-                } else {
-                    metadataBuilder.removeCustom(SnapshottableGatewayMetadata.TYPE);
-                }
-                metadataBuilder.putCustom(NonSnapshottableGatewayMetadata.TYPE, new NonSnapshottableGatewayMetadata("after_snapshot_ns_gw"));
-                metadataBuilder.removeCustom(SnapshotableGatewayNoApiMetadata.TYPE);
-                builder.metaData(metadataBuilder);
-                return builder.build();
+        updateClusterState(currentState -> {
+            ClusterState.Builder builder = ClusterState.builder(currentState);
+            MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData());
+            if (randomBoolean()) {
+                metadataBuilder.putCustom(SnapshottableMetadata.TYPE, new SnapshottableMetadata("after_snapshot_s"));
+            } else {
+                metadataBuilder.removeCustom(SnapshottableMetadata.TYPE);
+            }
+            metadataBuilder.putCustom(NonSnapshottableMetadata.TYPE, new NonSnapshottableMetadata("after_snapshot_ns"));
+            if (randomBoolean()) {
+                metadataBuilder.putCustom(SnapshottableGatewayMetadata.TYPE, new SnapshottableGatewayMetadata("after_snapshot_s_gw"));
+            } else {
+                metadataBuilder.removeCustom(SnapshottableGatewayMetadata.TYPE);
             }
+            metadataBuilder.putCustom(NonSnapshottableGatewayMetadata.TYPE, new NonSnapshottableGatewayMetadata("after_snapshot_ns_gw"));
+            metadataBuilder.removeCustom(SnapshotableGatewayNoApiMetadata.TYPE);
+            builder.metaData(metadataBuilder);
+            return builder.build();
         });
 
         logger.info("--> delete repository");
@@ -510,15 +504,12 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
             client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2")
                     .setIndices("test-idx-all", "test-idx-none", "test-idx-some")
                     .setWaitForCompletion(false).setPartial(true).execute().actionGet();
-            assertBusy(new Runnable() {
-                @Override
-                public void run() {
-                    SnapshotsStatusResponse snapshotsStatusResponse = client().admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-snap-2").get();
-                    List<SnapshotStatus> snapshotStatuses = snapshotsStatusResponse.getSnapshots();
-                    assertEquals(snapshotStatuses.size(), 1);
-                    logger.trace("current snapshot status [{}]", snapshotStatuses.get(0));
-                    assertTrue(snapshotStatuses.get(0).getState().completed());
-                }
+            assertBusy(() -> {
+                SnapshotsStatusResponse snapshotsStatusResponse = client().admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-snap-2").get();
+                List<SnapshotStatus> snapshotStatuses = snapshotsStatusResponse.getSnapshots();
+                assertEquals(snapshotStatuses.size(), 1);
+                logger.trace("current snapshot status [{}]", snapshotStatuses.get(0));
+                assertTrue(snapshotStatuses.get(0).getState().completed());
             }, 1, TimeUnit.MINUTES);
             SnapshotsStatusResponse snapshotsStatusResponse = client().admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-snap-2").get();
             List<SnapshotStatus> snapshotStatuses = snapshotsStatusResponse.getSnapshots();
@@ -531,15 +522,12 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
 
             // There is slight delay between snapshot being marked as completed in the cluster state and on the file system
             // After it was marked as completed in the cluster state - we need to check if it's completed on the file system as well
-            assertBusy(new Runnable() {
-                @Override
-                public void run() {
-                    GetSnapshotsResponse response = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-2").get();
-                    assertThat(response.getSnapshots().size(), equalTo(1));
-                    SnapshotInfo snapshotInfo = response.getSnapshots().get(0);
-                    assertTrue(snapshotInfo.state().completed());
-                    assertEquals(SnapshotState.PARTIAL, snapshotInfo.state());
-                }
+            assertBusy(() -> {
+                GetSnapshotsResponse response = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-2").get();
+                assertThat(response.getSnapshots().size(), equalTo(1));
+                SnapshotInfo snapshotInfo = response.getSnapshots().get(0);
+                assertTrue(snapshotInfo.state().completed());
+                assertEquals(SnapshotState.PARTIAL, snapshotInfo.state());
             }, 1, TimeUnit.MINUTES);
         } else {
             logger.info("checking snapshot completion using wait_for_completion flag");

+ 3 - 6
modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java

@@ -82,12 +82,9 @@ public class Netty4ScheduledPingTests extends ESTestCase {
         serviceA.connectToNode(nodeB);
         serviceB.connectToNode(nodeA);
 
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(100L));
-                assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(100L));
-            }
+        assertBusy(() -> {
+            assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(100L));
+            assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(100L));
         });
         assertThat(nettyA.getPing().getFailedPings(), equalTo(0L));
         assertThat(nettyB.getPing().getFailedPings(), equalTo(0L));

+ 3 - 3
test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java

@@ -29,7 +29,6 @@ import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
 import com.carrotsearch.randomizedtesting.generators.RandomPicks;
 import com.carrotsearch.randomizedtesting.generators.RandomStrings;
 import com.carrotsearch.randomizedtesting.rules.TestRuleAdapter;
-
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -48,6 +47,7 @@ import org.elasticsearch.bootstrap.BootstrapForTesting;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.cluster.ClusterModule;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.CheckedRunnable;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.PathUtils;
 import org.elasticsearch.common.io.PathUtilsForTesting;
@@ -692,14 +692,14 @@ public abstract class ESTestCase extends LuceneTestCase {
     /**
      * Runs the code block for 10 seconds waiting for no assertion to trip.
      */
-    public static void assertBusy(Runnable codeBlock) throws Exception {
+    public static void assertBusy(CheckedRunnable<Exception> codeBlock) throws Exception {
         assertBusy(codeBlock, 10, TimeUnit.SECONDS);
     }
 
     /**
      * Runs the code block for the provided interval, waiting for no assertions to trip.
      */
-    public static void assertBusy(Runnable codeBlock, long maxWaitTime, TimeUnit unit) throws Exception {
+    public static void assertBusy(CheckedRunnable<Exception> codeBlock, long maxWaitTime, TimeUnit unit) throws Exception {
         long maxTimeInMillis = TimeUnit.MILLISECONDS.convert(maxWaitTime, unit);
         long iterations = Math.max(Math.round(Math.log10(maxTimeInMillis) / Math.log10(2)), 1);
         long timeInMillis = 1;

+ 3 - 6
test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

@@ -2016,12 +2016,9 @@ public final class InternalTestCluster extends TestCluster {
                 // in an assertBusy loop, so it will try for 10 seconds and
                 // fail if it never reached 0
                 try {
-                    assertBusy(new Runnable() {
-                        @Override
-                        public void run() {
-                            CircuitBreaker reqBreaker = breakerService.getBreaker(CircuitBreaker.REQUEST);
-                            assertThat("Request breaker not reset to 0 on node: " + name, reqBreaker.getUsed(), equalTo(0L));
-                        }
+                    assertBusy(() -> {
+                        CircuitBreaker reqBreaker = breakerService.getBreaker(CircuitBreaker.REQUEST);
+                        assertThat("Request breaker not reset to 0 on node: " + name, reqBreaker.getUsed(), equalTo(0L));
                     });
                 } catch (Exception e) {
                     fail("Exception during check for request breaker reset to 0: " + e);