|
@@ -63,7 +63,9 @@ import java.util.List;
|
|
|
import java.util.Locale;
|
|
|
import java.util.Map;
|
|
|
import java.util.Objects;
|
|
|
+import java.util.Set;
|
|
|
import java.util.function.BiConsumer;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
|
|
|
import static org.elasticsearch.xpack.core.ClientHelper.CONNECTORS_ORIGIN;
|
|
@@ -382,51 +384,114 @@ public class ConnectorIndexService {
|
|
|
|
|
|
/**
|
|
|
* Updates the {@link ConnectorConfiguration} property of a {@link Connector}.
|
|
|
- * The update process is non-additive; it completely replaces all existing configuration fields with the new configuration mapping,
|
|
|
- * thereby deleting any old configurations.
|
|
|
+ * This method supports full configuration replacement or individual configuration value updates.
|
|
|
+ * If a full configuration is provided, it overwrites all existing configurations in non-additive way.
|
|
|
+ * If only configuration values are provided, the existing {@link ConnectorConfiguration} is updated with new values
|
|
|
+ * provided in the request.
|
|
|
*
|
|
|
* @param request Request for updating connector configuration property.
|
|
|
* @param listener Listener to respond to a successful response or an error.
|
|
|
*/
|
|
|
public void updateConnectorConfiguration(UpdateConnectorConfigurationAction.Request request, ActionListener<UpdateResponse> listener) {
|
|
|
try {
|
|
|
+ Map<String, ConnectorConfiguration> fullConfiguration = request.getConfiguration();
|
|
|
+ Map<String, Object> configurationValues = request.getConfigurationValues();
|
|
|
String connectorId = request.getConnectorId();
|
|
|
|
|
|
- String updateConfigurationScript = String.format(
|
|
|
- Locale.ROOT,
|
|
|
- """
|
|
|
- ctx._source.%s = params.%s;
|
|
|
- ctx._source.%s = params.%s;
|
|
|
- """,
|
|
|
- Connector.CONFIGURATION_FIELD.getPreferredName(),
|
|
|
- Connector.CONFIGURATION_FIELD.getPreferredName(),
|
|
|
- Connector.STATUS_FIELD.getPreferredName(),
|
|
|
- Connector.STATUS_FIELD.getPreferredName()
|
|
|
- );
|
|
|
- Script script = new Script(
|
|
|
- ScriptType.INLINE,
|
|
|
- "painless",
|
|
|
- updateConfigurationScript,
|
|
|
- Map.of(
|
|
|
- Connector.CONFIGURATION_FIELD.getPreferredName(),
|
|
|
- request.getConfiguration(),
|
|
|
- Connector.STATUS_FIELD.getPreferredName(),
|
|
|
- ConnectorStatus.CONFIGURED.toString()
|
|
|
- )
|
|
|
- );
|
|
|
- final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).script(script)
|
|
|
- .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
|
|
|
+ getConnector(connectorId, listener.delegateFailure((l, connector) -> {
|
|
|
|
|
|
- clientWithOrigin.update(
|
|
|
- updateRequest,
|
|
|
- new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
|
|
|
+ UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).setRefreshPolicy(
|
|
|
+ WriteRequest.RefreshPolicy.IMMEDIATE
|
|
|
+ );
|
|
|
+
|
|
|
+ // Completely override [configuration] field with script
|
|
|
+ if (fullConfiguration != null) {
|
|
|
+ String updateConfigurationScript = String.format(
|
|
|
+ Locale.ROOT,
|
|
|
+ """
|
|
|
+ ctx._source.%s = params.%s;
|
|
|
+ ctx._source.%s = params.%s;
|
|
|
+ """,
|
|
|
+ Connector.CONFIGURATION_FIELD.getPreferredName(),
|
|
|
+ Connector.CONFIGURATION_FIELD.getPreferredName(),
|
|
|
+ Connector.STATUS_FIELD.getPreferredName(),
|
|
|
+ Connector.STATUS_FIELD.getPreferredName()
|
|
|
+ );
|
|
|
+ Script script = new Script(
|
|
|
+ ScriptType.INLINE,
|
|
|
+ "painless",
|
|
|
+ updateConfigurationScript,
|
|
|
+ Map.of(
|
|
|
+ Connector.CONFIGURATION_FIELD.getPreferredName(),
|
|
|
+ request.getConfiguration(),
|
|
|
+ Connector.STATUS_FIELD.getPreferredName(),
|
|
|
+ ConnectorStatus.CONFIGURED.toString()
|
|
|
+ )
|
|
|
+ );
|
|
|
+ updateRequest = updateRequest.script(script);
|
|
|
+
|
|
|
+ }
|
|
|
+ // Only update configuration values for (key, value) pairs provided
|
|
|
+ else if (configurationValues != null) {
|
|
|
+
|
|
|
+ Set<String> existingKeys = getConnectorConfigurationFromSearchResult(connector).keySet();
|
|
|
+ Set<String> newConfigurationKeys = configurationValues.keySet();
|
|
|
+
|
|
|
+ // Fail request it could result in updating values for unknown configuration keys
|
|
|
+ if (existingKeys.containsAll(newConfigurationKeys) == false) {
|
|
|
+
|
|
|
+ Set<String> unknownConfigKeys = newConfigurationKeys.stream()
|
|
|
+ .filter(key -> existingKeys.contains(key) == false)
|
|
|
+ .collect(Collectors.toSet());
|
|
|
+
|
|
|
+ l.onFailure(
|
|
|
+ new ElasticsearchStatusException(
|
|
|
+ "Unknown [configuration] fields in the request payload: ["
|
|
|
+ + String.join(", ", unknownConfigKeys)
|
|
|
+ + "]. Remove them from request or register their schema first.",
|
|
|
+ RestStatus.BAD_REQUEST
|
|
|
+ )
|
|
|
+ );
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ Map<String, Object> configurationValuesUpdatePayload = configurationValues.entrySet()
|
|
|
+ .stream()
|
|
|
+ .collect(
|
|
|
+ Collectors.toMap(
|
|
|
+ Map.Entry::getKey,
|
|
|
+ entry -> Map.of(ConnectorConfiguration.VALUE_FIELD.getPreferredName(), entry.getValue())
|
|
|
+ )
|
|
|
+ );
|
|
|
+
|
|
|
+ updateRequest = updateRequest.doc(
|
|
|
+ new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
|
|
|
+ .id(connectorId)
|
|
|
+ .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
|
|
|
+ .source(
|
|
|
+ Map.of(
|
|
|
+ Connector.CONFIGURATION_FIELD.getPreferredName(),
|
|
|
+ configurationValuesUpdatePayload,
|
|
|
+ Connector.STATUS_FIELD.getPreferredName(),
|
|
|
+ ConnectorStatus.CONFIGURED.toString()
|
|
|
+ )
|
|
|
+ )
|
|
|
+ );
|
|
|
+ } else {
|
|
|
+ l.onFailure(
|
|
|
+ new ElasticsearchStatusException("[configuration] and [values] cannot both be null.", RestStatus.BAD_REQUEST)
|
|
|
+ );
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ clientWithOrigin.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, l, (ll, updateResponse) -> {
|
|
|
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
|
|
|
- l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
|
|
|
+ ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
|
|
|
return;
|
|
|
}
|
|
|
- l.onResponse(updateResponse);
|
|
|
- })
|
|
|
- );
|
|
|
+ ll.onResponse(updateResponse);
|
|
|
+ }));
|
|
|
+ }));
|
|
|
} catch (Exception e) {
|
|
|
listener.onFailure(e);
|
|
|
}
|
|
@@ -851,6 +916,11 @@ public class ConnectorIndexService {
|
|
|
return ConnectorStatus.connectorStatus((String) searchResult.getResultMap().get(Connector.STATUS_FIELD.getPreferredName()));
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ private Map<String, Object> getConnectorConfigurationFromSearchResult(ConnectorSearchResult searchResult) {
|
|
|
+ return (Map<String, Object>) searchResult.getResultMap().get(Connector.CONFIGURATION_FIELD.getPreferredName());
|
|
|
+ }
|
|
|
+
|
|
|
private static ConnectorIndexService.ConnectorResult mapSearchResponseToConnectorList(SearchResponse response) {
|
|
|
final List<ConnectorSearchResult> connectorResults = Arrays.stream(response.getHits().getHits())
|
|
|
.map(ConnectorIndexService::hitToConnector)
|