|
@@ -10,6 +10,7 @@ package org.elasticsearch.system_indices.task;
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
import org.elasticsearch.ElasticsearchException;
|
|
|
+import org.elasticsearch.ResourceAlreadyExistsException;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
|
|
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
|
|
@@ -38,6 +39,7 @@ import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.core.FixForMultiProject;
|
|
|
import org.elasticsearch.core.SuppressForbidden;
|
|
|
import org.elasticsearch.core.TimeValue;
|
|
|
+import org.elasticsearch.core.Tuple;
|
|
|
import org.elasticsearch.index.Index;
|
|
|
import org.elasticsearch.index.IndexNotFoundException;
|
|
|
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
|
@@ -47,16 +49,25 @@ import org.elasticsearch.indices.SystemIndices;
|
|
|
import org.elasticsearch.persistent.AllocatedPersistentTask;
|
|
|
import org.elasticsearch.script.Script;
|
|
|
import org.elasticsearch.tasks.TaskId;
|
|
|
-
|
|
|
-import java.util.LinkedList;
|
|
|
+import org.elasticsearch.threadpool.ThreadPool;
|
|
|
+import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamAction;
|
|
|
+import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusAction;
|
|
|
+import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction;
|
|
|
+import org.elasticsearch.xpack.migrate.task.ReindexDataStreamEnrichedStatus;
|
|
|
+
|
|
|
+import java.util.ArrayDeque;
|
|
|
+import java.util.Collection;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Objects;
|
|
|
import java.util.Optional;
|
|
|
import java.util.Queue;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
+import java.util.function.BiConsumer;
|
|
|
import java.util.function.Consumer;
|
|
|
import java.util.stream.Collectors;
|
|
|
+import java.util.stream.Stream;
|
|
|
|
|
|
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE;
|
|
|
import static org.elasticsearch.cluster.metadata.IndexMetadata.State.CLOSE;
|
|
@@ -77,12 +88,13 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|
|
private final ClusterService clusterService;
|
|
|
private final SystemIndices systemIndices;
|
|
|
private final IndexScopedSettings indexScopedSettings;
|
|
|
+ private final ThreadPool threadPool;
|
|
|
|
|
|
// In-memory state
|
|
|
// NOTE: This queue is not a thread-safe class. Use `synchronized (migrationQueue)` whenever you access this. I chose this rather than
|
|
|
// a synchronized/concurrent collection or an AtomicReference because we often need to do compound operations, which are much simpler
|
|
|
// with `synchronized` blocks than when only the collection accesses are protected.
|
|
|
- private final Queue<SystemIndexMigrationInfo> migrationQueue = new LinkedList<>();
|
|
|
+ private final Queue<SystemResourceMigrationInfo> migrationQueue = new ArrayDeque<>();
|
|
|
private final AtomicReference<Map<String, Object>> currentFeatureCallbackMetadata = new AtomicReference<>();
|
|
|
|
|
|
public SystemIndexMigrator(
|
|
@@ -94,17 +106,20 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|
|
Map<String, String> headers,
|
|
|
ClusterService clusterService,
|
|
|
SystemIndices systemIndices,
|
|
|
- IndexScopedSettings indexScopedSettings
|
|
|
+ IndexScopedSettings indexScopedSettings,
|
|
|
+ ThreadPool threadPool
|
|
|
) {
|
|
|
super(id, type, action, "system-index-migrator", parentTask, headers);
|
|
|
this.baseClient = new ParentTaskAssigningClient(client, parentTask);
|
|
|
this.clusterService = clusterService;
|
|
|
this.systemIndices = systemIndices;
|
|
|
this.indexScopedSettings = indexScopedSettings;
|
|
|
+ this.threadPool = threadPool;
|
|
|
}
|
|
|
|
|
|
public void run(SystemIndexMigrationTaskState taskState) {
|
|
|
ClusterState clusterState = clusterService.state();
|
|
|
+ ProjectMetadata projectMetadata = clusterState.metadata().getProject();
|
|
|
|
|
|
final String stateIndexName;
|
|
|
final String stateFeatureName;
|
|
@@ -124,7 +139,7 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- if (stateIndexName != null && clusterState.metadata().getProject().hasIndex(stateIndexName) == false) {
|
|
|
+ if (stateIndexName != null && projectMetadata.hasIndexAbstraction(stateIndexName) == false) {
|
|
|
markAsFailed(new IndexNotFoundException(stateIndexName, "cannot migrate because that index does not exist"));
|
|
|
return;
|
|
|
}
|
|
@@ -142,14 +157,14 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|
|
|
|
|
systemIndices.getFeatures()
|
|
|
.stream()
|
|
|
- .flatMap(feature -> SystemIndexMigrationInfo.fromFeature(feature, clusterState.metadata(), indexScopedSettings))
|
|
|
- .filter(migrationInfo -> needsToBeMigrated(clusterState.metadata().getProject().index(migrationInfo.getCurrentIndexName())))
|
|
|
+ .flatMap(feature -> SystemResourceMigrationFactory.fromFeature(feature, projectMetadata, indexScopedSettings))
|
|
|
+ .filter(migrationInfo -> needToBeMigrated(migrationInfo.getIndices(projectMetadata)))
|
|
|
.sorted() // Stable order between nodes
|
|
|
.collect(Collectors.toCollection(() -> migrationQueue));
|
|
|
|
|
|
List<String> closedIndices = migrationQueue.stream()
|
|
|
- .filter(SystemIndexMigrationInfo::isCurrentIndexClosed)
|
|
|
- .map(SystemIndexMigrationInfo::getCurrentIndexName)
|
|
|
+ .filter(SystemResourceMigrationInfo::isCurrentIndexClosed)
|
|
|
+ .map(SystemResourceMigrationInfo::getCurrentResourceName)
|
|
|
.toList();
|
|
|
if (closedIndices.isEmpty() == false) {
|
|
|
markAsFailed(
|
|
@@ -161,27 +176,27 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|
|
// The queue we just generated *should* be the same one as was generated on the last node, so the first entry in the queue
|
|
|
// should be the same as is in the task state
|
|
|
if (stateIndexName != null && stateFeatureName != null && migrationQueue.isEmpty() == false) {
|
|
|
- SystemIndexMigrationInfo nextMigrationInfo = migrationQueue.peek();
|
|
|
+ SystemResourceMigrationInfo nextMigrationInfo = migrationQueue.peek();
|
|
|
// This should never, ever happen in testing mode, but could conceivably happen if there are different sets of plugins
|
|
|
// installed on the previous node vs. this one.
|
|
|
assert nextMigrationInfo.getFeatureName().equals(stateFeatureName)
|
|
|
- && nextMigrationInfo.getCurrentIndexName().equals(stateIndexName)
|
|
|
- : "index name ["
|
|
|
+ && nextMigrationInfo.getCurrentResourceName().equals(stateIndexName)
|
|
|
+ : "system index/data stream name ["
|
|
|
+ stateIndexName
|
|
|
+ "] or feature name ["
|
|
|
+ stateFeatureName
|
|
|
- + "] from task state did not match first index ["
|
|
|
- + nextMigrationInfo.getCurrentIndexName()
|
|
|
+ + "] from task state did not match first index/data stream ["
|
|
|
+ + nextMigrationInfo.getCurrentResourceName()
|
|
|
+ "] and feature ["
|
|
|
+ nextMigrationInfo.getFeatureName()
|
|
|
+ "] of locally computed queue, see logs";
|
|
|
- if (nextMigrationInfo.getCurrentIndexName().equals(stateIndexName) == false) {
|
|
|
- if (clusterState.metadata().getProject().hasIndex(stateIndexName) == false) {
|
|
|
+ if (nextMigrationInfo.getCurrentResourceName().equals(stateIndexName) == false) {
|
|
|
+ if (projectMetadata.hasIndexAbstraction(stateIndexName) == false) {
|
|
|
// If we don't have that index at all, and also don't have the next one
|
|
|
markAsFailed(
|
|
|
new IllegalStateException(
|
|
|
format(
|
|
|
- "failed to resume system index migration from index [%s], that index is not present in the cluster",
|
|
|
+ "failed to resume system resource migration from resource [%s], that is not present in the cluster",
|
|
|
stateIndexName
|
|
|
)
|
|
|
)
|
|
@@ -189,8 +204,9 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|
|
}
|
|
|
logger.warn(
|
|
|
() -> format(
|
|
|
- "resuming system index migration with index [%s], which does not match index given in last task state [%s]",
|
|
|
- nextMigrationInfo.getCurrentIndexName(),
|
|
|
+ "resuming system resource migration with resource [%s],"
|
|
|
+ + " which does not match resource given in last task state [%s]",
|
|
|
+ nextMigrationInfo.getCurrentResourceName(),
|
|
|
stateIndexName
|
|
|
)
|
|
|
);
|
|
@@ -198,35 +214,43 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // Kick off our callback "loop" - finishIndexAndLoop calls back into prepareNextIndex
|
|
|
+ // Kick off our callback "loop" - finishIndexAndLoop calls back into startFeatureMigration
|
|
|
logger.debug("cleaning up previous migration, task state: [{}]", taskState == null ? "null" : Strings.toString(taskState));
|
|
|
- clearResults(
|
|
|
- clusterService,
|
|
|
- ActionListener.wrap(
|
|
|
- state -> prepareNextIndex(state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), stateFeatureName),
|
|
|
- this::markAsFailed
|
|
|
- )
|
|
|
- );
|
|
|
+ clearResults(clusterService, ActionListener.wrap(state -> startFeatureMigration(stateFeatureName), this::markAsFailed));
|
|
|
}
|
|
|
|
|
|
- private void finishIndexAndLoop(BulkByScrollResponse bulkResponse) {
|
|
|
+ private void finishIndexAndLoop(SystemIndexMigrationInfo migrationInfo, BulkByScrollResponse bulkResponse) {
|
|
|
// The BulkByScroll response is validated in #migrateSingleIndex, it's just here to satisfy the ActionListener type
|
|
|
assert bulkResponse.isTimedOut() == false
|
|
|
&& (bulkResponse.getBulkFailures() == null || bulkResponse.getBulkFailures().isEmpty())
|
|
|
&& (bulkResponse.getSearchFailures() == null || bulkResponse.getSearchFailures().isEmpty())
|
|
|
: "If this assertion gets triggered it means the validation in migrateSingleIndex isn't working right";
|
|
|
- SystemIndexMigrationInfo lastMigrationInfo = currentMigrationInfo();
|
|
|
logger.info(
|
|
|
"finished migrating old index [{}] from feature [{}] to new index [{}]",
|
|
|
- lastMigrationInfo.getCurrentIndexName(),
|
|
|
- lastMigrationInfo.getFeatureName(),
|
|
|
- lastMigrationInfo.getNextIndexName()
|
|
|
+ migrationInfo.getCurrentIndexName(),
|
|
|
+ migrationInfo.getFeatureName(),
|
|
|
+ migrationInfo.getNextIndexName()
|
|
|
);
|
|
|
+
|
|
|
+ finishResourceAndLoop(migrationInfo);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void finishDataStreamAndLoop(SystemDataStreamMigrationInfo migrationInfo) {
|
|
|
+ logger.info(
|
|
|
+ "finished migrating old indices from data stream [{}] from feature [{}] to new indices",
|
|
|
+ migrationInfo.getCurrentResourceName(),
|
|
|
+ migrationInfo.getFeatureName()
|
|
|
+ );
|
|
|
+
|
|
|
+ finishResourceAndLoop(migrationInfo);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void finishResourceAndLoop(SystemResourceMigrationInfo lastMigrationInfo) {
|
|
|
assert migrationQueue != null && migrationQueue.isEmpty() == false;
|
|
|
synchronized (migrationQueue) {
|
|
|
migrationQueue.remove();
|
|
|
}
|
|
|
- SystemIndexMigrationInfo nextMigrationInfo = currentMigrationInfo();
|
|
|
+ SystemResourceMigrationInfo nextMigrationInfo = currentMigrationInfo();
|
|
|
if (nextMigrationInfo == null || nextMigrationInfo.getFeatureName().equals(lastMigrationInfo.getFeatureName()) == false) {
|
|
|
// The next feature name is different than the last one, so we just finished a feature - time to invoke its post-migration hook
|
|
|
lastMigrationInfo.indicesMigrationComplete(
|
|
@@ -237,7 +261,8 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|
|
if (successful == false) {
|
|
|
// GWB> Should we actually fail in this case instead of plugging along?
|
|
|
logger.warn(
|
|
|
- "post-migration hook for feature [{}] indicated failure; feature migration metadata prior to failure was [{}]",
|
|
|
+ "post-migration hook for feature [{}] indicated failure;"
|
|
|
+ + " feature migration metadata prior to failure was [{}]",
|
|
|
lastMigrationInfo.getFeatureName(),
|
|
|
currentFeatureCallbackMetadata.get()
|
|
|
);
|
|
@@ -246,25 +271,43 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|
|
}, this::markAsFailed)
|
|
|
);
|
|
|
} else {
|
|
|
- prepareNextIndex(state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), lastMigrationInfo.getFeatureName());
|
|
|
+ startFeatureMigration(lastMigrationInfo.getFeatureName());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void migrateResource(SystemResourceMigrationInfo migrationInfo, ClusterState clusterState) {
|
|
|
+ if (migrationInfo instanceof SystemIndexMigrationInfo systemIndexMigrationInfo) {
|
|
|
+ logger.info(
|
|
|
+ "preparing to migrate old index [{}] from feature [{}] to new index [{}]",
|
|
|
+ systemIndexMigrationInfo.getCurrentIndexName(),
|
|
|
+ migrationInfo.getFeatureName(),
|
|
|
+ systemIndexMigrationInfo.getNextIndexName()
|
|
|
+ );
|
|
|
+ migrateSingleIndex(systemIndexMigrationInfo, clusterState, this::finishIndexAndLoop);
|
|
|
+ } else if (migrationInfo instanceof SystemDataStreamMigrationInfo systemDataStreamMigrationInfo) {
|
|
|
+ logger.info(
|
|
|
+ "preparing to migrate old indices from data stream [{}] from feature [{}] to new indices",
|
|
|
+ systemDataStreamMigrationInfo.getCurrentResourceName(),
|
|
|
+ migrationInfo.getFeatureName()
|
|
|
+ );
|
|
|
+ migrateDataStream(systemDataStreamMigrationInfo, this::finishDataStreamAndLoop);
|
|
|
+ } else {
|
|
|
+ throw new IllegalStateException("Unknown type of migration: " + migrationInfo.getClass());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void recordIndexMigrationSuccess(SystemIndexMigrationInfo lastMigrationInfo) {
|
|
|
+ private void recordIndexMigrationSuccess(SystemResourceMigrationInfo lastMigrationInfo) {
|
|
|
MigrationResultsUpdateTask updateTask = MigrationResultsUpdateTask.upsert(
|
|
|
lastMigrationInfo.getFeatureName(),
|
|
|
SingleFeatureMigrationResult.success(),
|
|
|
ActionListener.wrap(state -> {
|
|
|
- prepareNextIndex(
|
|
|
- clusterState -> migrateSingleIndex(clusterState, this::finishIndexAndLoop),
|
|
|
- lastMigrationInfo.getFeatureName()
|
|
|
- );
|
|
|
+ startFeatureMigration(lastMigrationInfo.getFeatureName());
|
|
|
}, this::markAsFailed)
|
|
|
);
|
|
|
updateTask.submit(clusterService);
|
|
|
}
|
|
|
|
|
|
- private void prepareNextIndex(Consumer<ClusterState> listener, String lastFeatureName) {
|
|
|
+ private void startFeatureMigration(String lastFeatureName) {
|
|
|
synchronized (migrationQueue) {
|
|
|
assert migrationQueue != null;
|
|
|
if (migrationQueue.isEmpty()) {
|
|
@@ -274,29 +317,23 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- final SystemIndexMigrationInfo migrationInfo = currentMigrationInfo();
|
|
|
+ final SystemResourceMigrationInfo migrationInfo = currentMigrationInfo();
|
|
|
assert migrationInfo != null : "the queue of indices to migrate should have been checked for emptiness before calling this method";
|
|
|
- logger.info(
|
|
|
- "preparing to migrate old index [{}] from feature [{}] to new index [{}]",
|
|
|
- migrationInfo.getCurrentIndexName(),
|
|
|
- migrationInfo.getFeatureName(),
|
|
|
- migrationInfo.getNextIndexName()
|
|
|
- );
|
|
|
if (migrationInfo.getFeatureName().equals(lastFeatureName) == false) {
|
|
|
// And then invoke the pre-migration hook for the next one.
|
|
|
migrationInfo.prepareForIndicesMigration(clusterService, baseClient, ActionListener.wrap(newMetadata -> {
|
|
|
currentFeatureCallbackMetadata.set(newMetadata);
|
|
|
- updateTaskState(migrationInfo, listener, newMetadata);
|
|
|
+ updateTaskState(migrationInfo, state -> migrateResource(migrationInfo, state), newMetadata);
|
|
|
}, this::markAsFailed));
|
|
|
} else {
|
|
|
// Otherwise, just re-use what we already have.
|
|
|
- updateTaskState(migrationInfo, listener, currentFeatureCallbackMetadata.get());
|
|
|
+ updateTaskState(migrationInfo, state -> migrateResource(migrationInfo, state), currentFeatureCallbackMetadata.get());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void updateTaskState(SystemIndexMigrationInfo migrationInfo, Consumer<ClusterState> listener, Map<String, Object> metadata) {
|
|
|
+ private void updateTaskState(SystemResourceMigrationInfo migrationInfo, Consumer<ClusterState> listener, Map<String, Object> metadata) {
|
|
|
final SystemIndexMigrationTaskState newTaskState = new SystemIndexMigrationTaskState(
|
|
|
- migrationInfo.getCurrentIndexName(),
|
|
|
+ migrationInfo.getCurrentResourceName(),
|
|
|
migrationInfo.getFeatureName(),
|
|
|
metadata
|
|
|
);
|
|
@@ -309,16 +346,21 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|
|
}, this::markAsFailed));
|
|
|
}
|
|
|
|
|
|
- private static boolean needsToBeMigrated(IndexMetadata indexMetadata) {
|
|
|
- assert indexMetadata != null : "null IndexMetadata should be impossible, we're not consistently using the same cluster state";
|
|
|
- if (indexMetadata == null) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- return indexMetadata.isSystem() && indexMetadata.getCreationVersion().before(NO_UPGRADE_REQUIRED_INDEX_VERSION);
|
|
|
+ private static boolean needToBeMigrated(Stream<IndexMetadata> indicesMetadata) {
|
|
|
+ return indicesMetadata.anyMatch(indexMetadata -> {
|
|
|
+ assert indexMetadata != null : "null IndexMetadata should be impossible, we're not consistently using the same cluster state";
|
|
|
+ if (indexMetadata == null) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return indexMetadata.isSystem() && indexMetadata.getCreationVersion().before(NO_UPGRADE_REQUIRED_INDEX_VERSION);
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
- private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScrollResponse> listener) {
|
|
|
- final SystemIndexMigrationInfo migrationInfo = currentMigrationInfo();
|
|
|
+ private void migrateSingleIndex(
|
|
|
+ SystemIndexMigrationInfo migrationInfo,
|
|
|
+ ClusterState clusterState,
|
|
|
+ BiConsumer<SystemIndexMigrationInfo, BulkByScrollResponse> listener
|
|
|
+ ) {
|
|
|
String oldIndexName = migrationInfo.getCurrentIndexName();
|
|
|
final ProjectMetadata projectMetadata = clusterState.metadata().getProject();
|
|
|
final IndexMetadata imd = projectMetadata.index(oldIndexName);
|
|
@@ -375,7 +417,10 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|
|
}
|
|
|
|
|
|
logger.info("migrating index [{}] from feature [{}] to new index [{}]", oldIndexName, migrationInfo.getFeatureName(), newIndexName);
|
|
|
- ActionListener<BulkByScrollResponse> innerListener = ActionListener.wrap(listener::accept, this::markAsFailed);
|
|
|
+ ActionListener<BulkByScrollResponse> innerListener = ActionListener.wrap(
|
|
|
+ response -> listener.accept(migrationInfo, response),
|
|
|
+ this::markAsFailed
|
|
|
+ );
|
|
|
try {
|
|
|
createIndexRetryOnFailure(migrationInfo, innerListener.delegateFailureAndWrap((delegate, shardsAcknowledgedResponse) -> {
|
|
|
logger.debug(
|
|
@@ -479,7 +524,7 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|
|
.mapping(migrationInfo.getMappings())
|
|
|
.settings(Objects.requireNonNullElse(settingsBuilder.build(), Settings.EMPTY));
|
|
|
|
|
|
- baseClient.admin().indices().create(createIndexRequest, listener);
|
|
|
+ migrationInfo.createClient(baseClient).admin().indices().create(createIndexRequest, listener);
|
|
|
}
|
|
|
|
|
|
private void createIndexRetryOnFailure(SystemIndexMigrationInfo migrationInfo, ActionListener<CreateIndexResponse> listener) {
|
|
@@ -507,7 +552,7 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|
|
private <T> void deleteIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<AcknowledgedResponse> listener) {
|
|
|
logger.info("removing index [{}] from feature [{}]", migrationInfo.getNextIndexName(), migrationInfo.getFeatureName());
|
|
|
String newIndexName = migrationInfo.getNextIndexName();
|
|
|
- baseClient.admin().indices().prepareDelete(newIndexName).execute(ActionListener.wrap(ackedResponse -> {
|
|
|
+ migrationInfo.createClient(baseClient).admin().indices().prepareDelete(newIndexName).execute(ActionListener.wrap(ackedResponse -> {
|
|
|
if (ackedResponse.isAcknowledged()) {
|
|
|
logger.info("successfully removed index [{}]", newIndexName);
|
|
|
listener.onResponse(ackedResponse);
|
|
@@ -583,12 +628,191 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|
|
migrationInfo.createClient(baseClient).execute(ReindexAction.INSTANCE, reindexRequest, listener);
|
|
|
}
|
|
|
|
|
|
+ private void migrateDataStream(
|
|
|
+ SystemDataStreamMigrationInfo migrationInfo,
|
|
|
+ Consumer<SystemDataStreamMigrationInfo> completionListener
|
|
|
+ ) {
|
|
|
+ String dataStreamName = migrationInfo.getDataStreamName();
|
|
|
+ logger.info("migrating data stream [{}] from feature [{}]", dataStreamName, migrationInfo.getFeatureName());
|
|
|
+
|
|
|
+ ReindexDataStreamAction.ReindexDataStreamRequest reindexRequest = new ReindexDataStreamAction.ReindexDataStreamRequest(
|
|
|
+ ReindexDataStreamAction.Mode.UPGRADE,
|
|
|
+ dataStreamName
|
|
|
+ );
|
|
|
+
|
|
|
+ try {
|
|
|
+ migrationInfo.createClient(baseClient)
|
|
|
+ .execute(ReindexDataStreamAction.INSTANCE, reindexRequest, ActionListener.wrap(startMigrationResponse -> {
|
|
|
+ if (startMigrationResponse.isAcknowledged() == false) {
|
|
|
+ logger.error("failed to migrate indices from data stream [{}]", dataStreamName);
|
|
|
+ throw new ElasticsearchException(
|
|
|
+ "reindex system data stream ["
|
|
|
+ + dataStreamName
|
|
|
+ + "] from feature ["
|
|
|
+ + migrationInfo.getFeatureName()
|
|
|
+ + "] response is not acknowledge"
|
|
|
+ );
|
|
|
+ }
|
|
|
+ checkDataStreamMigrationStatus(migrationInfo, completionListener, false);
|
|
|
+ }, e -> {
|
|
|
+ if (e instanceof ResourceAlreadyExistsException) {
|
|
|
+ // This might happen if the task has been reassigned to another node,
|
|
|
+ // in this case we can just wait for the data stream migration task to finish.
|
|
|
+ // But, there is a possibility that previously started data stream migration task has failed,
|
|
|
+ // in this case we need to cancel it and restart migration of the data stream.
|
|
|
+ logger.debug("data stream [{}] migration is already in progress", dataStreamName);
|
|
|
+ checkDataStreamMigrationStatus(migrationInfo, completionListener, true);
|
|
|
+ } else {
|
|
|
+ markAsFailed(e);
|
|
|
+ }
|
|
|
+ }));
|
|
|
+ } catch (Exception ex) {
|
|
|
+ logger.error(
|
|
|
+ () -> format(
|
|
|
+ "error occurred while migrating data stream [%s] from feature [%s]",
|
|
|
+ dataStreamName,
|
|
|
+ migrationInfo.getFeatureName()
|
|
|
+ ),
|
|
|
+ ex
|
|
|
+ );
|
|
|
+ markAsFailed(ex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void checkDataStreamMigrationStatus(
|
|
|
+ SystemDataStreamMigrationInfo migrationInfo,
|
|
|
+ Consumer<SystemDataStreamMigrationInfo> completionListener,
|
|
|
+ boolean restartMigrationOnError
|
|
|
+ ) {
|
|
|
+ String dataStreamName = migrationInfo.getDataStreamName();
|
|
|
+ GetMigrationReindexStatusAction.Request getStatusRequest = new GetMigrationReindexStatusAction.Request(dataStreamName);
|
|
|
+
|
|
|
+ migrationInfo.createClient(baseClient)
|
|
|
+ .execute(GetMigrationReindexStatusAction.INSTANCE, getStatusRequest, ActionListener.wrap(migrationStatusResponse -> {
|
|
|
+ ReindexDataStreamEnrichedStatus status = migrationStatusResponse.getEnrichedStatus();
|
|
|
+ logger.debug(
|
|
|
+ "data stream [{}] reindexing status: pending {} out of {} indices",
|
|
|
+ dataStreamName,
|
|
|
+ status.pending(),
|
|
|
+ status.totalIndicesToBeUpgraded()
|
|
|
+ );
|
|
|
+
|
|
|
+ if (status.complete() == false) {
|
|
|
+ // data stream migration task is running, schedule another check without need to cancel-restart
|
|
|
+ threadPool.schedule(
|
|
|
+ () -> checkDataStreamMigrationStatus(migrationInfo, completionListener, false),
|
|
|
+ TimeValue.timeValueSeconds(1),
|
|
|
+ threadPool.generic()
|
|
|
+ );
|
|
|
+ } else {
|
|
|
+ List<Tuple<String, Exception>> errors = status.errors();
|
|
|
+ if (errors != null && errors.isEmpty() == false || status.exception() != null) {
|
|
|
+
|
|
|
+ // data stream migration task existed before this task started it and is in failed state - cancel it and restart
|
|
|
+ if (restartMigrationOnError) {
|
|
|
+ cancelExistingDataStreamMigrationAndRetry(migrationInfo, completionListener);
|
|
|
+ } else {
|
|
|
+ List<Exception> exceptions = (status.exception() != null)
|
|
|
+ ? Collections.singletonList(status.exception())
|
|
|
+ : errors.stream().map(Tuple::v2).toList();
|
|
|
+ dataStreamMigrationFailed(migrationInfo, exceptions);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ logger.info(
|
|
|
+ "successfully migrated old indices from data stream [{}] from feature [{}] to new indices",
|
|
|
+ dataStreamName,
|
|
|
+ migrationInfo.getFeatureName()
|
|
|
+ );
|
|
|
+ completionListener.accept(migrationInfo);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }, ex -> cancelExistingDataStreamMigrationAndMarkAsFailed(migrationInfo, ex)));
|
|
|
+ }
|
|
|
+
|
|
|
+ private void dataStreamMigrationFailed(SystemDataStreamMigrationInfo migrationInfo, Collection<Exception> exceptions) {
|
|
|
+ logger.error(
|
|
|
+ "error occurred while reindexing data stream [{}] from feature [{}], failures [{}]",
|
|
|
+ migrationInfo.getDataStreamName(),
|
|
|
+ migrationInfo.getFeatureName(),
|
|
|
+ exceptions
|
|
|
+ );
|
|
|
+
|
|
|
+ ElasticsearchException ex = new ElasticsearchException(
|
|
|
+ "error occurred while reindexing data stream [" + migrationInfo.getDataStreamName() + "]"
|
|
|
+ );
|
|
|
+ for (Exception exception : exceptions) {
|
|
|
+ ex.addSuppressed(exception);
|
|
|
+ }
|
|
|
+
|
|
|
+ throw ex;
|
|
|
+ }
|
|
|
+
|
|
|
// Failure handlers
|
|
|
private void removeReadOnlyBlockOnReindexFailure(Index index, ActionListener<BulkByScrollResponse> listener, Exception ex) {
|
|
|
logger.info("removing read only block on [{}] because reindex failed [{}]", index, ex);
|
|
|
setWriteBlock(index, false, ActionListener.wrap(unsetReadOnlyResponse -> listener.onFailure(ex), e1 -> listener.onFailure(ex)));
|
|
|
}
|
|
|
|
|
|
+ private void cancelExistingDataStreamMigrationAndRetry(
|
|
|
+ SystemDataStreamMigrationInfo migrationInfo,
|
|
|
+ Consumer<SystemDataStreamMigrationInfo> completionListener
|
|
|
+ ) {
|
|
|
+ logger.debug(
|
|
|
+ "cancelling migration of data stream [{}] from feature [{}] for retry",
|
|
|
+ migrationInfo.getDataStreamName(),
|
|
|
+ migrationInfo.getFeatureName()
|
|
|
+ );
|
|
|
+
|
|
|
+ ActionListener<AcknowledgedResponse> listener = ActionListener.wrap(response -> {
|
|
|
+ if (response.isAcknowledged()) {
|
|
|
+ migrateDataStream(migrationInfo, completionListener);
|
|
|
+ } else {
|
|
|
+ String dataStreamName = migrationInfo.getDataStreamName();
|
|
|
+ logger.error(
|
|
|
+ "failed to cancel migration of data stream [{}] from feature [{}] during retry",
|
|
|
+ dataStreamName,
|
|
|
+ migrationInfo.getFeatureName()
|
|
|
+ );
|
|
|
+ throw new ElasticsearchException(
|
|
|
+ "failed to cancel migration of data stream ["
|
|
|
+ + dataStreamName
|
|
|
+ + "] from feature ["
|
|
|
+ + migrationInfo.getFeatureName()
|
|
|
+ + "] response is not acknowledge"
|
|
|
+ );
|
|
|
+ }
|
|
|
+ }, this::markAsFailed);
|
|
|
+
|
|
|
+ cancelDataStreamMigration(migrationInfo, listener);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void cancelExistingDataStreamMigrationAndMarkAsFailed(SystemDataStreamMigrationInfo migrationInfo, Exception exception) {
|
|
|
+ logger.info(
|
|
|
+ "cancelling migration of data stream [{}] from feature [{}]",
|
|
|
+ migrationInfo.getDataStreamName(),
|
|
|
+ migrationInfo.getFeatureName()
|
|
|
+ );
|
|
|
+
|
|
|
+ // we don't really care here if the request wasn't acknowledged
|
|
|
+ ActionListener<AcknowledgedResponse> listener = ActionListener.wrap(response -> markAsFailed(exception), ex -> {
|
|
|
+ exception.addSuppressed(ex);
|
|
|
+ markAsFailed(exception);
|
|
|
+ });
|
|
|
+
|
|
|
+ cancelDataStreamMigration(migrationInfo, listener);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void cancelDataStreamMigration(SystemDataStreamMigrationInfo migrationInfo, ActionListener<AcknowledgedResponse> listener) {
|
|
|
+ String dataStreamName = migrationInfo.getDataStreamName();
|
|
|
+
|
|
|
+ CancelReindexDataStreamAction.Request cancelRequest = new CancelReindexDataStreamAction.Request(dataStreamName);
|
|
|
+ try {
|
|
|
+ migrationInfo.createClient(baseClient).execute(CancelReindexDataStreamAction.INSTANCE, cancelRequest, listener);
|
|
|
+ } catch (Exception e) {
|
|
|
+ listener.onFailure(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private static ElasticsearchException logAndThrowExceptionForFailures(BulkByScrollResponse bulkByScrollResponse) {
|
|
|
String bulkFailures = (bulkByScrollResponse.getBulkFailures() != null)
|
|
|
? Strings.collectionToCommaDelimitedString(bulkByScrollResponse.getBulkFailures())
|
|
@@ -611,12 +835,16 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|
|
*/
|
|
|
@Override
|
|
|
public void markAsFailed(Exception e) {
|
|
|
- SystemIndexMigrationInfo migrationInfo = currentMigrationInfo();
|
|
|
+ SystemResourceMigrationInfo migrationInfo = currentMigrationInfo();
|
|
|
synchronized (migrationQueue) {
|
|
|
migrationQueue.clear();
|
|
|
}
|
|
|
- String featureName = Optional.ofNullable(migrationInfo).map(SystemIndexMigrationInfo::getFeatureName).orElse("<unknown feature>");
|
|
|
- String indexName = Optional.ofNullable(migrationInfo).map(SystemIndexMigrationInfo::getCurrentIndexName).orElse("<unknown index>");
|
|
|
+ String featureName = Optional.ofNullable(migrationInfo)
|
|
|
+ .map(SystemResourceMigrationInfo::getFeatureName)
|
|
|
+ .orElse("<unknown feature>");
|
|
|
+ String indexName = Optional.ofNullable(migrationInfo)
|
|
|
+ .map(SystemResourceMigrationInfo::getCurrentResourceName)
|
|
|
+ .orElse("<unknown resource>");
|
|
|
|
|
|
MigrationResultsUpdateTask.upsert(
|
|
|
featureName,
|
|
@@ -666,7 +894,7 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|
|
clusterService.submitUnbatchedStateUpdateTask(source, task);
|
|
|
}
|
|
|
|
|
|
- private SystemIndexMigrationInfo currentMigrationInfo() {
|
|
|
+ private SystemResourceMigrationInfo currentMigrationInfo() {
|
|
|
synchronized (migrationQueue) {
|
|
|
return migrationQueue.peek();
|
|
|
}
|