|
@@ -10,10 +10,10 @@ import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
import org.elasticsearch.ElasticsearchException;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
|
|
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
|
|
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
|
|
-import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
|
|
|
+import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest;
|
|
|
+import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse;
|
|
|
+import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction;
|
|
|
import org.elasticsearch.action.support.ActionFilters;
|
|
|
import org.elasticsearch.action.support.HandledTransportAction;
|
|
|
import org.elasticsearch.action.support.IndicesOptions;
|
|
@@ -22,9 +22,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
|
|
import org.elasticsearch.client.internal.Client;
|
|
|
import org.elasticsearch.cluster.block.ClusterBlockException;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
|
-import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
-import org.elasticsearch.common.settings.IndexScopedSettings;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.core.TimeValue;
|
|
|
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
|
@@ -37,28 +35,25 @@ import org.elasticsearch.transport.TransportService;
|
|
|
|
|
|
import java.util.Locale;
|
|
|
import java.util.Map;
|
|
|
-import java.util.Set;
|
|
|
+
|
|
|
+import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY;
|
|
|
+import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE;
|
|
|
|
|
|
public class ReindexDataStreamIndexTransportAction extends HandledTransportAction<
|
|
|
ReindexDataStreamIndexAction.Request,
|
|
|
ReindexDataStreamIndexAction.Response> {
|
|
|
|
|
|
private static final Logger logger = LogManager.getLogger(ReindexDataStreamIndexTransportAction.class);
|
|
|
-
|
|
|
- private static final Set<String> SETTINGS_TO_ADD_BACK = Set.of(IndexMetadata.SETTING_BLOCKS_WRITE, IndexMetadata.SETTING_READ_ONLY);
|
|
|
-
|
|
|
private static final IndicesOptions IGNORE_MISSING_OPTIONS = IndicesOptions.fromOptions(true, true, false, false);
|
|
|
private final ClusterService clusterService;
|
|
|
private final Client client;
|
|
|
- private final IndexScopedSettings indexScopedSettings;
|
|
|
|
|
|
@Inject
|
|
|
public ReindexDataStreamIndexTransportAction(
|
|
|
TransportService transportService,
|
|
|
ClusterService clusterService,
|
|
|
ActionFilters actionFilters,
|
|
|
- Client client,
|
|
|
- IndexScopedSettings indexScopedSettings
|
|
|
+ Client client
|
|
|
) {
|
|
|
super(
|
|
|
ReindexDataStreamIndexAction.NAME,
|
|
@@ -70,7 +65,6 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio
|
|
|
);
|
|
|
this.clusterService = clusterService;
|
|
|
this.client = client;
|
|
|
- this.indexScopedSettings = indexScopedSettings;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -96,20 +90,19 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio
|
|
|
|
|
|
SubscribableListener.<AcknowledgedResponse>newForked(l -> setBlockWrites(sourceIndexName, l))
|
|
|
.<AcknowledgedResponse>andThen(l -> deleteDestIfExists(destIndexName, l))
|
|
|
- .<CreateIndexResponse>andThen(l -> createIndex(sourceIndex, destIndexName, l))
|
|
|
+ .<AcknowledgedResponse>andThen(l -> createIndex(sourceIndex, destIndexName, l))
|
|
|
.<BulkByScrollResponse>andThen(l -> reindex(sourceIndexName, destIndexName, l))
|
|
|
- .<AcknowledgedResponse>andThen(l -> updateSettings(settingsBefore, destIndexName, l))
|
|
|
+ .<AddIndexBlockResponse>andThen(l -> addBlockIfFromSource(WRITE, settingsBefore, destIndexName, l))
|
|
|
+ .<AddIndexBlockResponse>andThen(l -> addBlockIfFromSource(READ_ONLY, settingsBefore, destIndexName, l))
|
|
|
.andThenApply(ignored -> new ReindexDataStreamIndexAction.Response(destIndexName))
|
|
|
.addListener(listener);
|
|
|
}
|
|
|
|
|
|
private void setBlockWrites(String sourceIndexName, ActionListener<AcknowledgedResponse> listener) {
|
|
|
logger.debug("Setting write block on source index [{}]", sourceIndexName);
|
|
|
- final Settings readOnlySettings = Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build();
|
|
|
- var updateSettingsRequest = new UpdateSettingsRequest(readOnlySettings, sourceIndexName);
|
|
|
- client.admin().indices().updateSettings(updateSettingsRequest, new ActionListener<>() {
|
|
|
+ addBlockToIndex(WRITE, sourceIndexName, new ActionListener<>() {
|
|
|
@Override
|
|
|
- public void onResponse(AcknowledgedResponse response) {
|
|
|
+ public void onResponse(AddIndexBlockResponse response) {
|
|
|
if (response.isAcknowledged()) {
|
|
|
listener.onResponse(null);
|
|
|
} else {
|
|
@@ -121,7 +114,7 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio
|
|
|
@Override
|
|
|
public void onFailure(Exception e) {
|
|
|
if (e instanceof ClusterBlockException || e.getCause() instanceof ClusterBlockException) {
|
|
|
- // It's fine if read-only is already set
|
|
|
+ // It's fine if block-writes is already set
|
|
|
listener.onResponse(null);
|
|
|
} else {
|
|
|
listener.onFailure(e);
|
|
@@ -138,18 +131,23 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio
|
|
|
client.admin().indices().delete(deleteIndexRequest, failIfNotAcknowledged(listener, errorMessage));
|
|
|
}
|
|
|
|
|
|
- private void createIndex(IndexMetadata sourceIndex, String destIndexName, ActionListener<CreateIndexResponse> listener) {
|
|
|
+ private void createIndex(IndexMetadata sourceIndex, String destIndexName, ActionListener<AcknowledgedResponse> listener) {
|
|
|
logger.debug("Creating destination index [{}] for source index [{}]", destIndexName, sourceIndex.getIndex().getName());
|
|
|
|
|
|
- // Create destination with subset of source index settings that can be added before reindex
|
|
|
- var settings = getPreSettings(sourceIndex);
|
|
|
-
|
|
|
- var sourceMapping = sourceIndex.mapping();
|
|
|
- Map<String, Object> mapping = sourceMapping != null ? sourceMapping.rawSourceAsMap() : Map.of();
|
|
|
- var createIndexRequest = new CreateIndexRequest(destIndexName).settings(settings).mapping(mapping);
|
|
|
-
|
|
|
- var errorMessage = String.format(Locale.ROOT, "Could not create index [%s]", destIndexName);
|
|
|
- client.admin().indices().create(createIndexRequest, failIfNotAcknowledged(listener, errorMessage));
|
|
|
+ // override read-only settings if they exist
|
|
|
+ var removeReadOnlyOverride = Settings.builder()
|
|
|
+ .putNull(IndexMetadata.SETTING_READ_ONLY)
|
|
|
+ .putNull(IndexMetadata.SETTING_BLOCKS_WRITE)
|
|
|
+ .build();
|
|
|
+
|
|
|
+ var request = new CreateIndexFromSourceAction.Request(
|
|
|
+ sourceIndex.getIndex().getName(),
|
|
|
+ destIndexName,
|
|
|
+ removeReadOnlyOverride,
|
|
|
+ Map.of()
|
|
|
+ );
|
|
|
+ var errorMessage = String.format(Locale.ROOT, "Could not create index [%s]", request.getDestIndex());
|
|
|
+ client.execute(CreateIndexFromSourceAction.INSTANCE, request, failIfNotAcknowledged(listener, errorMessage));
|
|
|
}
|
|
|
|
|
|
private void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkByScrollResponse> listener) {
|
|
@@ -162,35 +160,18 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio
|
|
|
client.execute(ReindexAction.INSTANCE, reindexRequest, listener);
|
|
|
}
|
|
|
|
|
|
- private void updateSettings(Settings settingsBefore, String destIndexName, ActionListener<AcknowledgedResponse> listener) {
|
|
|
- logger.debug("Adding settings from source index that could not be added before reindex");
|
|
|
-
|
|
|
- Settings postSettings = getPostSettings(settingsBefore);
|
|
|
- if (postSettings.isEmpty()) {
|
|
|
+ private void addBlockIfFromSource(
|
|
|
+ IndexMetadata.APIBlock block,
|
|
|
+ Settings settingsBefore,
|
|
|
+ String destIndexName,
|
|
|
+ ActionListener<AddIndexBlockResponse> listener
|
|
|
+ ) {
|
|
|
+ if (settingsBefore.getAsBoolean(block.settingName(), false)) {
|
|
|
+ var errorMessage = String.format(Locale.ROOT, "Add [%s] block to index [%s] was not acknowledged", block.name(), destIndexName);
|
|
|
+ addBlockToIndex(block, destIndexName, failIfNotAcknowledged(listener, errorMessage));
|
|
|
+ } else {
|
|
|
listener.onResponse(null);
|
|
|
- return;
|
|
|
}
|
|
|
-
|
|
|
- var updateSettingsRequest = new UpdateSettingsRequest(postSettings, destIndexName);
|
|
|
- var errorMessage = String.format(Locale.ROOT, "Could not update settings on index [%s]", destIndexName);
|
|
|
- client.admin().indices().updateSettings(updateSettingsRequest, failIfNotAcknowledged(listener, errorMessage));
|
|
|
- }
|
|
|
-
|
|
|
- // Filter source index settings to subset of settings that can be included during reindex.
|
|
|
- // Similar to the settings filtering done when reindexing for upgrade in Kibana
|
|
|
- // https://github.com/elastic/kibana/blob/8a8363f02cc990732eb9cbb60cd388643a336bed/x-pack
|
|
|
- // /plugins/upgrade_assistant/server/lib/reindexing/index_settings.ts#L155
|
|
|
- private Settings getPreSettings(IndexMetadata sourceIndex) {
|
|
|
- // filter settings that will be added back later
|
|
|
- var filtered = sourceIndex.getSettings().filter(settingName -> SETTINGS_TO_ADD_BACK.contains(settingName) == false);
|
|
|
-
|
|
|
- // filter private and non-copyable settings
|
|
|
- var builder = MetadataCreateIndexService.copySettingsFromSource(false, filtered, indexScopedSettings, Settings.builder());
|
|
|
- return builder.build();
|
|
|
- }
|
|
|
-
|
|
|
- private Settings getPostSettings(Settings settingsBefore) {
|
|
|
- return settingsBefore.filter(SETTINGS_TO_ADD_BACK::contains);
|
|
|
}
|
|
|
|
|
|
public static String generateDestIndexName(String sourceIndex) {
|
|
@@ -201,11 +182,16 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio
|
|
|
ActionListener<U> listener,
|
|
|
String errorMessage
|
|
|
) {
|
|
|
- return listener.delegateFailureAndWrap((delegate, response) -> {
|
|
|
+ return listener.delegateFailure((delegate, response) -> {
|
|
|
if (response.isAcknowledged()) {
|
|
|
delegate.onResponse(null);
|
|
|
+ } else {
|
|
|
+ delegate.onFailure(new ElasticsearchException(errorMessage));
|
|
|
}
|
|
|
- throw new ElasticsearchException(errorMessage);
|
|
|
});
|
|
|
}
|
|
|
+
|
|
|
+ private void addBlockToIndex(IndexMetadata.APIBlock block, String index, ActionListener<AddIndexBlockResponse> listener) {
|
|
|
+ client.admin().indices().execute(TransportAddIndexBlockAction.TYPE, new AddIndexBlockRequest(block, index), listener);
|
|
|
+ }
|
|
|
}
|