|
|
@@ -16,19 +16,18 @@ import org.apache.lucene.store.Directory;
|
|
|
import org.apache.lucene.store.FSDirectory;
|
|
|
import org.apache.lucene.util.BytesRef;
|
|
|
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
|
|
-import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
|
|
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
|
|
|
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
|
|
|
-import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
|
|
+import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
|
|
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
|
|
|
import org.elasticsearch.action.index.IndexRequestBuilder;
|
|
|
-import org.elasticsearch.action.search.SearchResponse;
|
|
|
+import org.elasticsearch.action.support.nodes.BaseNodeResponse;
|
|
|
import org.elasticsearch.client.internal.Requests;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
|
+import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
|
|
-import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
|
|
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
|
|
import org.elasticsearch.cluster.routing.ShardIterator;
|
|
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
|
|
@@ -86,7 +85,10 @@ import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+import java.util.function.Consumer;
|
|
|
|
|
|
+import static java.util.stream.Collectors.collectingAndThen;
|
|
|
+import static java.util.stream.Collectors.toCollection;
|
|
|
import static org.elasticsearch.common.util.CollectionUtils.iterableAsArrayList;
|
|
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
|
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful;
|
|
|
@@ -127,7 +129,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
|
|
|
/**
|
|
|
* Tests that we can actually recover from a corruption on the primary given that we have replica shards around.
|
|
|
*/
|
|
|
- public void testCorruptFileAndRecover() throws ExecutionException, InterruptedException, IOException {
|
|
|
+ public void testCorruptFileAndRecover() throws InterruptedException, IOException {
|
|
|
int numDocs = scaledRandomIntBetween(100, 1000);
|
|
|
// have enough space for 3 copies
|
|
|
internalCluster().ensureAtLeastNumDataNodes(3);
|
|
|
@@ -161,8 +163,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
|
|
|
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).get());
|
|
|
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).get());
|
|
|
// we have to flush at least once here since we don't corrupt the translog
|
|
|
- SearchResponse countResponse = client().prepareSearch().setSize(0).get();
|
|
|
- assertHitCount(countResponse, numDocs);
|
|
|
+ assertHitCount(client().prepareSearch().setSize(0).get(), numDocs);
|
|
|
|
|
|
final int numShards = numShards("test");
|
|
|
ShardRouting corruptedShardRouting = corruptRandomPrimaryFile();
|
|
|
@@ -193,8 +194,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
|
|
|
assertThat(health.getStatus(), equalTo(ClusterHealthStatus.GREEN));
|
|
|
final int numIterations = scaledRandomIntBetween(5, 20);
|
|
|
for (int i = 0; i < numIterations; i++) {
|
|
|
- SearchResponse response = client().prepareSearch().setSize(numDocs).get();
|
|
|
- assertHitCount(response, numDocs);
|
|
|
+ assertHitCount(client().prepareSearch().setSize(numDocs).get(), numDocs);
|
|
|
}
|
|
|
|
|
|
/*
|
|
|
@@ -279,8 +279,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
|
|
|
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).get());
|
|
|
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).get());
|
|
|
// we have to flush at least once here since we don't corrupt the translog
|
|
|
- SearchResponse countResponse = client().prepareSearch().setSize(0).get();
|
|
|
- assertHitCount(countResponse, numDocs);
|
|
|
+ assertHitCount(client().prepareSearch().setSize(0).get(), numDocs);
|
|
|
|
|
|
ShardRouting shardRouting = corruptRandomPrimaryFile();
|
|
|
/*
|
|
|
@@ -336,41 +335,33 @@ public class CorruptedFileIT extends ESIntegTestCase {
|
|
|
* This simulates recoveries from old indices or even without checksums and makes sure if we fail during finalization
|
|
|
* we also check if the primary is ok. Without the relevant checks this test fails with a RED cluster
|
|
|
*/
|
|
|
- public void testCorruptionOnNetworkLayerFinalizingRecovery() throws ExecutionException, InterruptedException, IOException {
|
|
|
+ public void testCorruptionOnNetworkLayerFinalizingRecovery() throws InterruptedException {
|
|
|
internalCluster().ensureAtLeastNumDataNodes(2);
|
|
|
- NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get();
|
|
|
- List<NodeStats> dataNodeStats = new ArrayList<>();
|
|
|
- for (NodeStats stat : nodeStats.getNodes()) {
|
|
|
- if (stat.getNode().canContainData()) {
|
|
|
- dataNodeStats.add(stat);
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
- assertThat(dataNodeStats.size(), greaterThanOrEqualTo(2));
|
|
|
- Collections.shuffle(dataNodeStats, random());
|
|
|
- NodeStats primariesNode = dataNodeStats.get(0);
|
|
|
- NodeStats unluckyNode = dataNodeStats.get(1);
|
|
|
+ var dataNodes = getShuffledDataNodes();
|
|
|
+
|
|
|
+ var primariesNode = dataNodes.get(0);
|
|
|
+ var unluckyNode = dataNodes.get(1);
|
|
|
assertAcked(
|
|
|
prepareCreate("test").setSettings(
|
|
|
Settings.builder()
|
|
|
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0")
|
|
|
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
|
|
|
- .put("index.routing.allocation.include._name", primariesNode.getNode().getName())
|
|
|
+ .put("index.routing.allocation.include._name", primariesNode.getName())
|
|
|
.put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)
|
|
|
.put("index.allocation.max_retries", Integer.MAX_VALUE) // keep on retrying
|
|
|
-
|
|
|
)
|
|
|
);
|
|
|
ensureGreen(); // allocated with empty commit
|
|
|
final AtomicBoolean corrupt = new AtomicBoolean(true);
|
|
|
final CountDownLatch hasCorrupted = new CountDownLatch(1);
|
|
|
- for (NodeStats dataNode : dataNodeStats) {
|
|
|
+ for (var dataNode : dataNodes) {
|
|
|
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
|
|
|
TransportService.class,
|
|
|
- dataNode.getNode().getName()
|
|
|
+ dataNode.getName()
|
|
|
));
|
|
|
mockTransportService.addSendBehavior(
|
|
|
- internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
|
|
|
+ internalCluster().getInstance(TransportService.class, unluckyNode.getName()),
|
|
|
(connection, requestId, action, request, options) -> {
|
|
|
if (corrupt.get() && action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
|
|
|
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
|
|
|
@@ -386,7 +377,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
|
|
|
|
|
|
Settings build = Settings.builder()
|
|
|
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1")
|
|
|
- .put("index.routing.allocation.include._name", primariesNode.getNode().getName() + "," + unluckyNode.getNode().getName())
|
|
|
+ .put("index.routing.allocation.include._name", primariesNode.getName() + "," + unluckyNode.getName())
|
|
|
.build();
|
|
|
client().admin().indices().prepareUpdateSettings("test").setSettings(build).get();
|
|
|
client().admin().cluster().prepareReroute().get();
|
|
|
@@ -399,24 +390,13 @@ public class CorruptedFileIT extends ESIntegTestCase {
|
|
|
* Tests corruption that happens on the network layer and that the primary does not get affected by corruption that happens on the way
|
|
|
* to the replica. The file on disk stays uncorrupted
|
|
|
*/
|
|
|
- public void testCorruptionOnNetworkLayer() throws ExecutionException, InterruptedException {
|
|
|
+ public void testCorruptionOnNetworkLayer() throws InterruptedException {
|
|
|
int numDocs = scaledRandomIntBetween(100, 1000);
|
|
|
- internalCluster().ensureAtLeastNumDataNodes(2);
|
|
|
- if (cluster().numDataNodes() < 3) {
|
|
|
- internalCluster().startDataOnlyNode();
|
|
|
- }
|
|
|
- NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get();
|
|
|
- List<NodeStats> dataNodeStats = new ArrayList<>();
|
|
|
- for (NodeStats stat : nodeStats.getNodes()) {
|
|
|
- if (stat.getNode().canContainData()) {
|
|
|
- dataNodeStats.add(stat);
|
|
|
- }
|
|
|
- }
|
|
|
+ internalCluster().ensureAtLeastNumDataNodes(3);
|
|
|
|
|
|
- assertThat(dataNodeStats.size(), greaterThanOrEqualTo(2));
|
|
|
- Collections.shuffle(dataNodeStats, random());
|
|
|
- NodeStats primariesNode = dataNodeStats.get(0);
|
|
|
- NodeStats unluckyNode = dataNodeStats.get(1);
|
|
|
+ var dataNodes = getShuffledDataNodes();
|
|
|
+ var primariesNode = dataNodes.get(0);
|
|
|
+ var unluckyNode = dataNodes.get(1);
|
|
|
|
|
|
assertAcked(
|
|
|
prepareCreate("test").setSettings(
|
|
|
@@ -425,7 +405,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
|
|
|
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 4)) // don't go crazy here it must recovery fast
|
|
|
// This does corrupt files on the replica, so we can't check:
|
|
|
.put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false)
|
|
|
- .put("index.routing.allocation.include._name", primariesNode.getNode().getName())
|
|
|
+ .put("index.routing.allocation.include._name", primariesNode.getName())
|
|
|
.put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)
|
|
|
)
|
|
|
);
|
|
|
@@ -438,83 +418,86 @@ public class CorruptedFileIT extends ESIntegTestCase {
|
|
|
ensureGreen();
|
|
|
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
|
|
|
// we have to flush at least once here since we don't corrupt the translog
|
|
|
- SearchResponse countResponse = client().prepareSearch().setSize(0).get();
|
|
|
- assertHitCount(countResponse, numDocs);
|
|
|
- final boolean truncate = randomBoolean();
|
|
|
- for (NodeStats dataNode : dataNodeStats) {
|
|
|
- MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
|
|
|
- TransportService.class,
|
|
|
- dataNode.getNode().getName()
|
|
|
- ));
|
|
|
- mockTransportService.addSendBehavior(
|
|
|
- internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
|
|
|
- (connection, requestId, action, request, options) -> {
|
|
|
- if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
|
|
|
- RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
|
|
|
- if (truncate && req.length() > 1) {
|
|
|
- BytesRef bytesRef = req.content().toBytesRef();
|
|
|
- BytesArray array = new BytesArray(bytesRef.bytes, bytesRef.offset, (int) req.length() - 1);
|
|
|
- request = new RecoveryFileChunkRequest(
|
|
|
- req.recoveryId(),
|
|
|
- req.requestSeqNo(),
|
|
|
- req.shardId(),
|
|
|
- req.metadata(),
|
|
|
- req.position(),
|
|
|
- ReleasableBytesReference.wrap(array),
|
|
|
- req.lastChunk(),
|
|
|
- req.totalTranslogOps(),
|
|
|
- req.sourceThrottleTimeInNanos()
|
|
|
- );
|
|
|
- } else {
|
|
|
- assert req.content().toBytesRef().bytes == req.content().toBytesRef().bytes : "no internal reference!!";
|
|
|
- final byte[] array = req.content().toBytesRef().bytes;
|
|
|
- int i = randomIntBetween(0, req.content().length() - 1);
|
|
|
- array[i] = (byte) ~array[i]; // flip one byte in the content
|
|
|
- }
|
|
|
- }
|
|
|
- connection.sendRequest(requestId, action, request, options);
|
|
|
- }
|
|
|
- );
|
|
|
- }
|
|
|
+ assertHitCount(client().prepareSearch().setSize(0).get(), numDocs);
|
|
|
|
|
|
- Settings build = Settings.builder()
|
|
|
- .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1")
|
|
|
- .put("index.routing.allocation.include._name", "*")
|
|
|
- .build();
|
|
|
- client().admin().indices().prepareUpdateSettings("test").setSettings(build).get();
|
|
|
- client().admin().cluster().prepareReroute().get();
|
|
|
- ClusterHealthResponse actionGet = client().admin()
|
|
|
- .cluster()
|
|
|
- .health(Requests.clusterHealthRequest("test").waitForGreenStatus())
|
|
|
- .actionGet();
|
|
|
- if (actionGet.isTimedOut()) {
|
|
|
- logger.info(
|
|
|
- "ensureGreen timed out, cluster state:\n{}\n{}",
|
|
|
- client().admin().cluster().prepareState().get().getState(),
|
|
|
- client().admin().cluster().preparePendingClusterTasks().get()
|
|
|
- );
|
|
|
- assertThat("timed out waiting for green state", actionGet.isTimedOut(), equalTo(false));
|
|
|
- }
|
|
|
- // we are green so primaries got not corrupted.
|
|
|
- // ensure that no shard is actually allocated on the unlucky node
|
|
|
- ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get();
|
|
|
- final IndexRoutingTable indexRoutingTable = clusterStateResponse.getState().getRoutingTable().index("test");
|
|
|
- for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) {
|
|
|
- final IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardId);
|
|
|
- for (int copy = 0; copy < indexShardRoutingTable.size(); copy++) {
|
|
|
- final ShardRouting routing = indexShardRoutingTable.shard(copy);
|
|
|
- if (unluckyNode.getNode().getId().equals(routing.currentNodeId())) {
|
|
|
- assertThat(routing.state(), not(equalTo(ShardRoutingState.STARTED)));
|
|
|
- assertThat(routing.state(), not(equalTo(ShardRoutingState.RELOCATING)));
|
|
|
+ var source = (MockTransportService) internalCluster().getInstance(TransportService.class, primariesNode.getName());
|
|
|
+ var target = internalCluster().getInstance(TransportService.class, unluckyNode.getName());
|
|
|
+
|
|
|
+ final boolean truncate = randomBoolean();
|
|
|
+ source.addSendBehavior(target, (connection, requestId, action, request, options) -> {
|
|
|
+ if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
|
|
|
+ RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
|
|
|
+ if (truncate && req.length() > 1) {
|
|
|
+ BytesRef bytesRef = req.content().toBytesRef();
|
|
|
+ BytesArray array = new BytesArray(bytesRef.bytes, bytesRef.offset, (int) req.length() - 1);
|
|
|
+ request = new RecoveryFileChunkRequest(
|
|
|
+ req.recoveryId(),
|
|
|
+ req.requestSeqNo(),
|
|
|
+ req.shardId(),
|
|
|
+ req.metadata(),
|
|
|
+ req.position(),
|
|
|
+ ReleasableBytesReference.wrap(array),
|
|
|
+ req.lastChunk(),
|
|
|
+ req.totalTranslogOps(),
|
|
|
+ req.sourceThrottleTimeInNanos()
|
|
|
+ );
|
|
|
+ } else {
|
|
|
+ assert req.content().toBytesRef().bytes == req.content().toBytesRef().bytes : "no internal reference!!";
|
|
|
+ final byte[] array = req.content().toBytesRef().bytes;
|
|
|
+ int i = randomIntBetween(0, req.content().length() - 1);
|
|
|
+ array[i] = (byte) ~array[i]; // flip one byte in the content
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
+ connection.sendRequest(requestId, action, request, options);
|
|
|
+ });
|
|
|
+
|
|
|
+ // can not allocate on unluckyNode
|
|
|
+ client().admin()
|
|
|
+ .indices()
|
|
|
+ .prepareUpdateSettings("test")
|
|
|
+ .setSettings(
|
|
|
+ Settings.builder()
|
|
|
+ .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1")
|
|
|
+ .put("index.routing.allocation.include._name", primariesNode.getName() + "," + unluckyNode.getName())
|
|
|
+ )
|
|
|
+ .get();
|
|
|
+ ensureYellowAndNoInitializingShards("test");
|
|
|
+ assertThatAllShards("test", shard -> {
|
|
|
+ assertThat(shard.primaryShard().currentNodeId(), equalTo(primariesNode.getId()));
|
|
|
+ assertThat(shard.replicaShards().get(0).state(), not(equalTo(ShardRoutingState.STARTED)));
|
|
|
+ });
|
|
|
+
|
|
|
+ // can allocate on any other data node
|
|
|
+ client().admin()
|
|
|
+ .indices()
|
|
|
+ .prepareUpdateSettings("test")
|
|
|
+ .setSettings(
|
|
|
+ Settings.builder()
|
|
|
+ .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1")
|
|
|
+ .putNull("index.routing.allocation.include._name")
|
|
|
+ .put("index.routing.allocation.exclude._name", unluckyNode.getName())
|
|
|
+ )
|
|
|
+ .get();
|
|
|
+ client().admin().cluster().prepareReroute().setRetryFailed(true).get();
|
|
|
+ ensureGreen("test");
|
|
|
+ assertThatAllShards("test", shard -> {
|
|
|
+ assertThat(shard.primaryShard().currentNodeId(), not(equalTo(unluckyNode.getId())));
|
|
|
+ assertThat(shard.replicaShards().get(0).state(), equalTo(ShardRoutingState.STARTED));
|
|
|
+ assertThat(shard.replicaShards().get(0).currentNodeId(), not(equalTo(unluckyNode.getId())));
|
|
|
+ });
|
|
|
+
|
|
|
final int numIterations = scaledRandomIntBetween(5, 20);
|
|
|
for (int i = 0; i < numIterations; i++) {
|
|
|
- SearchResponse response = client().prepareSearch().setSize(numDocs).get();
|
|
|
- assertHitCount(response, numDocs);
|
|
|
+ assertHitCount(client().prepareSearch().setSize(numDocs).get(), numDocs);
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
+ private void assertThatAllShards(String index, Consumer<IndexShardRoutingTable> verifier) {
|
|
|
+ var clusterStateResponse = client().admin().cluster().state(new ClusterStateRequest().routingTable(true)).actionGet();
|
|
|
+ var indexRoutingTable = clusterStateResponse.getState().getRoutingTable().index(index);
|
|
|
+ for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) {
|
|
|
+ verifier.accept(indexRoutingTable.shard(shardId));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -522,7 +505,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
|
|
|
* TODO once checksum verification on snapshotting is implemented this test needs to be fixed or split into several
|
|
|
* parts... We should also corrupt files on the actual snapshot and check that we don't restore the corrupted shard.
|
|
|
*/
|
|
|
- public void testCorruptFileThenSnapshotAndRestore() throws ExecutionException, InterruptedException, IOException {
|
|
|
+ public void testCorruptFileThenSnapshotAndRestore() throws InterruptedException, IOException {
|
|
|
int numDocs = scaledRandomIntBetween(100, 1000);
|
|
|
internalCluster().ensureAtLeastNumDataNodes(2);
|
|
|
|
|
|
@@ -546,8 +529,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
|
|
|
ensureGreen();
|
|
|
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
|
|
|
// we have to flush at least once here since we don't corrupt the translog
|
|
|
- SearchResponse countResponse = client().prepareSearch().setSize(0).get();
|
|
|
- assertHitCount(countResponse, numDocs);
|
|
|
+ assertHitCount(client().prepareSearch().setSize(0).get(), numDocs);
|
|
|
|
|
|
ShardRouting shardRouting = corruptRandomPrimaryFile(false);
|
|
|
logger.info("--> shard {} has a corrupted file", shardRouting);
|
|
|
@@ -618,8 +600,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
|
|
|
ensureGreen();
|
|
|
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
|
|
|
// we have to flush at least once here since we don't corrupt the translog
|
|
|
- SearchResponse countResponse = client().prepareSearch().setSize(0).get();
|
|
|
- assertHitCount(countResponse, numDocs);
|
|
|
+ assertHitCount(client().prepareSearch().setSize(0).get(), numDocs);
|
|
|
|
|
|
// disable allocations of replicas post restart (the restart will change replicas to primaries, so we have
|
|
|
// to capture replicas post restart)
|
|
|
@@ -770,4 +751,16 @@ public class CorruptedFileIT extends ESIntegTestCase {
|
|
|
}
|
|
|
return files;
|
|
|
}
|
|
|
+
|
|
|
+ private List<DiscoveryNode> getShuffledDataNodes() {
|
|
|
+ var response = client().admin().cluster().prepareNodesStats().get();
|
|
|
+ return response.getNodes()
|
|
|
+ .stream()
|
|
|
+ .map(BaseNodeResponse::getNode)
|
|
|
+ .filter(DiscoveryNode::canContainData)
|
|
|
+ .collect(collectingAndThen(toCollection(ArrayList::new), list -> {
|
|
|
+ Collections.shuffle(list, random());
|
|
|
+ return list;
|
|
|
+ }));
|
|
|
+ }
|
|
|
}
|