|
|
@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.transform.transforms;
|
|
|
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
+import org.apache.lucene.util.SetOnce;
|
|
|
import org.elasticsearch.ElasticsearchException;
|
|
|
import org.elasticsearch.ResourceNotFoundException;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
@@ -31,6 +32,7 @@ import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer;
|
|
|
import org.elasticsearch.xpack.core.indexing.IndexerState;
|
|
|
import org.elasticsearch.xpack.core.indexing.IterationResult;
|
|
|
import org.elasticsearch.xpack.core.transform.TransformMessages;
|
|
|
+import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction;
|
|
|
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
|
|
|
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
|
|
|
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
|
|
@@ -58,6 +60,7 @@ import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
import java.util.stream.Stream;
|
|
|
|
|
|
+import static java.util.Collections.emptyMap;
|
|
|
import static org.elasticsearch.core.Strings.format;
|
|
|
|
|
|
public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformIndexerPosition, TransformIndexerStats> {
|
|
|
@@ -174,7 +177,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
|
|
|
|
|
|
abstract void persistState(TransformState state, ActionListener<Void> listener);
|
|
|
|
|
|
- abstract void validate(ActionListener<Void> listener);
|
|
|
+ abstract void validate(ActionListener<ValidateTransformAction.Response> listener);
|
|
|
|
|
|
@Override
|
|
|
protected String getJobId() {
|
|
|
@@ -265,6 +268,8 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
+ SetOnce<Map<String, String>> deducedDestIndexMappings = new SetOnce<>();
|
|
|
+
|
|
|
ActionListener<Void> finalListener = ActionListener.wrap(r -> {
|
|
|
try {
|
|
|
// if we haven't set the page size yet, if it is set we might have reduced it after running into an out of memory
|
|
|
@@ -326,8 +331,14 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
|
|
|
}
|
|
|
}, listener::onFailure);
|
|
|
|
|
|
- ActionListener<Map<String, String>> fieldMappingsListener = ActionListener.wrap(mappings -> {
|
|
|
- this.fieldMappings = mappings;
|
|
|
+ ActionListener<Map<String, String>> fieldMappingsListener = ActionListener.wrap(destIndexMappings -> {
|
|
|
+ if (destIndexMappings.isEmpty() == false) {
|
|
|
+ // If we managed to fetch destination index mappings, we use them from now on ...
|
|
|
+ this.fieldMappings = destIndexMappings;
|
|
|
+ } else {
|
|
|
+ // ... otherwise we fall back to index mappings deduced based on source indices
|
|
|
+ this.fieldMappings = deducedDestIndexMappings.get();
|
|
|
+ }
|
|
|
configurationReadyListener.onResponse(null);
|
|
|
}, listener::onFailure);
|
|
|
|
|
|
@@ -338,7 +349,8 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
|
|
|
}, listener::onFailure);
|
|
|
|
|
|
// If we are continuous, we will want to verify we have the latest stored configuration
|
|
|
- ActionListener<Void> changedSourceListener = ActionListener.wrap(r -> {
|
|
|
+ ActionListener<ValidateTransformAction.Response> changedSourceListener = ActionListener.wrap(validationResponse -> {
|
|
|
+ deducedDestIndexMappings.set(validationResponse.getDestIndexMappings());
|
|
|
if (isContinuous()) {
|
|
|
transformsConfigManager.getTransformConfiguration(getJobId(), ActionListener.wrap(config -> {
|
|
|
if (transformConfig.equals(config) && fieldMappings != null) {
|
|
|
@@ -377,7 +389,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
|
|
|
if (hasChanged) {
|
|
|
context.setChangesLastDetectedAt(instantOfTrigger);
|
|
|
logger.debug("[{}] source has changed, triggering new indexer run.", getJobId());
|
|
|
- changedSourceListener.onResponse(null);
|
|
|
+ changedSourceListener.onResponse(new ValidateTransformAction.Response(emptyMap()));
|
|
|
} else {
|
|
|
logger.trace("[{}] source has not changed, finish indexer early.", getJobId());
|
|
|
// No changes, stop executing
|
|
|
@@ -396,7 +408,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
|
|
|
hasSourceChanged = true;
|
|
|
context.setLastSearchTime(instantOfTrigger);
|
|
|
context.setChangesLastDetectedAt(instantOfTrigger);
|
|
|
- changedSourceListener.onResponse(null);
|
|
|
+ changedSourceListener.onResponse(new ValidateTransformAction.Response(emptyMap()));
|
|
|
}
|
|
|
}
|
|
|
|