Sfoglia il codice sorgente

Remove dead code from o.e.a.bulk (#89987)

Just some random finds while researching other things.
Armin Braun 3 anni fa
parent
commit
2c4731e900

+ 0 - 4
server/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java

@@ -341,10 +341,6 @@ public class BulkItemResponse implements Writeable, StatusToXContentObject {
             return builder;
         }
 
-        public static Failure fromXContent(XContentParser parser) {
-            return PARSER.apply(parser, null);
-        }
-
         @Override
         public String toString() {
             return Strings.toString(this);

+ 6 - 33
server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java

@@ -41,11 +41,6 @@ class BulkPrimaryExecutionContext {
          * submitted
          */
         WAIT_FOR_MAPPING_UPDATE,
-        /**
-         * The request should be executed again, but there is no need to wait for an external event.
-         * This is needed to support retry on conflicts during updates.
-         */
-        IMMEDIATE_RETRY,
         /** The request has been executed on the primary shard (successfully or not) */
         EXECUTED,
         /**
@@ -115,29 +110,11 @@ class BulkPrimaryExecutionContext {
         return retryCounter;
     }
 
-    /** returns true if the current request has been executed on the primary */
-    public boolean isOperationExecuted() {
-        return currentItemState == ItemProcessingState.EXECUTED;
-    }
-
     /** returns true if the request needs to wait for a mapping update to arrive from the master */
     public boolean requiresWaitingForMappingUpdate() {
         return currentItemState == ItemProcessingState.WAIT_FOR_MAPPING_UPDATE;
     }
 
-    /** returns true if the current request should be retried without waiting for an external event */
-    public boolean requiresImmediateRetry() {
-        return currentItemState == ItemProcessingState.IMMEDIATE_RETRY;
-    }
-
-    /**
-     * returns true if the current request has been completed and it's result translated to a user
-     * facing response
-     */
-    public boolean isCompleted() {
-        return currentItemState == ItemProcessingState.COMPLETED;
-    }
-
     /**
      * returns true if the current request is in INITIAL state
      */
@@ -203,19 +180,19 @@ class BulkPrimaryExecutionContext {
 
     /** resets the current item state, prepare for a new execution */
     public void resetForExecutionForRetry() {
-        assertInvariants(ItemProcessingState.WAIT_FOR_MAPPING_UPDATE, ItemProcessingState.EXECUTED);
+        assert assertInvariants(ItemProcessingState.WAIT_FOR_MAPPING_UPDATE, ItemProcessingState.EXECUTED);
         currentItemState = ItemProcessingState.INITIAL;
         requestToExecute = null;
         executionResult = null;
-        assertInvariants(ItemProcessingState.INITIAL);
+        assert assertInvariants(ItemProcessingState.INITIAL);
     }
 
     /** completes the operation without doing anything on the primary */
     public void markOperationAsNoOp(DocWriteResponse response) {
-        assertInvariants(ItemProcessingState.INITIAL);
+        assert assertInvariants(ItemProcessingState.INITIAL);
         executionResult = BulkItemResponse.success(getCurrentItem().id(), getCurrentItem().request().opType(), response);
         currentItemState = ItemProcessingState.EXECUTED;
-        assertInvariants(ItemProcessingState.EXECUTED);
+        assert assertInvariants(ItemProcessingState.EXECUTED);
     }
 
     /** indicates that the operation needs to be failed as the required mapping didn't arrive in time */
@@ -235,7 +212,7 @@ class BulkPrimaryExecutionContext {
 
     /** the current operation has been executed on the primary with the specified result */
     public void markOperationAsExecuted(Engine.Result result) {
-        assertInvariants(ItemProcessingState.TRANSLATED);
+        assert assertInvariants(ItemProcessingState.TRANSLATED);
         final BulkItemRequest current = getCurrentItem();
         DocWriteRequest<?> docWriteRequest = getRequestToExecute();
         switch (result.getResultType()) {
@@ -290,7 +267,7 @@ class BulkPrimaryExecutionContext {
 
     /** finishes the execution of the current request, with the response that should be returned to the user */
     public void markAsCompleted(BulkItemResponse translatedResponse) {
-        assertInvariants(ItemProcessingState.EXECUTED);
+        assert assertInvariants(ItemProcessingState.EXECUTED);
         assert executionResult != null && translatedResponse.getItemId() == executionResult.getItemId();
         assert translatedResponse.getItemId() == getCurrentItem().id();
 
@@ -329,10 +306,6 @@ class BulkPrimaryExecutionContext {
                 assert requestToExecute == null;
                 assert executionResult == null : executionResult;
                 break;
-            case IMMEDIATE_RETRY:
-                assert requestToExecute != null;
-                assert executionResult == null : executionResult;
-                break;
             case EXECUTED:
                 // requestToExecute can be null if the update ended up as NOOP
                 assert executionResult != null;

+ 1 - 43
server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java

@@ -12,7 +12,6 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -204,47 +203,6 @@ public class BulkProcessor implements Closeable {
         }
     }
 
-    /**
-     * @param client The client that executes the bulk operations
-     * @param listener The BulkProcessor listener that gets called on bulk events
-     * @param flushScheduler The scheduler that is used to flush
-     * @param retryScheduler The scheduler that is used for retries
-     * @param onClose The runnable instance that is executed on close. Consumers are required to clean up the schedulers.
-     * @return the builder for BulkProcessor
-     */
-    public static Builder builder(Client client, Listener listener, Scheduler flushScheduler, Scheduler retryScheduler, Runnable onClose) {
-        Objects.requireNonNull(client, "client");
-        Objects.requireNonNull(listener, "listener");
-        return new Builder(client::bulk, listener, flushScheduler, retryScheduler, onClose);
-    }
-
-    /**
-     * @param client The client that executes the bulk operations
-     * @param listener The BulkProcessor listener that gets called on bulk events
-     * @return the builder for BulkProcessor
-     * @deprecated Use {@link #builder(BiConsumer, Listener, String)}
-     * with client::bulk as the first argument, or {@link #builder(org.elasticsearch.client.internal.Client,
-     * org.elasticsearch.action.bulk.BulkProcessor.Listener, org.elasticsearch.threadpool.Scheduler,
-     * org.elasticsearch.threadpool.Scheduler, java.lang.Runnable)} and manage the flush and retry schedulers explicitly
-     */
-    @Deprecated
-    public static Builder builder(Client client, Listener listener) {
-        Objects.requireNonNull(client, "client");
-        Objects.requireNonNull(listener, "listener");
-        return new Builder(client::bulk, listener, client.threadPool(), client.threadPool(), () -> {});
-    }
-
-    /**
-     * @param consumer The consumer that is called to fulfil bulk operations
-     * @param listener The BulkProcessor listener that gets called on bulk events
-     * @return the builder for BulkProcessor
-     * @deprecated use {@link #builder(BiConsumer, Listener, String)} instead
-     */
-    @Deprecated
-    public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener) {
-        return builder(consumer, listener, "anonymous-bulk-processor");
-    }
-
     /**
      * @param consumer The consumer that is called to fulfil bulk operations
      * @param listener The BulkProcessor listener that gets called on bulk events
@@ -277,7 +235,7 @@ public class BulkProcessor implements Closeable {
 
     private BulkRequest bulkRequest;
     private final Supplier<BulkRequest> bulkRequestSupplier;
-    private Supplier<Boolean> flushSupplier;
+    private final Supplier<Boolean> flushSupplier;
     private final BulkRequestHandler bulkRequestHandler;
     private final Runnable onClose;
 

+ 2 - 2
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -278,7 +278,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
         } else {
             final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
             for (String index : autoCreateIndices) {
-                createIndex(index, bulkRequest.timeout(), minNodeVersion, new ActionListener<>() {
+                createIndex(index, bulkRequest.timeout(), new ActionListener<>() {
                     @Override
                     public void onResponse(CreateIndexResponse result) {
                         if (counter.decrementAndGet() == 0) {
@@ -427,7 +427,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
         }
     }
 
-    void createIndex(String index, TimeValue timeout, Version minNodeVersion, ActionListener<CreateIndexResponse> listener) {
+    void createIndex(String index, TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
         CreateIndexRequest createIndexRequest = new CreateIndexRequest();
         createIndexRequest.index(index);
         createIndexRequest.cause("auto(bulk api)");

+ 1 - 1
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java

@@ -147,7 +147,7 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCa
             }
 
             @Override
-            void createIndex(String index, TimeValue timeout, Version minNodeVersion, ActionListener<CreateIndexResponse> listener) {
+            void createIndex(String index, TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
                 try {
                     simulateAutoCreate.accept(index);
                     // If we try to create an index just immediately assume it worked

+ 1 - 1
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

@@ -155,7 +155,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
         }
 
         @Override
-        void createIndex(String index, TimeValue timeout, Version minNodeVersion, ActionListener<CreateIndexResponse> listener) {
+        void createIndex(String index, TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
             indexCreated = true;
             listener.onResponse(null);
         }

+ 1 - 1
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java

@@ -89,7 +89,7 @@ public class TransportBulkActionTests extends ESTestCase {
         }
 
         @Override
-        void createIndex(String index, TimeValue timeout, Version minNodeVersion, ActionListener<CreateIndexResponse> listener) {
+        void createIndex(String index, TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
             indexCreated = true;
             if (beforeIndexCreation != null) {
                 beforeIndexCreation.run();