فهرست منبع

[Connector API] Support cleaning up sync jobs when deleting a connector (#107253)

Jedr Blaszyk 1 سال پیش
والد
کامیت
07aa9cd998
13فایلهای تغییر یافته به همراه289 افزوده شده و 18 حذف شده
  1. 5 0
      docs/changelog/107253.yaml
  2. 7 0
      rest-api-spec/src/main/resources/rest-api-spec/api/connector.delete.json
  3. 1 0
      x-pack/plugin/ent-search/build.gradle
  4. 74 0
      x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/320_connector_delete.yml
  5. 12 5
      x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorIndexService.java
  6. 24 4
      x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/action/DeleteConnectorAction.java
  7. 5 1
      x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/action/RestDeleteConnectorAction.java
  8. 2 1
      x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/action/TransportDeleteConnectorAction.java
  9. 38 0
      x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexService.java
  10. 6 4
      x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/ConnectorIndexServiceTests.java
  11. 44 0
      x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/ConnectorTestUtils.java
  12. 2 2
      x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/action/DeleteConnectorActionRequestBWCSerializingTests.java
  13. 69 1
      x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexServiceTests.java

+ 5 - 0
docs/changelog/107253.yaml

@@ -0,0 +1,5 @@
+pr: 107253
+summary: "[Connector API] Support cleaning up sync jobs when deleting a connector"
+area: Application
+type: feature
+issues: []

+ 7 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/connector.delete.json

@@ -26,6 +26,13 @@
           }
         }
       ]
+    },
+    "params": {
+      "delete_sync_jobs": {
+        "type": "boolean",
+        "default": false,
+        "description": "Determines whether associated sync jobs are also deleted."
+      }
     }
   }
 }

+ 1 - 0
x-pack/plugin/ent-search/build.gradle

