Browse Source

Reject illegal flush parameters (#40213)

This change rejects an illegal combination of flush parameters where
force is true, but wait_if_ongoing is false. This combination is trappy
and should be forbidden.

Closes #36342
Nhat Nguyen 6 years ago
parent
commit
fe03d24229

+ 2 - 4
docs/reference/indices/flush.asciidoc

@@ -23,10 +23,8 @@ POST twitter/_flush
 The flush API accepts the following request parameters:
 
 [horizontal]
-`wait_if_ongoing`::  If set to `true` the flush operation will block until the
-flush can be executed if another flush operation is already executing.
-The default is `false` and will cause an exception to be thrown on
-the shard level if another flush operation is already running.
+`wait_if_ongoing`::  If set to `true`(the default value) the flush operation will
+block until the flush can be executed if another flush operation is already executing.
 
 `force`:: Whether a flush should be forced even if it is not necessarily needed i.e.
 if no changes will be committed to the index. This is useful if transaction log IDs

+ 29 - 0
rest-api-spec/src/main/resources/rest-api-spec/test/indices.flush/10_basic.yml

@@ -52,3 +52,32 @@
   # periodic flush is async
   - gte: { indices.test.primaries.flush.periodic: 0 }
   - gte: { indices.test.primaries.flush.total:    1 }
+
+---
+"Flush parameters validation":
+  - skip:
+      version: " - 7.9.99"
+      reason: flush parameters validation is introduced in 8.0
+  - do:
+      indices.create:
+        index: test
+        body:
+          settings:
+            number_of_shards: 1
+  - do:
+      catch:  /action_request_validation_exception.+ wait_if_ongoing must be true for a force flush/
+      indices.flush:
+        index: test
+        force: true
+        wait_if_ongoing: false
+  - do:
+      indices.stats: { index: test }
+  - match: { indices.test.primaries.flush.total:    0 }
+  - do:
+      indices.flush:
+        index: test
+        force: true
+        wait_if_ongoing: true
+  - do:
+      indices.stats: { index: test }
+  - match: { indices.test.primaries.flush.total:    1 }

+ 12 - 0
server/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java

@@ -19,12 +19,15 @@
 
 package org.elasticsearch.action.admin.indices.flush;
 
+import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.support.broadcast.BroadcastRequest;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 
 import java.io.IOException;
 
+import static org.elasticsearch.action.ValidateActions.addValidationError;
+
 /**
  * A flush request to flush one or more indices. The flush process of an index basically frees memory from the index
  * by flushing data to the index storage and clearing the internal transaction log. By default, Elasticsearch uses
@@ -82,6 +85,15 @@ public class FlushRequest extends BroadcastRequest<FlushRequest> {
         return this;
     }
 
+    @Override
+    public ActionRequestValidationException validate() {
+        ActionRequestValidationException validationError = super.validate();
+        if (force && waitIfOngoing == false) {
+            validationError = addValidationError("wait_if_ongoing must be true for a force flush", validationError);
+        }
+        return validationError;
+    }
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);

+ 5 - 0
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -1684,6 +1684,11 @@ public class InternalEngine extends Engine {
     @Override
     public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
         ensureOpen();
+        if (force && waitIfOngoing == false) {
+            assert false : "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing;
+            throw new IllegalArgumentException(
+                "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing);
+        }
         final byte[] newCommitId;
         /*
          * Unfortunately the lock order is important here. We have to acquire the readlock first otherwise

+ 2 - 2
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -4866,7 +4866,7 @@ public class InternalEngineTests extends EngineTestCase {
             for (int docId = 0; docId < numDocs; docId++) {
                 index(engine, docId);
                 if (rarely()) {
-                    engine.flush(randomBoolean(), randomBoolean());
+                    engine.flush(randomBoolean(), true);
                 }
             }
             engine.flush(false, randomBoolean());
@@ -4892,7 +4892,7 @@ public class InternalEngineTests extends EngineTestCase {
             for (int docId = 0; docId < numDocs; docId++) {
                 index(engine, docId);
                 if (frequently()) {
-                    engine.flush(randomBoolean(), randomBoolean());
+                    engine.flush(randomBoolean(), true);
                 }
             }
             engine.flush(false, randomBoolean());

+ 2 - 2
server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java

@@ -131,8 +131,8 @@ public class ReadOnlyEngineTests extends EngineTestCase {
                 engine.syncTranslog();
                 engine.flushAndClose();
                 readOnlyEngine = new ReadOnlyEngine(engine.engineConfig, null , null, true, Function.identity());
-                Engine.CommitId flush = readOnlyEngine.flush(randomBoolean(), randomBoolean());
-                assertEquals(flush, readOnlyEngine.flush(randomBoolean(), randomBoolean()));
+                Engine.CommitId flush = readOnlyEngine.flush(randomBoolean(), true);
+                assertEquals(flush, readOnlyEngine.flush(randomBoolean(), true));
             } finally {
                 IOUtils.close(readOnlyEngine);
             }

+ 18 - 0
server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java

@@ -41,6 +41,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
 import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.ValidationException;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -59,6 +60,8 @@ import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.test.ESIntegTestCase;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.emptyArray;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
@@ -104,6 +107,21 @@ public class FlushIT extends ESIntegTestCase {
         }
     }
 
+    public void testRejectIllegalFlushParameters() {
+        createIndex("test");
+        int numDocs = randomIntBetween(0, 10);
+        for (int i = 0; i < numDocs; i++) {
+            client().prepareIndex("test", "_doc").setSource("{}", XContentType.JSON).get();
+        }
+        assertThat(expectThrows(ValidationException.class,
+            () -> client().admin().indices().flush(new FlushRequest().force(true).waitIfOngoing(false)).actionGet()).getMessage(),
+            containsString("wait_if_ongoing must be true for a force flush"));
+        assertThat(client().admin().indices().flush(new FlushRequest().force(true).waitIfOngoing(true)).actionGet()
+            .getShardFailures(), emptyArray());
+        assertThat(client().admin().indices().flush(new FlushRequest().force(false).waitIfOngoing(randomBoolean()))
+            .actionGet().getShardFailures(), emptyArray());
+    }
+
     public void testSyncedFlush() throws Exception {
         internalCluster().ensureAtLeastNumDataNodes(2);
         prepareCreate("test").setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)).get();