|
@@ -14,24 +14,21 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
|
|
|
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
|
|
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
|
|
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
|
|
-import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
|
|
|
import org.elasticsearch.action.support.ActiveShardCount;
|
|
|
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
|
|
import org.elasticsearch.client.Client;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.metadata.AliasMetadata;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
|
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
|
|
|
+import org.elasticsearch.cluster.metadata.Template;
|
|
|
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.common.Strings;
|
|
|
-import org.elasticsearch.common.bytes.BytesReference;
|
|
|
+import org.elasticsearch.common.compress.CompressedXContent;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
-import org.elasticsearch.xcontent.XContentBuilder;
|
|
|
-import org.elasticsearch.common.xcontent.XContentHelper;
|
|
|
-import org.elasticsearch.xcontent.XContentType;
|
|
|
import org.elasticsearch.index.mapper.MapperService;
|
|
|
import org.elasticsearch.indices.SystemIndexDescriptor;
|
|
|
+import org.elasticsearch.xcontent.XContentBuilder;
|
|
|
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
|
|
|
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
|
|
import org.elasticsearch.xpack.core.transform.TransformField;
|
|
@@ -48,8 +45,8 @@ import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformIn
|
|
|
import java.io.IOException;
|
|
|
import java.util.Collections;
|
|
|
|
|
|
-import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
|
|
|
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
|
|
|
+import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
|
|
|
import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN;
|
|
|
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
|
|
|
import static org.elasticsearch.xpack.core.transform.TransformField.TRANSFORM_ID;
|
|
@@ -104,6 +101,8 @@ public final class TransformInternalIndex {
|
|
|
.build();
|
|
|
}
|
|
|
|
|
|
+ // use getAuditIndexTemplate instead
|
|
|
+ @Deprecated
|
|
|
public static IndexTemplateMetadata getAuditIndexTemplateMetadata() throws IOException {
|
|
|
IndexTemplateMetadata transformTemplate = IndexTemplateMetadata.builder(TransformInternalIndexConstants.AUDIT_INDEX)
|
|
|
.patterns(Collections.singletonList(TransformInternalIndexConstants.AUDIT_INDEX_PREFIX + "*"))
|
|
@@ -121,6 +120,21 @@ public final class TransformInternalIndex {
|
|
|
return transformTemplate;
|
|
|
}
|
|
|
|
|
|
+ public static Template getAuditIndexTemplate() throws IOException {
|
|
|
+ AliasMetadata alias = AliasMetadata.builder(TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS).isHidden(true).build();
|
|
|
+
|
|
|
+ return new Template(
|
|
|
+ Settings.builder()
|
|
|
+ // the audits are expected to be small
|
|
|
+ .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
|
|
|
+ .put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
|
|
|
+ .put(IndexMetadata.SETTING_INDEX_HIDDEN, true)
|
|
|
+ .build(),
|
|
|
+ new CompressedXContent(Strings.toString(auditMappings())),
|
|
|
+ Collections.singletonMap(alias.alias(), alias)
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
private static XContentBuilder auditMappings() throws IOException {
|
|
|
XContentBuilder builder = jsonBuilder().startObject();
|
|
|
builder.startObject(SINGLE_MAPPING_NAME);
|
|
@@ -365,33 +379,6 @@ public final class TransformInternalIndex {
|
|
|
return builder.startObject("_meta").field("version", Version.CURRENT).endObject();
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * This method should be called before any document is indexed that relies on the
|
|
|
- * existence of the latest internal index or audit index template.
|
|
|
- *
|
|
|
- * For audit messages the problem is that the standard template upgrader only runs
|
|
|
- * when the master node is upgraded to the newer version. So for the audit index,
|
|
|
- * and also the internal index in the case where the old version doesn't know about
|
|
|
- * system indices, if data nodes are upgraded before master nodes and transforms
|
|
|
- * get assigned to those data nodes then without this check the data nodes will
|
|
|
- * index documents into the internal index before the necessary index template is
|
|
|
- * present and this will result in an index with completely dynamic mappings being
|
|
|
- * created (which is very bad). For the internal index in the case where the old
|
|
|
- * version knows about system indices, if data nodes are upgraded before master
|
|
|
- * nodes and transforms get assigned to those data nodes then without this check
|
|
|
- * the data nodes will index documents into the internal index before the latest
|
|
|
- * system descriptor is present on the master and this will result in an index with
|
|
|
- * the new name but the old mappings.
|
|
|
- */
|
|
|
- public static void ensureLatestIndexAndTemplateInstalled(ClusterService clusterService, Client client, ActionListener<Void> listener) {
|
|
|
-
|
|
|
- createLatestVersionedIndexIfRequired(
|
|
|
- clusterService,
|
|
|
- client,
|
|
|
- ActionListener.wrap(r -> installLatestAuditIndexTemplateIfRequired(clusterService, client, listener), listener::onFailure)
|
|
|
- );
|
|
|
- }
|
|
|
-
|
|
|
protected static boolean hasLatestVersionedIndex(ClusterState state) {
|
|
|
return state.getMetadata().getIndicesLookup().containsKey(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME);
|
|
|
}
|
|
@@ -402,10 +389,6 @@ public final class TransformInternalIndex {
|
|
|
return indexRouting != null && indexRouting.allPrimaryShardsActive();
|
|
|
}
|
|
|
|
|
|
- protected static boolean hasLatestAuditIndexTemplate(ClusterState state) {
|
|
|
- return state.getMetadata().getTemplates().containsKey(TransformInternalIndexConstants.AUDIT_INDEX);
|
|
|
- }
|
|
|
-
|
|
|
private static void waitForLatestVersionedIndexShardsActive(Client client, ActionListener<Void> listener) {
|
|
|
ClusterHealthRequest request = new ClusterHealthRequest(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)
|
|
|
// cluster health does not wait for active shards per default
|
|
@@ -420,11 +403,14 @@ public final class TransformInternalIndex {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- protected static void createLatestVersionedIndexIfRequired(
|
|
|
- ClusterService clusterService,
|
|
|
- Client client,
|
|
|
- ActionListener<Void> listener
|
|
|
- ) {
|
|
|
+ /**
|
|
|
+ * This method should be called before any document is indexed that relies on the
|
|
|
+ * existence of the latest internal index.
|
|
|
+ *
|
|
|
+ * Without this check the data nodes will create an internal index with dynamic
|
|
|
+ * mappings when indexing a document, but we want our own well defined mappings.
|
|
|
+ */
|
|
|
+ public static void createLatestVersionedIndexIfRequired(ClusterService clusterService, Client client, ActionListener<Void> listener) {
|
|
|
ClusterState state = clusterService.state();
|
|
|
// The check for existence is against local cluster state, so very cheap
|
|
|
if (hasLatestVersionedIndex(state)) {
|
|
@@ -439,30 +425,28 @@ public final class TransformInternalIndex {
|
|
|
|
|
|
// Creating the index involves communication with the master node, so it's more expensive but much rarer
|
|
|
try {
|
|
|
- CreateIndexRequest request = new CreateIndexRequest(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)
|
|
|
- .settings(settings())
|
|
|
+ CreateIndexRequest request = new CreateIndexRequest(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME).settings(
|
|
|
+ settings()
|
|
|
+ )
|
|
|
.mapping(mappings())
|
|
|
.origin(TRANSFORM_ORIGIN)
|
|
|
// explicitly wait for the primary shard (although this might be default)
|
|
|
.waitForActiveShards(ActiveShardCount.ONE);
|
|
|
- ActionListener<CreateIndexResponse> innerListener = ActionListener.wrap(
|
|
|
- r -> listener.onResponse(null),
|
|
|
- e -> {
|
|
|
- // It's not a problem if the index already exists - another node could be running
|
|
|
- // this method at the same time as this one, and also have created the index
|
|
|
- // check if shards are active
|
|
|
- if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) {
|
|
|
- if (allPrimaryShardsActiveForLatestVersionedIndex(clusterService.state())) {
|
|
|
- listener.onResponse(null);
|
|
|
- return;
|
|
|
- }
|
|
|
- // the index exists but is not ready yet
|
|
|
- waitForLatestVersionedIndexShardsActive(client, listener);
|
|
|
- } else {
|
|
|
- listener.onFailure(e);
|
|
|
+ ActionListener<CreateIndexResponse> innerListener = ActionListener.wrap(r -> listener.onResponse(null), e -> {
|
|
|
+ // It's not a problem if the index already exists - another node could be running
|
|
|
+ // this method at the same time as this one, and also have created the index
|
|
|
+ // check if shards are active
|
|
|
+ if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) {
|
|
|
+ if (allPrimaryShardsActiveForLatestVersionedIndex(clusterService.state())) {
|
|
|
+ listener.onResponse(null);
|
|
|
+ return;
|
|
|
}
|
|
|
+ // the index exists but is not ready yet
|
|
|
+ waitForLatestVersionedIndexShardsActive(client, listener);
|
|
|
+ } else {
|
|
|
+ listener.onFailure(e);
|
|
|
}
|
|
|
- );
|
|
|
+ });
|
|
|
executeAsyncWithOrigin(
|
|
|
client.threadPool().getThreadContext(),
|
|
|
TRANSFORM_ORIGIN,
|
|
@@ -475,40 +459,5 @@ public final class TransformInternalIndex {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- protected static void installLatestAuditIndexTemplateIfRequired(
|
|
|
- ClusterService clusterService,
|
|
|
- Client client,
|
|
|
- ActionListener<Void> listener
|
|
|
- ) {
|
|
|
-
|
|
|
- // The check for existence of the template is against local cluster state, so very cheap
|
|
|
- if (hasLatestAuditIndexTemplate(clusterService.state())) {
|
|
|
- listener.onResponse(null);
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- // Installing the template involves communication with the master node, so it's more expensive but much rarer
|
|
|
- try {
|
|
|
- IndexTemplateMetadata indexTemplateMetadata = getAuditIndexTemplateMetadata();
|
|
|
- BytesReference jsonMappings = indexTemplateMetadata.mappings().uncompressed();
|
|
|
- PutIndexTemplateRequest request = new PutIndexTemplateRequest(TransformInternalIndexConstants.AUDIT_INDEX).patterns(
|
|
|
- indexTemplateMetadata.patterns()
|
|
|
- )
|
|
|
- .version(indexTemplateMetadata.version())
|
|
|
- .settings(indexTemplateMetadata.settings())
|
|
|
- .mapping(XContentHelper.convertToMap(jsonMappings, true, XContentType.JSON).v2());
|
|
|
- ActionListener<AcknowledgedResponse> innerListener = ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure);
|
|
|
- executeAsyncWithOrigin(
|
|
|
- client.threadPool().getThreadContext(),
|
|
|
- TRANSFORM_ORIGIN,
|
|
|
- request,
|
|
|
- innerListener,
|
|
|
- client.admin().indices()::putTemplate
|
|
|
- );
|
|
|
- } catch (IOException e) {
|
|
|
- listener.onFailure(e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
private TransformInternalIndex() {}
|
|
|
}
|