@@ -30,6 +30,7 @@ dependencies {
   testImplementation(testArtifact(project(xpackModule('core'))))
   testImplementation project(":test:framework")
   testImplementation(project(':modules:lang-mustache'))
+  testImplementation(project(':modules:reindex'))
 
   javaRestTestImplementation(project(path: xpackModule('core')))
   javaRestTestImplementation(testArtifact(project(xpackModule('core'))))

+ 74 - 0
x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/320_connector_delete.yml

@@ -26,6 +26,80 @@ setup:
       connector.get:
         connector_id: test-connector-to-delete
 
+
+---
+"Delete Connector - deletes associated sync jobs":
+
+  - do:
+      connector_sync_job.post:
+        body:
+          id: test-connector-to-delete
+          job_type: full
+          trigger_method: on_demand
+  - do:
+      connector_sync_job.post:
+        body:
+          id: test-connector-to-delete
+          job_type: full
+          trigger_method: on_demand
+  - do:
+      connector_sync_job.post:
+        body:
+          id: test-connector-to-delete
+          job_type: full
+          trigger_method: on_demand
+
+  - do:
+      connector_sync_job.list:
+        connector_id: test-connector-to-delete
+
+  - match: { count: 3 }
+
+  - do:
+      connector.delete:
+        connector_id: test-connector-to-delete
+        delete_sync_jobs: true
+
+  - match: { acknowledged: true }
+
+
+  - do:
+      connector_sync_job.list:
+        connector_id: test-connector-to-delete
+
+  - match: { count: 0 }
+
+
+---
+"Delete Connector - doesn't associated sync jobs when delete_sync_jobs is false":
+
+  - do:
+      connector_sync_job.post:
+        body:
+          id: test-connector-to-delete
+          job_type: full
+          trigger_method: on_demand
+
+  - do:
+      connector_sync_job.list:
+        connector_id: test-connector-to-delete
+
+  - match: { count: 1 }
+
+  - do:
+      connector.delete:
+        connector_id: test-connector-to-delete
+        delete_sync_jobs: false
+
+  - match: { acknowledged: true }
+
+
+  - do:
+      connector_sync_job.list:
+        connector_id: test-connector-to-delete
+
+  - match: { count: 1 }
+
 ---
 "Delete Connector - Connector does not exist":
   - do:

+ 12 - 5
x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorIndexService.java

@@ -54,6 +54,8 @@ import org.elasticsearch.xpack.application.connector.action.UpdateConnectorPipel
 import org.elasticsearch.xpack.application.connector.action.UpdateConnectorSchedulingAction;
 import org.elasticsearch.xpack.application.connector.action.UpdateConnectorServiceTypeAction;
 import org.elasticsearch.xpack.application.connector.action.UpdateConnectorStatusAction;
+import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJob;
+import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobIndexService;
 
 import java.time.Instant;
 import java.util.Arrays;
@@ -253,12 +255,13 @@ public class ConnectorIndexService {
     }
 
     /**
-     * Deletes the {@link Connector} in the underlying index.
+     * Deletes the {@link Connector} and the related instances of {@link ConnectorSyncJob} in the underlying index.
      *
-     * @param connectorId The id of the connector object.
-     * @param listener    The action listener to invoke on response/failure.
+     * @param connectorId          The id of the {@link Connector}.
+     * @param shouldDeleteSyncJobs The flag indicating if {@link ConnectorSyncJob} should also be deleted.
+     * @param listener             The action listener to invoke on response/failure.
      */
-    public void deleteConnector(String connectorId, ActionListener<DeleteResponse> listener) {
+    public void deleteConnector(String connectorId, boolean shouldDeleteSyncJobs, ActionListener<DeleteResponse> listener) {
 
         final DeleteRequest deleteRequest = new DeleteRequest(CONNECTOR_INDEX_NAME).id(connectorId)
             .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
@@ -269,7 +272,11 @@ public class ConnectorIndexService {
                     l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
                     return;
                 }
-                l.onResponse(deleteResponse);
+                if (shouldDeleteSyncJobs) {
+                    new ConnectorSyncJobIndexService(client).deleteAllSyncJobsByConnectorId(connectorId, l.map(r -> deleteResponse));
+                } else {
+                    l.onResponse(deleteResponse);
+                }
             }));
         } catch (Exception e) {
             listener.onFailure(e);

+ 24 - 4
x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/action/DeleteConnectorAction.java

@@ -18,6 +18,7 @@ import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.ToXContentObject;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry;
 
 import java.io.IOException;
 import java.util.Objects;
@@ -35,16 +36,20 @@ public class DeleteConnectorAction {
     public static class Request extends ConnectorActionRequest implements ToXContentObject {
 
         private final String connectorId;
+        private final boolean deleteSyncJobs;
 
         private static final ParseField CONNECTOR_ID_FIELD = new ParseField("connector_id");
+        private static final ParseField DELETE_SYNC_JOB_FIELD = new ParseField("delete_sync_jobs");
 
         public Request(StreamInput in) throws IOException {
             super(in);
             this.connectorId = in.readString();
+            this.deleteSyncJobs = in.readBoolean();
         }
 
-        public Request(String connectorId) {
+        public Request(String connectorId, boolean deleteSyncJobs) {
             this.connectorId = connectorId;
+            this.deleteSyncJobs = deleteSyncJobs;
         }
 
         @Override
@@ -62,10 +67,23 @@ public class DeleteConnectorAction {
             return connectorId;
         }
 
+        public boolean shouldDeleteSyncJobs() {
+            return deleteSyncJobs;
+        }
+
+        @Override
+        public String[] indices() {
+            // When deleting a connector, corresponding sync jobs can also be deleted
+            return new String[] {
+                ConnectorTemplateRegistry.CONNECTOR_SYNC_JOBS_INDEX_NAME_PATTERN,
+                ConnectorTemplateRegistry.CONNECTOR_INDEX_NAME_PATTERN };
+        }
+
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
             out.writeString(connectorId);
+            out.writeBoolean(deleteSyncJobs);
         }
 
         @Override
@@ -73,18 +91,19 @@ public class DeleteConnectorAction {
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
             Request request = (Request) o;
-            return Objects.equals(connectorId, request.connectorId);
+            return deleteSyncJobs == request.deleteSyncJobs && Objects.equals(connectorId, request.connectorId);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(connectorId);
+            return Objects.hash(connectorId, deleteSyncJobs);
         }
 
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.startObject();
             builder.field(CONNECTOR_ID_FIELD.getPreferredName(), connectorId);
+            builder.field(DELETE_SYNC_JOB_FIELD.getPreferredName(), deleteSyncJobs);
             builder.endObject();
             return builder;
         }
@@ -92,10 +111,11 @@ public class DeleteConnectorAction {
         private static final ConstructingObjectParser<DeleteConnectorAction.Request, Void> PARSER = new ConstructingObjectParser<>(
             "delete_connector_request",
             false,
-            (p) -> new Request((String) p[0])
+            (p) -> new Request((String) p[0], (boolean) p[1])
         );
         static {
             PARSER.declareString(constructorArg(), CONNECTOR_ID_FIELD);
+            PARSER.declareBoolean(constructorArg(), DELETE_SYNC_JOB_FIELD);
         }
 
         public static DeleteConnectorAction.Request parse(XContentParser parser) {

+ 5 - 1
x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/action/RestDeleteConnectorAction.java

@@ -35,7 +35,11 @@ public class RestDeleteConnectorAction extends BaseRestHandler {
 
     @Override
     protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
-        DeleteConnectorAction.Request request = new DeleteConnectorAction.Request(restRequest.param("connector_id"));
+
+        String connectorId = restRequest.param("connector_id");
+        boolean shouldDeleteSyncJobs = restRequest.paramAsBoolean("delete_sync_jobs", false);
+
+        DeleteConnectorAction.Request request = new DeleteConnectorAction.Request(connectorId, shouldDeleteSyncJobs);
         return channel -> client.execute(DeleteConnectorAction.INSTANCE, request, new RestToXContentListener<>(channel));
     }
 }

+ 2 - 1
x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/action/TransportDeleteConnectorAction.java

@@ -43,6 +43,7 @@ public class TransportDeleteConnectorAction extends HandledTransportAction<Delet
     @Override
     protected void doExecute(Task task, DeleteConnectorAction.Request request, ActionListener<AcknowledgedResponse> listener) {
         String connectorId = request.getConnectorId();
-        connectorIndexService.deleteConnector(connectorId, listener.map(v -> AcknowledgedResponse.TRUE));
+        boolean shouldDeleteSyncJobs = request.shouldDeleteSyncJobs();
+        connectorIndexService.deleteConnector(connectorId, shouldDeleteSyncJobs, listener.map(v -> AcknowledgedResponse.TRUE));
     }
 }

+ 38 - 0
x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexService.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.application.connector.syncjob;
 
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.ResourceNotFoundException;
@@ -14,6 +15,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DelegatingActionListener;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.delete.DeleteResponse;
 import org.elasticsearch.action.get.GetRequest;
@@ -21,6 +23,7 @@ import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.action.update.UpdateResponse;
@@ -33,6 +36,9 @@ import org.elasticsearch.index.query.MatchAllQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.TermQueryBuilder;
 import org.elasticsearch.index.query.TermsQueryBuilder;
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
+import org.elasticsearch.index.reindex.DeleteByQueryAction;
+import org.elasticsearch.index.reindex.DeleteByQueryRequest;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -58,6 +64,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
@@ -586,6 +593,37 @@ public class ConnectorSyncJobIndexService {
         }
     }
 
+    /**
+     * Deletes all {@link ConnectorSyncJob} documents that match a specific {@link Connector} id in the underlying index.
+     * Gracefully handles non-existent sync job index.
+     *
+     * @param connectorId The id of the {@link Connector} to match in the sync job documents.
+     * @param listener    The action listener to invoke on response/failure.
+     */
+    public void deleteAllSyncJobsByConnectorId(String connectorId, ActionListener<BulkByScrollResponse> listener) {
+        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(CONNECTOR_SYNC_JOB_INDEX_NAME).setQuery(
+            new TermQueryBuilder(
+                ConnectorSyncJob.CONNECTOR_FIELD.getPreferredName() + "." + Connector.ID_FIELD.getPreferredName(),
+                connectorId
+            )
+        ).setRefresh(true).setIndicesOptions(IndicesOptions.fromOptions(true, true, false, false));
+
+        client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener.delegateFailureAndWrap((l, r) -> {
+            final List<BulkItemResponse.Failure> bulkDeleteFailures = r.getBulkFailures();
+            if (bulkDeleteFailures.isEmpty() == false) {
+                l.onFailure(
+                    new ElasticsearchException(
+                        "Error deleting sync jobs associated with connector ["
+                            + connectorId
+                            + "] "
+                            + bulkDeleteFailures.stream().map(BulkItemResponse.Failure::getMessage).collect(Collectors.joining("\n"))
+                    )
+                );
+            }
+            l.onResponse(r);
+        }));
+    }
+
     /**
      * Listeners that checks failures for IndexNotFoundException and DocumentMissingException,
      * and transforms them in ResourceNotFoundException, invoking onFailure on the delegate listener.

+ 6 - 4
x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/ConnectorIndexServiceTests.java

@@ -55,6 +55,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.elasticsearch.xpack.application.connector.ConnectorTestUtils.getRandomCronExpression;
+import static org.elasticsearch.xpack.application.connector.ConnectorTestUtils.registerSimplifiedConnectorIndexTemplates;
 import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.equalTo;
 
@@ -66,6 +67,7 @@ public class ConnectorIndexServiceTests extends ESSingleNodeTestCase {
 
     @Before
     public void setup() {
+        registerSimplifiedConnectorIndexTemplates(indicesAdmin());
         this.connectorIndexService = new ConnectorIndexService(client());
     }
 
@@ -104,11 +106,11 @@ public class ConnectorIndexServiceTests extends ESSingleNodeTestCase {
         }
 
         String connectorIdToDelete = connectorIds.get(0);
-        DeleteResponse resp = awaitDeleteConnector(connectorIdToDelete);
+        DeleteResponse resp = awaitDeleteConnector(connectorIdToDelete, false);
         assertThat(resp.status(), equalTo(RestStatus.OK));
         expectThrows(ResourceNotFoundException.class, () -> awaitGetConnector(connectorIdToDelete));
 
-        expectThrows(ResourceNotFoundException.class, () -> awaitDeleteConnector(connectorIdToDelete));
+        expectThrows(ResourceNotFoundException.class, () -> awaitDeleteConnector(connectorIdToDelete, false));
     }
 
     public void testUpdateConnectorConfiguration_FullConfiguration() throws Exception {
@@ -526,11 +528,11 @@ public class ConnectorIndexServiceTests extends ESSingleNodeTestCase {
         assertThat(updateApiKeyIdRequest.getApiKeySecretId(), equalTo(indexedConnector.getApiKeySecretId()));
     }
 
-    private DeleteResponse awaitDeleteConnector(String connectorId) throws Exception {
+    private DeleteResponse awaitDeleteConnector(String connectorId, boolean deleteConnectorSyncJobs) throws Exception {
         CountDownLatch latch = new CountDownLatch(1);
         final AtomicReference<DeleteResponse> resp = new AtomicReference<>(null);
         final AtomicReference<Exception> exc = new AtomicReference<>(null);
-        connectorIndexService.deleteConnector(connectorId, new ActionListener<>() {
+        connectorIndexService.deleteConnector(connectorId, deleteConnectorSyncJobs, new ActionListener<>() {
             @Override
             public void onResponse(DeleteResponse deleteResponse) {
                 resp.set(deleteResponse);

+ 44 - 0
x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/ConnectorTestUtils.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.application.connector;
 
+import org.elasticsearch.client.internal.IndicesAdminClient;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.xcontent.XContentType;
@@ -25,6 +26,7 @@ import org.elasticsearch.xpack.application.connector.filtering.FilteringRuleCond
 import org.elasticsearch.xpack.application.connector.filtering.FilteringRules;
 import org.elasticsearch.xpack.application.connector.filtering.FilteringValidationInfo;
 import org.elasticsearch.xpack.application.connector.filtering.FilteringValidationState;
+import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJob;
 import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobType;
 import org.elasticsearch.xpack.core.scheduler.Cron;
 
@@ -46,11 +48,53 @@ import static org.elasticsearch.test.ESTestCase.randomInt;
 import static org.elasticsearch.test.ESTestCase.randomList;
 import static org.elasticsearch.test.ESTestCase.randomLong;
 import static org.elasticsearch.test.ESTestCase.randomLongBetween;
+import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.CONNECTOR_INDEX_NAME_PATTERN;
+import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.CONNECTOR_SYNC_JOBS_INDEX_NAME_PATTERN;
+import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.CONNECTOR_SYNC_JOBS_TEMPLATE_NAME;
+import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.CONNECTOR_TEMPLATE_NAME;
 
 public final class ConnectorTestUtils {
 
     public static final String NULL_STRING = null;
 
+    /**
+     * Registers index templates for instances of {@link Connector} and {@link ConnectorSyncJob} with essential field mappings. This method
+     * only includes mappings for fields relevant to test cases, specifying field types to ensure correct ES query logic behavior.
+     *
+     * @param indicesAdminClient The Elasticsearch indices admin client used for template registration.
+     */
+
+    public static void registerSimplifiedConnectorIndexTemplates(IndicesAdminClient indicesAdminClient) {
+
+        indicesAdminClient.preparePutTemplate(CONNECTOR_TEMPLATE_NAME)
+            .setPatterns(List.of(CONNECTOR_INDEX_NAME_PATTERN))
+            .setVersion(0)
+            .setMapping(
+                "service_type",
+                "type=keyword,store=true",
+                "status",
+                "type=keyword,store=true",
+                "index_name",
+                "type=keyword,store=true",
+                "configuration",
+                "type=object"
+            )
+            .get();
+
+        indicesAdminClient.preparePutTemplate(CONNECTOR_SYNC_JOBS_TEMPLATE_NAME)
+            .setPatterns(List.of(CONNECTOR_SYNC_JOBS_INDEX_NAME_PATTERN))
+            .setVersion(0)
+            .setMapping(
+                "job_type",
+                "type=keyword,store=true",
+                "connector.id",
+                "type=keyword,store=true",
+                "status",
+                "type=keyword,store=true"
+            )
+            .get();
+    }
+
     public static PutConnectorAction.Request getRandomPutConnectorActionRequest() {
         return new PutConnectorAction.Request(
             randomAlphaOfLengthBetween(5, 15),

+ 2 - 2
x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/action/DeleteConnectorActionRequestBWCSerializingTests.java

@@ -23,7 +23,7 @@ public class DeleteConnectorActionRequestBWCSerializingTests extends AbstractBWC
 
     @Override
     protected DeleteConnectorAction.Request createTestInstance() {
-        return new DeleteConnectorAction.Request(randomAlphaOfLengthBetween(1, 10));
+        return new DeleteConnectorAction.Request(randomAlphaOfLengthBetween(1, 10), false);
     }
 
     @Override
@@ -38,6 +38,6 @@ public class DeleteConnectorActionRequestBWCSerializingTests extends AbstractBWC
 
     @Override
     protected DeleteConnectorAction.Request mutateInstanceForVersion(DeleteConnectorAction.Request instance, TransportVersion version) {
-        return new DeleteConnectorAction.Request(instance.getConnectorId());
+        return new DeleteConnectorAction.Request(instance.getConnectorId(), instance.shouldDeleteSyncJobs());
     }
 }

+ 69 - 1
x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexServiceTests.java

@@ -20,6 +20,9 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.action.update.UpdateResponse;
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.reindex.ReindexPlugin;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ESSingleNodeTestCase;
 import org.elasticsearch.xcontent.ParseField;
@@ -40,6 +43,7 @@ import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -52,6 +56,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.xpack.application.connector.ConnectorTestUtils.registerSimplifiedConnectorIndexTemplates;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -73,9 +78,19 @@ public class ConnectorSyncJobIndexServiceTests extends ESSingleNodeTestCase {
     private String connectorTwoId;
     private String connectorThreeId;
 
+    @Override
+    protected Collection<Class<? extends Plugin>> getPlugins() {
+        List<Class<? extends Plugin>> plugins = new ArrayList<>(super.getPlugins());
+        // Reindex plugin is required for testDeleteAllSyncJobsByConnectorId (supports delete_by_query)
+        plugins.add(ReindexPlugin.class);
+        return plugins;
+    }
+
     @Before
     public void setup() throws Exception {
 
+        registerSimplifiedConnectorIndexTemplates(indicesAdmin());
+
         connectorOneId = createConnector(ConnectorTestUtils.getRandomConnector());
         connectorTwoId = createConnector(ConnectorTestUtils.getRandomConnector());
         connectorThreeId = createConnector(ConnectorTestUtils.getRandomConnectorWithDetachedIndex());
@@ -188,6 +203,35 @@ public class ConnectorSyncJobIndexServiceTests extends ESSingleNodeTestCase {
         expectThrows(ResourceNotFoundException.class, () -> awaitDeleteConnectorSyncJob(NON_EXISTING_SYNC_JOB_ID));
     }
 
+    public void testDeleteAllSyncJobsByConnectorId() throws Exception {
+
+        PostConnectorSyncJobAction.Request syncJobRequest = new PostConnectorSyncJobAction.Request(
+            connectorOneId,
+            ConnectorSyncJobType.FULL,
+            ConnectorSyncJobTriggerMethod.ON_DEMAND
+        );
+
+        int numJobs = 5;
+        // Create 5 jobs associated with connector
+        for (int i = 0; i < numJobs; i++) {
+            awaitPutConnectorSyncJob(syncJobRequest);
+        }
+
+        BulkByScrollResponse response = awaitDeleteAllSyncJobsByConnectorId(connectorOneId);
+        // 5 jobs should be deleted
+        assertEquals(numJobs, response.getDeleted());
+
+        response = awaitDeleteAllSyncJobsByConnectorId(connectorOneId);
+        // No jobs should be deleted
+        assertEquals(0, response.getDeleted());
+    }
+
+    public void testDeleteAllSyncJobsByConnectorId_NonExistentConnector() throws Exception {
+        BulkByScrollResponse response = awaitDeleteAllSyncJobsByConnectorId("non-existent-connector");
+        // 0 jobs should be deleted
+        assertEquals(0, response.getDeleted());
+    }
+
     public void testGetConnectorSyncJob() throws Exception {
         PostConnectorSyncJobAction.Request syncJobRequest = ConnectorSyncJobTestUtils.getRandomPostConnectorSyncJobActionRequest(
             connectorOneId
@@ -492,7 +536,6 @@ public class ConnectorSyncJobIndexServiceTests extends ESSingleNodeTestCase {
         assertThat(idOfReturnedSyncJob, equalTo(syncJobOneId));
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/enterprise-search-team/issues/6351")
     public void testListConnectorSyncJobs_WithConnectorOneId_GivenTwoOverallOneFromConnectorOne_ExpectOne() throws Exception {
         PostConnectorSyncJobAction.Request requestOne = ConnectorSyncJobTestUtils.getRandomPostConnectorSyncJobActionRequest(
             connectorOneId
@@ -1129,6 +1172,31 @@ public class ConnectorSyncJobIndexServiceTests extends ESSingleNodeTestCase {
         return resp.get();
     }
 
+    private BulkByScrollResponse awaitDeleteAllSyncJobsByConnectorId(String connectorSyncJobId) throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        final AtomicReference<BulkByScrollResponse> resp = new AtomicReference<>(null);
+        final AtomicReference<Exception> exc = new AtomicReference<>(null);
+        connectorSyncJobIndexService.deleteAllSyncJobsByConnectorId(connectorSyncJobId, new ActionListener<>() {
+            @Override
+            public void onResponse(BulkByScrollResponse deleteResponse) {
+                resp.set(deleteResponse);
+                latch.countDown();
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                exc.set(e);
+                latch.countDown();
+            }
+        });
+        assertTrue("Timeout waiting for delete request", latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+        if (exc.get() != null) {
+            throw exc.get();
+        }
+        assertNotNull("Received null response from delete request", resp.get());
+        return resp.get();
+    }
+
     private PostConnectorSyncJobAction.Response awaitPutConnectorSyncJob(PostConnectorSyncJobAction.Request syncJobRequest)
         throws Exception {
         CountDownLatch latch = new CountDownLatch(1);