|
@@ -5,21 +5,27 @@
|
|
|
*/
|
|
|
package org.elasticsearch.xpack.searchablesnapshots;
|
|
|
|
|
|
+import org.elasticsearch.Version;
|
|
|
import org.elasticsearch.cluster.routing.RecoverySource;
|
|
|
-import org.elasticsearch.cluster.routing.RoutingNode;
|
|
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
|
|
-import org.elasticsearch.cluster.routing.UnassignedInfo;
|
|
|
import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision;
|
|
|
import org.elasticsearch.cluster.routing.allocation.AllocationDecision;
|
|
|
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
|
|
|
import org.elasticsearch.cluster.routing.allocation.FailedShard;
|
|
|
-import org.elasticsearch.cluster.routing.allocation.NodeAllocationResult;
|
|
|
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
|
|
-import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
|
|
+import org.elasticsearch.common.settings.Settings;
|
|
|
+import org.elasticsearch.repositories.IndexId;
|
|
|
+import org.elasticsearch.snapshots.Snapshot;
|
|
|
+import org.elasticsearch.snapshots.SnapshotId;
|
|
|
|
|
|
-import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
|
|
|
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
|
|
|
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
|
|
|
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_REPOSITORY_SETTING;
|
|
|
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
|
|
|
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;
|
|
|
+
|
|
|
public class SearchableSnapshotAllocator implements ExistingShardsAllocator {
|
|
|
|
|
|
public static final String ALLOCATOR_NAME = "searchable_snapshot_allocator";
|
|
@@ -36,55 +42,51 @@ public class SearchableSnapshotAllocator implements ExistingShardsAllocator {
|
|
|
RoutingAllocation allocation,
|
|
|
UnassignedAllocationHandler unassignedAllocationHandler
|
|
|
) {
|
|
|
+ if (shardRouting.primary()
|
|
|
+ && (shardRouting.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE
|
|
|
+ || shardRouting.recoverySource().getType() == RecoverySource.Type.EMPTY_STORE)) {
|
|
|
+ // we always force snapshot recovery source to use the snapshot-based recovery process on the node
|
|
|
+
|
|
|
+ final Settings indexSettings = allocation.metadata().index(shardRouting.index()).getSettings();
|
|
|
+ final IndexId indexId = new IndexId(
|
|
|
+ SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings),
|
|
|
+ SNAPSHOT_INDEX_ID_SETTING.get(indexSettings)
|
|
|
+ );
|
|
|
+ final SnapshotId snapshotId = new SnapshotId(
|
|
|
+ SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings),
|
|
|
+ SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings)
|
|
|
+ );
|
|
|
+ final String repository = SNAPSHOT_REPOSITORY_SETTING.get(indexSettings);
|
|
|
+ final Snapshot snapshot = new Snapshot(repository, snapshotId);
|
|
|
+
|
|
|
+ shardRouting = unassignedAllocationHandler.updateUnassigned(
|
|
|
+ shardRouting.unassignedInfo(),
|
|
|
+ new RecoverySource.SnapshotRecoverySource(
|
|
|
+ RecoverySource.SnapshotRecoverySource.NO_API_RESTORE_UUID,
|
|
|
+ snapshot,
|
|
|
+ Version.CURRENT,
|
|
|
+ indexId
|
|
|
+ ),
|
|
|
+ allocation.changes()
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
final AllocateUnassignedDecision allocateUnassignedDecision = decideAllocation(allocation, shardRouting);
|
|
|
- assert allocateUnassignedDecision.isDecisionTaken();
|
|
|
-
|
|
|
- if (allocateUnassignedDecision.getAllocationDecision() == AllocationDecision.YES) {
|
|
|
- if (shardRouting.primary() && shardRouting.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE) {
|
|
|
- // we don't care what the allocation ID is since we know that these shards cannot really be stale, so we can
|
|
|
- // safely ignore the allocation ID with a forced-stale allocation and allow this shard to fall through to the balanced
|
|
|
- // shards allocator
|
|
|
- unassignedAllocationHandler.updateUnassigned(
|
|
|
- shardRouting.unassignedInfo(),
|
|
|
- RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE,
|
|
|
- allocation.changes()
|
|
|
- );
|
|
|
- }
|
|
|
- } else {
|
|
|
+
|
|
|
+ if (allocateUnassignedDecision.isDecisionTaken() && allocateUnassignedDecision.getAllocationDecision() != AllocationDecision.YES) {
|
|
|
unassignedAllocationHandler.removeAndIgnore(allocateUnassignedDecision.getAllocationStatus(), allocation.changes());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static AllocateUnassignedDecision decideAllocation(RoutingAllocation allocation, ShardRouting shardRouting) {
|
|
|
+ private AllocateUnassignedDecision decideAllocation(RoutingAllocation allocation, ShardRouting shardRouting) {
|
|
|
assert shardRouting.unassigned();
|
|
|
assert ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING.get(
|
|
|
allocation.metadata().getIndexSafe(shardRouting.index()).getSettings()
|
|
|
).equals(ALLOCATOR_NAME);
|
|
|
|
|
|
- Decision.Type bestDecision = Decision.Type.NO;
|
|
|
- RoutingNode bestNode = null;
|
|
|
- final List<NodeAllocationResult> nodeAllocationResults = allocation.debugDecision()
|
|
|
- ? new ArrayList<>(allocation.routingNodes().size())
|
|
|
- : null;
|
|
|
-
|
|
|
- for (final RoutingNode routingNode : allocation.routingNodes()) {
|
|
|
- final Decision decision = allocation.deciders().canAllocate(shardRouting, routingNode, allocation);
|
|
|
- if (decision.type() == Decision.Type.YES || (decision.type() == Decision.Type.THROTTLE && bestDecision != Decision.Type.YES)) {
|
|
|
- bestDecision = decision.type();
|
|
|
- bestNode = routingNode;
|
|
|
- }
|
|
|
- if (nodeAllocationResults != null) {
|
|
|
- nodeAllocationResults.add(new NodeAllocationResult(routingNode.node(), null, decision));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (bestDecision == Decision.Type.YES) {
|
|
|
- return AllocateUnassignedDecision.yes(bestNode.node(), null, nodeAllocationResults, false);
|
|
|
- } else if (bestDecision == Decision.Type.THROTTLE) {
|
|
|
- return AllocateUnassignedDecision.throttle(nodeAllocationResults);
|
|
|
- } else {
|
|
|
- return AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.DECIDERS_NO, nodeAllocationResults);
|
|
|
- }
|
|
|
+ // let BalancedShardsAllocator take care of allocating this shard
|
|
|
+ // TODO: once we have persistent cache, choose a node that has existing data
|
|
|
+ return AllocateUnassignedDecision.NOT_TAKEN;
|
|
|
}
|
|
|
|
|
|
@Override
|