|
@@ -25,6 +25,8 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
|
|
import org.elasticsearch.action.ActionListener;
|
|
import org.elasticsearch.action.ActionListener;
|
|
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
|
|
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
|
|
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
|
|
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
|
|
|
|
+import org.elasticsearch.client.node.NodeClient;
|
|
|
|
+import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
import org.elasticsearch.cluster.routing.RerouteService;
|
|
import org.elasticsearch.cluster.routing.RerouteService;
|
|
import org.elasticsearch.cluster.routing.RoutingNodes;
|
|
import org.elasticsearch.cluster.routing.RoutingNodes;
|
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
|
@@ -34,8 +36,10 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
|
import org.elasticsearch.common.inject.Inject;
|
|
import org.elasticsearch.common.inject.Inject;
|
|
import org.elasticsearch.common.lease.Releasables;
|
|
import org.elasticsearch.common.lease.Releasables;
|
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|
|
|
+import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards;
|
|
import org.elasticsearch.index.shard.ShardId;
|
|
import org.elasticsearch.index.shard.ShardId;
|
|
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
|
|
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
|
|
|
|
+import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData;
|
|
|
|
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
@@ -49,18 +53,16 @@ public class GatewayAllocator {
|
|
private final PrimaryShardAllocator primaryShardAllocator;
|
|
private final PrimaryShardAllocator primaryShardAllocator;
|
|
private final ReplicaShardAllocator replicaShardAllocator;
|
|
private final ReplicaShardAllocator replicaShardAllocator;
|
|
|
|
|
|
- private final ConcurrentMap<ShardId, AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>>
|
|
|
|
|
|
+ private final ConcurrentMap<ShardId, AsyncShardFetch<NodeGatewayStartedShards>>
|
|
asyncFetchStarted = ConcurrentCollections.newConcurrentMap();
|
|
asyncFetchStarted = ConcurrentCollections.newConcurrentMap();
|
|
- private final ConcurrentMap<ShardId, AsyncShardFetch<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData>>
|
|
|
|
|
|
+ private final ConcurrentMap<ShardId, AsyncShardFetch<NodeStoreFilesMetaData>>
|
|
asyncFetchStore = ConcurrentCollections.newConcurrentMap();
|
|
asyncFetchStore = ConcurrentCollections.newConcurrentMap();
|
|
|
|
|
|
@Inject
|
|
@Inject
|
|
- public GatewayAllocator(RerouteService rerouteService,
|
|
|
|
- TransportNodesListGatewayStartedShards startedAction,
|
|
|
|
- TransportNodesListShardStoreMetaData storeAction) {
|
|
|
|
|
|
+ public GatewayAllocator(RerouteService rerouteService, NodeClient client) {
|
|
this.rerouteService = rerouteService;
|
|
this.rerouteService = rerouteService;
|
|
- this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction);
|
|
|
|
- this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction);
|
|
|
|
|
|
+ this.primaryShardAllocator = new InternalPrimaryShardAllocator(client);
|
|
|
|
+ this.replicaShardAllocator = new InternalReplicaShardAllocator(client);
|
|
}
|
|
}
|
|
|
|
|
|
public void cleanCaches() {
|
|
public void cleanCaches() {
|
|
@@ -79,10 +81,10 @@ public class GatewayAllocator {
|
|
|
|
|
|
public int getNumberOfInFlightFetch() {
|
|
public int getNumberOfInFlightFetch() {
|
|
int count = 0;
|
|
int count = 0;
|
|
- for (AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetch : asyncFetchStarted.values()) {
|
|
|
|
|
|
+ for (AsyncShardFetch<NodeGatewayStartedShards> fetch : asyncFetchStarted.values()) {
|
|
count += fetch.getNumberOfInFlightFetches();
|
|
count += fetch.getNumberOfInFlightFetches();
|
|
}
|
|
}
|
|
- for (AsyncShardFetch<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> fetch : asyncFetchStore.values()) {
|
|
|
|
|
|
+ for (AsyncShardFetch<NodeStoreFilesMetaData> fetch : asyncFetchStore.values()) {
|
|
count += fetch.getNumberOfInFlightFetches();
|
|
count += fetch.getNumberOfInFlightFetches();
|
|
}
|
|
}
|
|
return count;
|
|
return count;
|
|
@@ -147,19 +149,18 @@ public class GatewayAllocator {
|
|
|
|
|
|
class InternalPrimaryShardAllocator extends PrimaryShardAllocator {
|
|
class InternalPrimaryShardAllocator extends PrimaryShardAllocator {
|
|
|
|
|
|
- private final TransportNodesListGatewayStartedShards startedAction;
|
|
|
|
|
|
+ private final NodeClient client;
|
|
|
|
|
|
- InternalPrimaryShardAllocator(TransportNodesListGatewayStartedShards startedAction) {
|
|
|
|
- this.startedAction = startedAction;
|
|
|
|
|
|
+ InternalPrimaryShardAllocator(NodeClient client) {
|
|
|
|
+ this.client = client;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>
|
|
|
|
- fetchData(ShardRouting shard, RoutingAllocation allocation) {
|
|
|
|
- AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetch =
|
|
|
|
|
|
+ protected AsyncShardFetch.FetchResult<NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation) {
|
|
|
|
+ AsyncShardFetch<NodeGatewayStartedShards> fetch =
|
|
asyncFetchStarted.computeIfAbsent(shard.shardId(),
|
|
asyncFetchStarted.computeIfAbsent(shard.shardId(),
|
|
- shardId -> new InternalAsyncFetch<>(logger, "shard_started", shardId, startedAction));
|
|
|
|
- AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState =
|
|
|
|
|
|
+ shardId -> new InternalAsyncFetch<>(logger, "shard_started", shardId, this::listStartedShards));
|
|
|
|
+ AsyncShardFetch.FetchResult<NodeGatewayStartedShards> shardState =
|
|
fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId()));
|
|
fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId()));
|
|
|
|
|
|
if (shardState.hasData()) {
|
|
if (shardState.hasData()) {
|
|
@@ -167,23 +168,30 @@ public class GatewayAllocator {
|
|
}
|
|
}
|
|
return shardState;
|
|
return shardState;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ private void listStartedShards(ShardId shardId, DiscoveryNode[] nodes,
|
|
|
|
+ ActionListener<BaseNodesResponse<NodeGatewayStartedShards>> listener) {
|
|
|
|
+ var request = new TransportNodesListGatewayStartedShards.Request(shardId, nodes);
|
|
|
|
+ client.executeLocally(TransportNodesListGatewayStartedShards.TYPE, request,
|
|
|
|
+ ActionListener.wrap(listener::onResponse, listener::onFailure));
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
class InternalReplicaShardAllocator extends ReplicaShardAllocator {
|
|
class InternalReplicaShardAllocator extends ReplicaShardAllocator {
|
|
|
|
|
|
- private final TransportNodesListShardStoreMetaData storeAction;
|
|
|
|
|
|
+ private final NodeClient client;
|
|
|
|
|
|
- InternalReplicaShardAllocator(TransportNodesListShardStoreMetaData storeAction) {
|
|
|
|
- this.storeAction = storeAction;
|
|
|
|
|
|
+ InternalReplicaShardAllocator(NodeClient client) {
|
|
|
|
+ this.client = client;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData>
|
|
|
|
|
|
+ protected AsyncShardFetch.FetchResult<NodeStoreFilesMetaData>
|
|
fetchData(ShardRouting shard, RoutingAllocation allocation) {
|
|
fetchData(ShardRouting shard, RoutingAllocation allocation) {
|
|
- AsyncShardFetch<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> fetch =
|
|
|
|
|
|
+ AsyncShardFetch<NodeStoreFilesMetaData> fetch =
|
|
asyncFetchStore.computeIfAbsent(shard.shardId(),
|
|
asyncFetchStore.computeIfAbsent(shard.shardId(),
|
|
- shardId -> new InternalAsyncFetch<>(logger, "shard_store", shard.shardId(), storeAction));
|
|
|
|
- AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> shardStores =
|
|
|
|
|
|
+ shardId -> new InternalAsyncFetch<>(logger, "shard_store", shard.shardId(), this::listStoreFilesMetaData));
|
|
|
|
+ AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> shardStores =
|
|
fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId()));
|
|
fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId()));
|
|
if (shardStores.hasData()) {
|
|
if (shardStores.hasData()) {
|
|
shardStores.processAllocation(allocation);
|
|
shardStores.processAllocation(allocation);
|
|
@@ -191,6 +199,13 @@ public class GatewayAllocator {
|
|
return shardStores;
|
|
return shardStores;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private void listStoreFilesMetaData(ShardId shardId, DiscoveryNode[] nodes,
|
|
|
|
+ ActionListener<BaseNodesResponse<NodeStoreFilesMetaData>> listener) {
|
|
|
|
+ var request = new TransportNodesListShardStoreMetaData.Request(shardId, nodes);
|
|
|
|
+ client.executeLocally(TransportNodesListShardStoreMetaData.TYPE, request,
|
|
|
|
+ ActionListener.wrap(listener::onResponse, listener::onFailure));
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
protected boolean hasInitiatedFetching(ShardRouting shard) {
|
|
protected boolean hasInitiatedFetching(ShardRouting shard) {
|
|
return asyncFetchStore.get(shard.shardId()) != null;
|
|
return asyncFetchStore.get(shard.shardId()) != null;
|