|
@@ -9,9 +9,9 @@ package org.elasticsearch.xpack.transform.transforms.pivot;
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
-import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsAction;
|
|
|
-import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsRequest;
|
|
|
-import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse.FieldMappingMetaData;
|
|
|
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
|
|
|
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
|
|
|
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
|
|
|
import org.elasticsearch.action.support.IndicesOptions;
|
|
|
import org.elasticsearch.client.Client;
|
|
|
import org.elasticsearch.index.mapper.NumberFieldMapper;
|
|
@@ -99,12 +99,12 @@ public final class SchemaUtil {
|
|
|
allFieldNames.putAll(fieldNamesForGrouping);
|
|
|
|
|
|
getSourceFieldMappings(client, source, allFieldNames.values().toArray(new String[0]),
|
|
|
- ActionListener.wrap(
|
|
|
- sourceMappings -> listener.onResponse(resolveMappings(aggregationSourceFieldNames,
|
|
|
- aggregationTypes,
|
|
|
- fieldNamesForGrouping,
|
|
|
- sourceMappings)),
|
|
|
- listener::onFailure));
|
|
|
+ ActionListener.wrap(
|
|
|
+ sourceMappings -> listener.onResponse(resolveMappings(aggregationSourceFieldNames,
|
|
|
+ aggregationTypes,
|
|
|
+ fieldNamesForGrouping,
|
|
|
+ sourceMappings)),
|
|
|
+ listener::onFailure));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -118,15 +118,16 @@ public final class SchemaUtil {
|
|
|
public static void getDestinationFieldMappings(final Client client,
|
|
|
final String index,
|
|
|
final ActionListener<Map<String, String>> listener) {
|
|
|
- GetFieldMappingsRequest fieldMappingRequest = new GetFieldMappingsRequest();
|
|
|
- fieldMappingRequest.indices(index);
|
|
|
- fieldMappingRequest.fields("*");
|
|
|
+ FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest()
|
|
|
+ .indices(index)
|
|
|
+ .fields("*")
|
|
|
+ .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
|
|
|
ClientHelper.executeAsyncWithOrigin(client,
|
|
|
ClientHelper.DATA_FRAME_ORIGIN,
|
|
|
- GetFieldMappingsAction.INSTANCE,
|
|
|
- fieldMappingRequest,
|
|
|
+ FieldCapabilitiesAction.INSTANCE,
|
|
|
+ fieldCapabilitiesRequest,
|
|
|
ActionListener.wrap(
|
|
|
- r -> listener.onResponse(extractFieldMappings(r.mappings())),
|
|
|
+ r -> listener.onResponse(extractFieldMappings(r)),
|
|
|
listener::onFailure
|
|
|
));
|
|
|
}
|
|
@@ -143,7 +144,7 @@ public final class SchemaUtil {
|
|
|
String destinationMapping = Aggregations.resolveTargetMapping(aggregationName, sourceMapping);
|
|
|
|
|
|
logger.debug("Deduced mapping for: [{}], agg type [{}] to [{}]",
|
|
|
- targetFieldName, aggregationName, destinationMapping);
|
|
|
+ targetFieldName, aggregationName, destinationMapping);
|
|
|
|
|
|
if (Aggregations.isDynamicMapping(destinationMapping)) {
|
|
|
logger.debug("Dynamic target mapping set for field [{}] and aggregation [{}]", targetFieldName, aggregationName);
|
|
@@ -171,42 +172,25 @@ public final class SchemaUtil {
|
|
|
* Very "magic" helper method to extract the source mappings
|
|
|
*/
|
|
|
private static void getSourceFieldMappings(Client client, String[] index, String[] fields,
|
|
|
- ActionListener<Map<String, String>> listener) {
|
|
|
- GetFieldMappingsRequest fieldMappingRequest = new GetFieldMappingsRequest();
|
|
|
- fieldMappingRequest.indices(index);
|
|
|
- fieldMappingRequest.fields(fields);
|
|
|
- fieldMappingRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
|
|
|
-
|
|
|
- client.execute(GetFieldMappingsAction.INSTANCE, fieldMappingRequest, ActionListener.wrap(
|
|
|
- response -> listener.onResponse(extractFieldMappings(response.mappings())),
|
|
|
+ ActionListener<Map<String, String>> listener) {
|
|
|
+ FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest()
|
|
|
+ .indices(index)
|
|
|
+ .fields(fields)
|
|
|
+ .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
|
|
|
+ client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, ActionListener.wrap(
|
|
|
+ response -> listener.onResponse(extractFieldMappings(response)),
|
|
|
listener::onFailure));
|
|
|
}
|
|
|
|
|
|
- private static Map<String, String> extractFieldMappings(Map<String, Map<String, Map<String, FieldMappingMetaData>>> mappings) {
|
|
|
+ private static Map<String, String> extractFieldMappings(FieldCapabilitiesResponse response) {
|
|
|
Map<String, String> extractedTypes = new HashMap<>();
|
|
|
|
|
|
- mappings.forEach((indexName, docTypeToMapping) -> {
|
|
|
- // "_doc" ->
|
|
|
- docTypeToMapping.forEach((docType, fieldNameToMapping) -> {
|
|
|
- // "my_field" ->
|
|
|
- fieldNameToMapping.forEach((fieldName, fieldMapping) -> {
|
|
|
- // "mapping" -> "my_field" ->
|
|
|
- fieldMapping.sourceAsMap().forEach((name, typeMap) -> {
|
|
|
- // expected object: { "type": type }
|
|
|
- if (typeMap instanceof Map) {
|
|
|
- final Map<?, ?> map = (Map<?, ?>) typeMap;
|
|
|
- if (map.containsKey("type")) {
|
|
|
- String type = map.get("type").toString();
|
|
|
- if (logger.isTraceEnabled()) {
|
|
|
- logger.trace("Extracted type for [" + fieldName + "] : [" + type + "] from index [" + indexName + "]");
|
|
|
- }
|
|
|
- // TODO: overwrites types, requires resolve if
|
|
|
- // types are mixed
|
|
|
- extractedTypes.put(fieldName, type);
|
|
|
- }
|
|
|
- }
|
|
|
- });
|
|
|
- });
|
|
|
+ response.get().forEach((fieldName, capabilitiesMap) -> {
|
|
|
+ // TODO: overwrites types, requires resolve if
|
|
|
+ // types are mixed
|
|
|
+ capabilitiesMap.forEach((name, capability) -> {
|
|
|
+ logger.trace("Extracted type for [{}] : [{}]", fieldName, capability.getType());
|
|
|
+ extractedTypes.put(fieldName, capability.getType());
|
|
|
});
|
|
|
});
|
|
|
return extractedTypes;
|