|
@@ -266,7 +266,7 @@ public class JobResultsPersister {
|
|
|
public void persistCategoryDefinition(CategoryDefinition category, Supplier<Boolean> shouldRetry) {
|
|
|
Persistable persistable =
|
|
|
new Persistable(AnomalyDetectorsIndex.resultsWriteAlias(category.getJobId()), category.getJobId(), category, category.getId());
|
|
|
- persistable.persist(shouldRetry);
|
|
|
+ persistable.persist(shouldRetry, true);
|
|
|
// Don't commit as we expect masses of these updates and they're not
|
|
|
// read again by this process
|
|
|
}
|
|
@@ -290,7 +290,7 @@ public class JobResultsPersister {
|
|
|
: AnomalyDetectorsIndex.jobStateIndexWriteAlias();
|
|
|
|
|
|
Persistable persistable = new Persistable(indexOrAlias, quantiles.getJobId(), quantiles, quantilesDocId);
|
|
|
- persistable.persist(shouldRetry);
|
|
|
+ persistable.persist(shouldRetry, AnomalyDetectorsIndex.jobStateIndexWriteAlias().equals(indexOrAlias));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -311,7 +311,7 @@ public class JobResultsPersister {
|
|
|
|
|
|
Persistable persistable = new Persistable(indexOrAlias, quantiles.getJobId(), quantiles, quantilesDocId);
|
|
|
persistable.setRefreshPolicy(refreshPolicy);
|
|
|
- persistable.persist(listener);
|
|
|
+ persistable.persist(listener, AnomalyDetectorsIndex.jobStateIndexWriteAlias().equals(indexOrAlias));
|
|
|
},
|
|
|
listener::onFailure
|
|
|
);
|
|
@@ -345,7 +345,7 @@ public class JobResultsPersister {
|
|
|
modelSnapshot,
|
|
|
ModelSnapshot.documentId(modelSnapshot));
|
|
|
persistable.setRefreshPolicy(refreshPolicy);
|
|
|
- return persistable.persist(shouldRetry);
|
|
|
+ return persistable.persist(shouldRetry, true);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -356,7 +356,7 @@ public class JobResultsPersister {
|
|
|
logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes());
|
|
|
Persistable persistable =
|
|
|
new Persistable(AnomalyDetectorsIndex.resultsWriteAlias(jobId), jobId, modelSizeStats, modelSizeStats.getId());
|
|
|
- persistable.persist(shouldRetry);
|
|
|
+ persistable.persist(shouldRetry, true);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -369,7 +369,7 @@ public class JobResultsPersister {
|
|
|
Persistable persistable =
|
|
|
new Persistable(AnomalyDetectorsIndex.resultsWriteAlias(jobId), jobId, modelSizeStats, modelSizeStats.getId());
|
|
|
persistable.setRefreshPolicy(refreshPolicy);
|
|
|
- persistable.persist(listener);
|
|
|
+ persistable.persist(listener, true);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -448,7 +448,7 @@ public class JobResultsPersister {
|
|
|
new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")),
|
|
|
DatafeedTimingStats.documentId(timingStats.getJobId()));
|
|
|
persistable.setRefreshPolicy(refreshPolicy);
|
|
|
- return persistable.persist(() -> true);
|
|
|
+ return persistable.persist(() -> true, true);
|
|
|
}
|
|
|
|
|
|
private static XContentBuilder toXContentBuilder(ToXContent obj, ToXContent.Params params) throws IOException {
|
|
@@ -483,7 +483,7 @@ public class JobResultsPersister {
|
|
|
this.refreshPolicy = refreshPolicy;
|
|
|
}
|
|
|
|
|
|
- BulkResponse persist(Supplier<Boolean> shouldRetry) {
|
|
|
+ BulkResponse persist(Supplier<Boolean> shouldRetry, boolean requireAlias) {
|
|
|
logCall(indexName);
|
|
|
try {
|
|
|
return resultsPersisterService.indexWithRetry(jobId,
|
|
@@ -492,6 +492,7 @@ public class JobResultsPersister {
|
|
|
params,
|
|
|
refreshPolicy,
|
|
|
id,
|
|
|
+ requireAlias,
|
|
|
shouldRetry,
|
|
|
(msg) -> auditor.warning(jobId, id + " " + msg));
|
|
|
} catch (IOException e) {
|
|
@@ -504,11 +505,15 @@ public class JobResultsPersister {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- void persist(ActionListener<IndexResponse> listener) {
|
|
|
+ void persist(ActionListener<IndexResponse> listener, boolean requireAlias) {
|
|
|
logCall(indexName);
|
|
|
|
|
|
try (XContentBuilder content = toXContentBuilder(object, params)) {
|
|
|
- IndexRequest indexRequest = new IndexRequest(indexName).id(id).source(content).setRefreshPolicy(refreshPolicy);
|
|
|
+ IndexRequest indexRequest = new IndexRequest(indexName)
|
|
|
+ .id(id)
|
|
|
+ .source(content)
|
|
|
+ .setRefreshPolicy(refreshPolicy)
|
|
|
+ .setRequireAlias(requireAlias);
|
|
|
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, indexRequest, listener, client::index);
|
|
|
} catch (IOException e) {
|
|
|
logger.error(new ParameterizedMessage("[{}] Error writing [{}]", jobId, (id == null) ? "auto-generated ID" : id), e);
|