|
@@ -207,49 +207,14 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|
|
}
|
|
|
|
|
|
// Kick off our callback "loop" - finishIndexAndLoop calls back into prepareNextIndex
|
|
|
- cleanUpPreviousMigration(
|
|
|
- taskState,
|
|
|
- clusterState,
|
|
|
- state -> prepareNextIndex(state, state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), stateFeatureName)
|
|
|
- );
|
|
|
- }
|
|
|
-
|
|
|
- private void cleanUpPreviousMigration(
|
|
|
- SystemIndexMigrationTaskState taskState,
|
|
|
- ClusterState currentState,
|
|
|
- Consumer<ClusterState> listener
|
|
|
- ) {
|
|
|
logger.debug("cleaning up previous migration, task state: [{}]", taskState == null ? "null" : Strings.toString(taskState));
|
|
|
- if (taskState != null && taskState.getCurrentIndex() != null) {
|
|
|
- SystemIndexMigrationInfo migrationInfo;
|
|
|
- try {
|
|
|
- migrationInfo = SystemIndexMigrationInfo.fromTaskState(
|
|
|
- taskState,
|
|
|
- systemIndices,
|
|
|
- currentState.metadata(),
|
|
|
- indexScopedSettings
|
|
|
- );
|
|
|
- } catch (Exception e) {
|
|
|
- markAsFailed(e);
|
|
|
- return;
|
|
|
- }
|
|
|
- final String newIndexName = migrationInfo.getNextIndexName();
|
|
|
- logger.info("removing index [{}] from previous incomplete migration", newIndexName);
|
|
|
-
|
|
|
- migrationInfo.createClient(baseClient)
|
|
|
- .admin()
|
|
|
- .indices()
|
|
|
- .prepareDelete(newIndexName)
|
|
|
- .execute(ActionListener.wrap(ackedResponse -> {
|
|
|
- if (ackedResponse.isAcknowledged()) {
|
|
|
- logger.debug("successfully removed index [{}]", newIndexName);
|
|
|
- clearResults(clusterService, ActionListener.wrap(listener::accept, this::markAsFailed));
|
|
|
- }
|
|
|
- }, this::markAsFailed));
|
|
|
- } else {
|
|
|
- logger.debug("no incomplete index to remove");
|
|
|
- clearResults(clusterService, ActionListener.wrap(listener::accept, this::markAsFailed));
|
|
|
- }
|
|
|
+ clearResults(
|
|
|
+ clusterService,
|
|
|
+ ActionListener.wrap(
|
|
|
+ state -> prepareNextIndex(state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), stateFeatureName),
|
|
|
+ this::markAsFailed
|
|
|
+ )
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
private void finishIndexAndLoop(BulkByScrollResponse bulkResponse) {
|
|
@@ -289,11 +254,7 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|
|
}, this::markAsFailed)
|
|
|
);
|
|
|
} else {
|
|
|
- prepareNextIndex(
|
|
|
- clusterService.state(),
|
|
|
- state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop),
|
|
|
- lastMigrationInfo.getFeatureName()
|
|
|
- );
|
|
|
+ prepareNextIndex(state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), lastMigrationInfo.getFeatureName());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -303,7 +264,6 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|
|
SingleFeatureMigrationResult.success(),
|
|
|
ActionListener.wrap(state -> {
|
|
|
prepareNextIndex(
|
|
|
- state,
|
|
|
clusterState -> migrateSingleIndex(clusterState, this::finishIndexAndLoop),
|
|
|
lastMigrationInfo.getFeatureName()
|
|
|
);
|
|
@@ -312,7 +272,7 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|
|
updateTask.submit(clusterService);
|
|
|
}
|
|
|
|
|
|
- private void prepareNextIndex(ClusterState clusterState, Consumer<ClusterState> listener, String lastFeatureName) {
|
|
|
+ private void prepareNextIndex(Consumer<ClusterState> listener, String lastFeatureName) {
|
|
|
synchronized (migrationQueue) {
|
|
|
assert migrationQueue != null;
|
|
|
if (migrationQueue.isEmpty()) {
|
|
@@ -424,7 +384,7 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|
|
logger.info("migrating index [{}] from feature [{}] to new index [{}]", oldIndexName, migrationInfo.getFeatureName(), newIndexName);
|
|
|
ActionListener<BulkByScrollResponse> innerListener = ActionListener.wrap(listener::accept, this::markAsFailed);
|
|
|
try {
|
|
|
- createIndex(migrationInfo, innerListener.delegateFailureAndWrap((delegate, shardsAcknowledgedResponse) -> {
|
|
|
+ createIndexRetryOnFailure(migrationInfo, innerListener.delegateFailureAndWrap((delegate, shardsAcknowledgedResponse) -> {
|
|
|
logger.debug(
|
|
|
"while migrating [{}] , got create index response: [{}]",
|
|
|
oldIndexName,
|
|
@@ -509,6 +469,8 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|
|
}
|
|
|
|
|
|
private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<ShardsAcknowledgedResponse> listener) {
|
|
|
+ logger.info("creating new system index [{}] from feature [{}]", migrationInfo.getNextIndexName(), migrationInfo.getFeatureName());
|
|
|
+
|
|
|
final CreateIndexClusterStateUpdateRequest createRequest = new CreateIndexClusterStateUpdateRequest(
|
|
|
"migrate-system-index",
|
|
|
migrationInfo.getNextIndexName(),
|
|
@@ -534,6 +496,35 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
+ private void createIndexRetryOnFailure(SystemIndexMigrationInfo migrationInfo, ActionListener<ShardsAcknowledgedResponse> listener) {
|
|
|
+ createIndex(migrationInfo, listener.delegateResponse((l, e) -> {
|
|
|
+ logger.warn("createIndex failed, retrying after removing index [{}] from previous attempt", migrationInfo.getNextIndexName());
|
|
|
+ deleteIndex(migrationInfo, ActionListener.wrap(cleanupResponse -> createIndex(migrationInfo, l.delegateResponse((l3, e3) -> {
|
|
|
+ logger.error(
|
|
|
+ "createIndex failed after retrying, aborting system index migration. index: " + migrationInfo.getNextIndexName(),
|
|
|
+ e3
|
|
|
+ );
|
|
|
+ l.onFailure(e3);
|
|
|
+ })), e2 -> {
|
|
|
+ logger.error("deleteIndex failed, aborting system index migration. index: " + migrationInfo.getNextIndexName(), e2);
|
|
|
+ l.onFailure(e2);
|
|
|
+ }));
|
|
|
+ }));
|
|
|
+ }
|
|
|
+
|
|
|
+ private <T> void deleteIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<AcknowledgedResponse> listener) {
|
|
|
+ logger.info("removing index [{}] from feature [{}]", migrationInfo.getNextIndexName(), migrationInfo.getFeatureName());
|
|
|
+ String newIndexName = migrationInfo.getNextIndexName();
|
|
|
+ baseClient.admin().indices().prepareDelete(newIndexName).execute(ActionListener.wrap(ackedResponse -> {
|
|
|
+ if (ackedResponse.isAcknowledged()) {
|
|
|
+ logger.info("successfully removed index [{}]", newIndexName);
|
|
|
+ listener.onResponse(ackedResponse);
|
|
|
+ } else {
|
|
|
+ listener.onFailure(new ElasticsearchException("Failed to acknowledge index deletion for [" + newIndexName + "]"));
|
|
|
+ }
|
|
|
+ }, listener::onFailure));
|
|
|
+ }
|
|
|
+
|
|
|
private void setAliasAndRemoveOldIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<IndicesAliasesResponse> listener) {
|
|
|
final IndicesAliasesRequestBuilder aliasesRequest = migrationInfo.createClient(baseClient).admin().indices().prepareAliases();
|
|
|
aliasesRequest.removeIndex(migrationInfo.getCurrentIndexName());
|