Browse Source

Add a cluster block that allows to delete indices that are read-only (#24678)

Today when an index is `read-only` the index is also blocked from
being deleted which sometimes is undesired since in-order to make
changes to a cluster indices must be deleted to free up space. This is
a likely scenario in a hosted environment when disk-space is limited to switch
indices read-only but allow deletions to free up space.
Simon Willnauer 8 years ago
parent
commit
1cae850cf5
43 changed files with 262 additions and 109 deletions
  1. 9 1
      core/src/main/java/org/elasticsearch/action/admin/cluster/settings/SettingsUpdater.java
  2. 9 6
      core/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java
  3. 1 1
      core/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java
  4. 4 1
      core/src/main/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java
  5. 20 3
      core/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java
  6. 5 28
      core/src/main/java/org/elasticsearch/cluster/block/ClusterBlockLevel.java
  7. 34 15
      core/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java
  8. 9 4
      core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java
  9. 8 2
      core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java
  10. 1 1
      core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java
  11. 1 0
      core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java
  12. 1 0
      core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
  13. 1 0
      core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java
  14. 2 2
      core/src/main/java/org/elasticsearch/discovery/DiscoverySettings.java
  15. 7 2
      core/src/main/java/org/elasticsearch/gateway/GatewayService.java
  16. 1 1
      core/src/main/java/org/elasticsearch/rest/RestStatus.java
  17. 2 4
      core/src/main/java/org/elasticsearch/tribe/TribeService.java
  18. 9 0
      core/src/test/java/org/elasticsearch/action/admin/cluster/settings/SettingsUpdaterTests.java
  19. 3 1
      core/src/test/java/org/elasticsearch/action/admin/cluster/tasks/PendingTasksBlocksIT.java
  20. 2 1
      core/src/test/java/org/elasticsearch/action/admin/indices/cache/clear/ClearIndicesCacheBlocksIT.java
  21. 50 4
      core/src/test/java/org/elasticsearch/action/admin/indices/delete/DeleteIndexBlocksIT.java
  22. 3 1
      core/src/test/java/org/elasticsearch/action/admin/indices/flush/FlushBlocksIT.java
  23. 2 1
      core/src/test/java/org/elasticsearch/action/admin/indices/forcemerge/ForceMergeBlocksIT.java
  24. 2 1
      core/src/test/java/org/elasticsearch/action/admin/indices/get/GetIndexIT.java
  25. 3 2
      core/src/test/java/org/elasticsearch/action/admin/indices/refresh/RefreshBlocksIT.java
  26. 3 1
      core/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentsBlocksIT.java
  27. 3 1
      core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsBlocksIT.java
  28. 2 2
      core/src/test/java/org/elasticsearch/action/main/MainActionTests.java
  29. 2 2
      core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java
  30. 1 1
      core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java
  31. 5 5
      core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
  32. 1 1
      core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java
  33. 3 1
      core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java
  34. 3 3
      core/src/test/java/org/elasticsearch/cluster/block/ClusterBlockTests.java
  35. 23 2
      core/src/test/java/org/elasticsearch/cluster/settings/ClusterSettingsIT.java
  36. 3 1
      core/src/test/java/org/elasticsearch/cluster/shards/ClusterSearchShardsIT.java
  37. 3 1
      core/src/test/java/org/elasticsearch/indices/exists/indices/IndicesExistsIT.java
  38. 2 1
      core/src/test/java/org/elasticsearch/indices/settings/GetSettingsBlocksIT.java
  39. 3 2
      core/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexIT.java
  40. 4 0
      docs/reference/index-modules.asciidoc
  41. 5 0
      docs/reference/modules/cluster/misc.asciidoc
  42. 5 2
      modules/reindex/src/test/java/org/elasticsearch/index/reindex/DeleteByQueryBasicTests.java
  43. 2 1
      test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

+ 9 - 1
core/src/main/java/org/elasticsearch/action/admin/cluster/settings/SettingsUpdater.java

@@ -67,12 +67,20 @@ final class SettingsUpdater {
             .transientSettings(transientSettings.build());
 
         ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
-        boolean updatedReadOnly = MetaData.SETTING_READ_ONLY_SETTING.get(metaData.persistentSettings()) || MetaData.SETTING_READ_ONLY_SETTING.get(metaData.transientSettings());
+        boolean updatedReadOnly = MetaData.SETTING_READ_ONLY_SETTING.get(metaData.persistentSettings())
+            || MetaData.SETTING_READ_ONLY_SETTING.get(metaData.transientSettings());
         if (updatedReadOnly) {
             blocks.addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK);
         } else {
             blocks.removeGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK);
         }
+        boolean updatedReadOnlyAllowDelete = MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING.get(metaData.persistentSettings())
+            || MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING.get(metaData.transientSettings());
+        if (updatedReadOnlyAllowDelete) {
+            blocks.addGlobalBlock(MetaData.CLUSTER_READ_ONLY_ALLOW_DELETE_BLOCK);
+        } else {
+            blocks.removeGlobalBlock(MetaData.CLUSTER_READ_ONLY_ALLOW_DELETE_BLOCK);
+        }
         ClusterState build = builder(currentState).metaData(metaData).blocks(blocks).build();
         Settings settings = build.metaData().settings();
         // now we try to apply things and if they are invalid we fail

+ 9 - 6
core/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java

@@ -67,12 +67,15 @@ public class TransportClusterUpdateSettingsAction extends
     @Override
     protected ClusterBlockException checkBlock(ClusterUpdateSettingsRequest request, ClusterState state) {
         // allow for dedicated changes to the metadata blocks, so we don't block those to allow to "re-enable" it
-        if ((request.transientSettings().isEmpty() &&
-            request.persistentSettings().size() == 1 &&
-            MetaData.SETTING_READ_ONLY_SETTING.exists(request.persistentSettings())) ||
-            (request.persistentSettings().isEmpty() && request.transientSettings().size() == 1 &&
-                MetaData.SETTING_READ_ONLY_SETTING.exists(request.transientSettings()))) {
-            return null;
+        if (request.transientSettings().size() + request.persistentSettings().size() == 1) {
+            // only one setting
+            if (MetaData.SETTING_READ_ONLY_SETTING.exists(request.persistentSettings())
+                || MetaData.SETTING_READ_ONLY_SETTING.exists(request.transientSettings())
+                || MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING.exists(request.transientSettings())
+                || MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING.exists(request.persistentSettings())) {
+                // one of the settings above as the only setting in the request means - resetting the block!
+                return null;
+            }
         }
         return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
     }

+ 1 - 1
core/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java

@@ -78,7 +78,7 @@ public class TransportDeleteIndexAction extends TransportMasterNodeAction<Delete
 
     @Override
     protected ClusterBlockException checkBlock(DeleteIndexRequest request, ClusterState state) {
-        return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, indexNameExpressionResolver.concreteIndexNames(state, request));
+        return state.blocks().indicesAllowReleaseResources(indexNameExpressionResolver.concreteIndexNames(state, request));
     }
 
     @Override

+ 4 - 1
core/src/main/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java

@@ -62,7 +62,10 @@ public class TransportUpdateSettingsAction extends TransportMasterNodeAction<Upd
         if (globalBlock != null) {
             return globalBlock;
         }
-        if (request.settings().size() == 1 && IndexMetaData.INDEX_BLOCKS_METADATA_SETTING.exists(request.settings()) || IndexMetaData.INDEX_READ_ONLY_SETTING.exists(request.settings())) {
+        if (request.settings().size() == 1 &&  // we have to allow resetting these settings otherwise users can't unblock an index
+            IndexMetaData.INDEX_BLOCKS_METADATA_SETTING.exists(request.settings())
+            || IndexMetaData.INDEX_READ_ONLY_SETTING.exists(request.settings())
+            || IndexMetaData.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING.exists(request.settings())) {
             return null;
         }
         return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, indexNameExpressionResolver.concreteIndexNames(state, request));

+ 20 - 3
core/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.cluster.block;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Streamable;
@@ -43,18 +44,22 @@ public class ClusterBlock implements Streamable, ToXContent {
 
     private boolean disableStatePersistence = false;
 
+    private boolean allowReleaseResources;
+
     private RestStatus status;
 
     ClusterBlock() {
     }
 
-    public ClusterBlock(int id, String description, boolean retryable, boolean disableStatePersistence, RestStatus status, EnumSet<ClusterBlockLevel> levels) {
+    public ClusterBlock(int id, String description, boolean retryable, boolean disableStatePersistence, boolean allowReleaseResources, RestStatus status,
+                        EnumSet<ClusterBlockLevel> levels) {
         this.id = id;
         this.description = description;
         this.retryable = retryable;
         this.disableStatePersistence = disableStatePersistence;
         this.status = status;
         this.levels = levels;
+        this.allowReleaseResources = allowReleaseResources;
     }
 
     public int id() {
@@ -127,12 +132,17 @@ public class ClusterBlock implements Streamable, ToXContent {
         final int len = in.readVInt();
         ArrayList<ClusterBlockLevel> levels = new ArrayList<>(len);
         for (int i = 0; i < len; i++) {
-            levels.add(ClusterBlockLevel.fromId(in.readVInt()));
+            levels.add(in.readEnum(ClusterBlockLevel.class));
         }
         this.levels = EnumSet.copyOf(levels);
         retryable = in.readBoolean();
         disableStatePersistence = in.readBoolean();
         status = RestStatus.readFrom(in);
+        if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
+            allowReleaseResources = in.readBoolean();
+        } else {
+            allowReleaseResources = false;
+        }
     }
 
     @Override
@@ -141,11 +151,14 @@ public class ClusterBlock implements Streamable, ToXContent {
         out.writeString(description);
         out.writeVInt(levels.size());
         for (ClusterBlockLevel level : levels) {
-            out.writeVInt(level.id());
+            out.writeEnum(level);
         }
         out.writeBoolean(retryable);
         out.writeBoolean(disableStatePersistence);
         RestStatus.writeTo(out, status);
+        if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
+            out.writeBoolean(allowReleaseResources);
+        }
     }
 
     @Override
@@ -176,4 +189,8 @@ public class ClusterBlock implements Streamable, ToXContent {
     public int hashCode() {
         return id;
     }
+
+    public boolean isAllowReleaseResources() {
+        return allowReleaseResources;
+    }
 }

+ 5 - 28
core/src/main/java/org/elasticsearch/cluster/block/ClusterBlockLevel.java

@@ -23,34 +23,11 @@ package org.elasticsearch.cluster.block;
 import java.util.EnumSet;
 
 public enum ClusterBlockLevel {
-    READ(0),
-    WRITE(1),
-    METADATA_READ(2),
-    METADATA_WRITE(3);
+    READ,
+    WRITE,
+    METADATA_READ,
+    METADATA_WRITE;
 
-    public static final EnumSet<ClusterBlockLevel> ALL = EnumSet.of(READ, WRITE, METADATA_READ, METADATA_WRITE);
+    public static final EnumSet<ClusterBlockLevel> ALL = EnumSet.allOf(ClusterBlockLevel.class);
     public static final EnumSet<ClusterBlockLevel> READ_WRITE = EnumSet.of(READ, WRITE);
-
-    private final int id;
-
-    ClusterBlockLevel(int id) {
-        this.id = id;
-    }
-
-    public int id() {
-        return this.id;
-    }
-
-    static ClusterBlockLevel fromId(int id) {
-        if (id == 0) {
-            return READ;
-        } else if (id == 1) {
-            return WRITE;
-        } else if (id == 2) {
-            return METADATA_READ;
-        } else if (id == 3) {
-            return METADATA_WRITE;
-        }
-        throw new IllegalArgumentException("No cluster block level matching [" + id + "]");
-    }
 }

+ 34 - 15
core/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java

@@ -70,11 +70,11 @@ public class ClusterBlocks extends AbstractDiffable<ClusterBlocks> {
     }
 
     public Set<ClusterBlock> global(ClusterBlockLevel level) {
-        return levelHolders[level.id()].global();
+        return levelHolders[level.ordinal()].global();
     }
 
     public ImmutableOpenMap<String, Set<ClusterBlock>> indices(ClusterBlockLevel level) {
-        return levelHolders[level.id()].indices();
+        return levelHolders[level.ordinal()].indices();
     }
 
     private Set<ClusterBlock> blocksForIndex(ClusterBlockLevel level, String index) {
@@ -97,7 +97,7 @@ public class ClusterBlocks extends AbstractDiffable<ClusterBlocks> {
                     .collect(toSet())));
             }
 
-            levelHolders[level.id()] = new ImmutableLevelHolder(newGlobal, indicesBuilder.build());
+            levelHolders[level.ordinal()] = new ImmutableLevelHolder(newGlobal, indicesBuilder.build());
         }
         return levelHolders;
     }
@@ -203,6 +203,26 @@ public class ClusterBlocks extends AbstractDiffable<ClusterBlocks> {
         return new ClusterBlockException(unmodifiableSet(blocks.collect(toSet())));
     }
 
+    /**
+     * Returns <code>true</code> iff non of the given have a {@link ClusterBlockLevel#METADATA_WRITE} in place where the
+     * {@link ClusterBlock#isAllowReleaseResources()} returns <code>false</code>. This is used in places where resources will be released
+     * like the deletion of an index to free up resources on nodes.
+     * @param indices the indices to check
+     */
+    public ClusterBlockException indicesAllowReleaseResources(String[] indices) {
+        final Function<String, Stream<ClusterBlock>> blocksForIndexAtLevel = index ->
+            blocksForIndex(ClusterBlockLevel.METADATA_WRITE, index).stream();
+        Stream<ClusterBlock> blocks = concat(
+            global(ClusterBlockLevel.METADATA_WRITE).stream(),
+            Stream.of(indices).flatMap(blocksForIndexAtLevel)).filter(clusterBlock -> clusterBlock.isAllowReleaseResources() == false);
+        Set<ClusterBlock> clusterBlocks = unmodifiableSet(blocks.collect(toSet()));
+        if (clusterBlocks.isEmpty()) {
+            return null;
+        }
+        return new ClusterBlockException(clusterBlocks);
+    }
+
+
     @Override
     public String toString() {
         if (global.isEmpty() && indices().isEmpty()) {
@@ -270,8 +290,6 @@ public class ClusterBlocks extends AbstractDiffable<ClusterBlocks> {
 
     static class ImmutableLevelHolder {
 
-        static final ImmutableLevelHolder EMPTY = new ImmutableLevelHolder(emptySet(), ImmutableOpenMap.of());
-
         private final Set<ClusterBlock> global;
         private final ImmutableOpenMap<String, Set<ClusterBlock>> indices;
 
@@ -314,30 +332,31 @@ public class ClusterBlocks extends AbstractDiffable<ClusterBlocks> {
         }
 
         public Builder addBlocks(IndexMetaData indexMetaData) {
+            String indexName = indexMetaData.getIndex().getName();
             if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
-                addIndexBlock(indexMetaData.getIndex().getName(), MetaDataIndexStateService.INDEX_CLOSED_BLOCK);
+                addIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK);
             }
             if (IndexMetaData.INDEX_READ_ONLY_SETTING.get(indexMetaData.getSettings())) {
-                addIndexBlock(indexMetaData.getIndex().getName(), IndexMetaData.INDEX_READ_ONLY_BLOCK);
+                addIndexBlock(indexName, IndexMetaData.INDEX_READ_ONLY_BLOCK);
             }
             if (IndexMetaData.INDEX_BLOCKS_READ_SETTING.get(indexMetaData.getSettings())) {
-                addIndexBlock(indexMetaData.getIndex().getName(), IndexMetaData.INDEX_READ_BLOCK);
+                addIndexBlock(indexName, IndexMetaData.INDEX_READ_BLOCK);
             }
             if (IndexMetaData.INDEX_BLOCKS_WRITE_SETTING.get(indexMetaData.getSettings())) {
-                addIndexBlock(indexMetaData.getIndex().getName(), IndexMetaData.INDEX_WRITE_BLOCK);
+                addIndexBlock(indexName, IndexMetaData.INDEX_WRITE_BLOCK);
             }
             if (IndexMetaData.INDEX_BLOCKS_METADATA_SETTING.get(indexMetaData.getSettings())) {
-                addIndexBlock(indexMetaData.getIndex().getName(), IndexMetaData.INDEX_METADATA_BLOCK);
+                addIndexBlock(indexName, IndexMetaData.INDEX_METADATA_BLOCK);
+            }
+            if (IndexMetaData.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING.get(indexMetaData.getSettings())) {
+                addIndexBlock(indexName, IndexMetaData.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK);
             }
             return this;
         }
 
         public Builder updateBlocks(IndexMetaData indexMetaData) {
-            removeIndexBlock(indexMetaData.getIndex().getName(), MetaDataIndexStateService.INDEX_CLOSED_BLOCK);
-            removeIndexBlock(indexMetaData.getIndex().getName(), IndexMetaData.INDEX_READ_ONLY_BLOCK);
-            removeIndexBlock(indexMetaData.getIndex().getName(), IndexMetaData.INDEX_READ_BLOCK);
-            removeIndexBlock(indexMetaData.getIndex().getName(), IndexMetaData.INDEX_WRITE_BLOCK);
-            removeIndexBlock(indexMetaData.getIndex().getName(), IndexMetaData.INDEX_METADATA_BLOCK);
+            // let's remove all blocks for this index and add them back -- no need to remove all individual blocks....
+            indices.remove(indexMetaData.getIndex().getName());
             return addBlocks(indexMetaData);
         }
 

+ 9 - 4
core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java

@@ -131,10 +131,11 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
         return proto;
     }
 
-    public static final ClusterBlock INDEX_READ_ONLY_BLOCK = new ClusterBlock(5, "index read-only (api)", false, false, RestStatus.FORBIDDEN, EnumSet.of(ClusterBlockLevel.WRITE, ClusterBlockLevel.METADATA_WRITE));
-    public static final ClusterBlock INDEX_READ_BLOCK = new ClusterBlock(7, "index read (api)", false, false, RestStatus.FORBIDDEN, EnumSet.of(ClusterBlockLevel.READ));
-    public static final ClusterBlock INDEX_WRITE_BLOCK = new ClusterBlock(8, "index write (api)", false, false, RestStatus.FORBIDDEN, EnumSet.of(ClusterBlockLevel.WRITE));
-    public static final ClusterBlock INDEX_METADATA_BLOCK = new ClusterBlock(9, "index metadata (api)", false, false, RestStatus.FORBIDDEN, EnumSet.of(ClusterBlockLevel.METADATA_WRITE, ClusterBlockLevel.METADATA_READ));
+    public static final ClusterBlock INDEX_READ_ONLY_BLOCK = new ClusterBlock(5, "index read-only (api)", false, false, false, RestStatus.FORBIDDEN, EnumSet.of(ClusterBlockLevel.WRITE, ClusterBlockLevel.METADATA_WRITE));
+    public static final ClusterBlock INDEX_READ_BLOCK = new ClusterBlock(7, "index read (api)", false, false, false, RestStatus.FORBIDDEN, EnumSet.of(ClusterBlockLevel.READ));
+    public static final ClusterBlock INDEX_WRITE_BLOCK = new ClusterBlock(8, "index write (api)", false, false, false, RestStatus.FORBIDDEN, EnumSet.of(ClusterBlockLevel.WRITE));
+    public static final ClusterBlock INDEX_METADATA_BLOCK = new ClusterBlock(9, "index metadata (api)", false, false, false, RestStatus.FORBIDDEN, EnumSet.of(ClusterBlockLevel.METADATA_WRITE, ClusterBlockLevel.METADATA_READ));
+    public static final ClusterBlock INDEX_READ_ONLY_ALLOW_DELETE_BLOCK = new ClusterBlock(12, "index read-only / allow delete (api)", false, false, true, RestStatus.FORBIDDEN, EnumSet.of(ClusterBlockLevel.METADATA_WRITE, ClusterBlockLevel.WRITE));
 
     public enum State {
         OPEN((byte) 0),
@@ -212,6 +213,10 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
     public static final Setting<Boolean> INDEX_BLOCKS_METADATA_SETTING =
         Setting.boolSetting(SETTING_BLOCKS_METADATA, false, Property.Dynamic, Property.IndexScope);
 
+    public static final String SETTING_READ_ONLY_ALLOW_DELETE = "index.blocks.read_only_allow_delete";
+    public static final Setting<Boolean> INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING =
+        Setting.boolSetting(SETTING_READ_ONLY_ALLOW_DELETE, false, Property.Dynamic, Property.IndexScope);
+
     public static final String SETTING_VERSION_CREATED = "index.version.created";
     public static final String SETTING_VERSION_CREATED_STRING = "index.version.created_string";
     public static final String SETTING_VERSION_UPGRADED = "index.version.upgraded";

+ 8 - 2
core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java

@@ -24,7 +24,6 @@ import com.carrotsearch.hppc.cursors.ObjectCursor;
 import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.util.CollectionUtil;
-import org.elasticsearch.Version;
 import org.elasticsearch.cluster.Diff;
 import org.elasticsearch.cluster.Diffable;
 import org.elasticsearch.cluster.DiffableUtils;
@@ -119,7 +118,14 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
     public static final Setting<Boolean> SETTING_READ_ONLY_SETTING =
         Setting.boolSetting("cluster.blocks.read_only", false, Property.Dynamic, Property.NodeScope);
 
-    public static final ClusterBlock CLUSTER_READ_ONLY_BLOCK = new ClusterBlock(6, "cluster read-only (api)", false, false, RestStatus.FORBIDDEN, EnumSet.of(ClusterBlockLevel.WRITE, ClusterBlockLevel.METADATA_WRITE));
+    public static final ClusterBlock CLUSTER_READ_ONLY_BLOCK = new ClusterBlock(6, "cluster read-only (api)", false, false,
+        false, RestStatus.FORBIDDEN, EnumSet.of(ClusterBlockLevel.WRITE, ClusterBlockLevel.METADATA_WRITE));
+
+    public static final Setting<Boolean> SETTING_READ_ONLY_ALLOW_DELETE_SETTING =
+        Setting.boolSetting("cluster.blocks.read_only_allow_delete", false, Property.Dynamic, Property.NodeScope);
+
+    public static final ClusterBlock CLUSTER_READ_ONLY_ALLOW_DELETE_BLOCK = new ClusterBlock(13, "cluster read-only / allow delete (api)",
+        false, false, true, RestStatus.FORBIDDEN, EnumSet.of(ClusterBlockLevel.WRITE, ClusterBlockLevel.METADATA_WRITE));
 
     public static final MetaData EMPTY_META_DATA = builder().build();
 

+ 1 - 1
core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java

@@ -54,7 +54,7 @@ import java.util.Set;
  */
 public class MetaDataIndexStateService extends AbstractComponent {
 
-    public static final ClusterBlock INDEX_CLOSED_BLOCK = new ClusterBlock(4, "index closed", false, false, RestStatus.FORBIDDEN, ClusterBlockLevel.READ_WRITE);
+    public static final ClusterBlock INDEX_CLOSED_BLOCK = new ClusterBlock(4, "index closed", false, false, false, RestStatus.FORBIDDEN, ClusterBlockLevel.READ_WRITE);
 
     private final ClusterService clusterService;
 

+ 1 - 0
core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java

@@ -230,6 +230,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
 
                 ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
                 maybeUpdateClusterBlock(actualIndices, blocks, IndexMetaData.INDEX_READ_ONLY_BLOCK, IndexMetaData.INDEX_READ_ONLY_SETTING, openSettings);
+                maybeUpdateClusterBlock(actualIndices, blocks, IndexMetaData.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK, IndexMetaData.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING, openSettings);
                 maybeUpdateClusterBlock(actualIndices, blocks, IndexMetaData.INDEX_METADATA_BLOCK, IndexMetaData.INDEX_BLOCKS_METADATA_SETTING, openSettings);
                 maybeUpdateClusterBlock(actualIndices, blocks, IndexMetaData.INDEX_WRITE_BLOCK, IndexMetaData.INDEX_BLOCKS_WRITE_SETTING, openSettings);
                 maybeUpdateClusterBlock(actualIndices, blocks, IndexMetaData.INDEX_READ_BLOCK, IndexMetaData.INDEX_BLOCKS_READ_SETTING, openSettings);

+ 1 - 0
core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -187,6 +187,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
                     IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING,
                     MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING,
                     MetaData.SETTING_READ_ONLY_SETTING,
+                    MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING,
                     RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
                     RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
                     RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,

+ 1 - 0
core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

@@ -74,6 +74,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
         IndexMetaData.INDEX_BLOCKS_READ_SETTING,
         IndexMetaData.INDEX_BLOCKS_WRITE_SETTING,
         IndexMetaData.INDEX_BLOCKS_METADATA_SETTING,
+        IndexMetaData.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING,
         IndexMetaData.INDEX_PRIORITY_SETTING,
         IndexMetaData.INDEX_DATA_PATH_SETTING,
         SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG_SETTING,

+ 2 - 2
core/src/main/java/org/elasticsearch/discovery/DiscoverySettings.java

@@ -37,8 +37,8 @@ import java.util.EnumSet;
 public class DiscoverySettings extends AbstractComponent {
 
     public static final int NO_MASTER_BLOCK_ID = 2;
-    public static final ClusterBlock NO_MASTER_BLOCK_ALL = new ClusterBlock(NO_MASTER_BLOCK_ID, "no master", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL);
-    public static final ClusterBlock NO_MASTER_BLOCK_WRITES = new ClusterBlock(NO_MASTER_BLOCK_ID, "no master", true, false, RestStatus.SERVICE_UNAVAILABLE, EnumSet.of(ClusterBlockLevel.WRITE, ClusterBlockLevel.METADATA_WRITE));
+    public static final ClusterBlock NO_MASTER_BLOCK_ALL = new ClusterBlock(NO_MASTER_BLOCK_ID, "no master", true, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL);
+    public static final ClusterBlock NO_MASTER_BLOCK_WRITES = new ClusterBlock(NO_MASTER_BLOCK_ID, "no master", true, false, false, RestStatus.SERVICE_UNAVAILABLE, EnumSet.of(ClusterBlockLevel.WRITE, ClusterBlockLevel.METADATA_WRITE));
     /**
      * sets the timeout for a complete publishing cycle, including both sending and committing. the master
      * will continue to process the next cluster state update after this time has elapsed

+ 7 - 2
core/src/main/java/org/elasticsearch/gateway/GatewayService.java

@@ -65,7 +65,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
     public static final Setting<Integer> RECOVER_AFTER_MASTER_NODES_SETTING =
         Setting.intSetting("gateway.recover_after_master_nodes", 0, 0, Property.NodeScope);
 
-    public static final ClusterBlock STATE_NOT_RECOVERED_BLOCK = new ClusterBlock(1, "state not recovered / initialized", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL);
+    public static final ClusterBlock STATE_NOT_RECOVERED_BLOCK = new ClusterBlock(1, "state not recovered / initialized", true, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL);
 
     public static final TimeValue DEFAULT_RECOVER_AFTER_TIME_IF_EXPECTED_NODES_IS_SET = TimeValue.timeValueMinutes(5);
 
@@ -246,9 +246,14 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
                     // automatically generate a UID for the metadata if we need to
                     metaDataBuilder.generateClusterUuidIfNeeded();
 
-                    if (MetaData.SETTING_READ_ONLY_SETTING.get(recoveredState.metaData().settings()) || MetaData.SETTING_READ_ONLY_SETTING.get(currentState.metaData().settings())) {
+                    if (MetaData.SETTING_READ_ONLY_SETTING.get(recoveredState.metaData().settings())
+                        || MetaData.SETTING_READ_ONLY_SETTING.get(currentState.metaData().settings())) {
                         blocks.addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK);
                     }
+                    if (MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING.get(recoveredState.metaData().settings())
+                        || MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING.get(currentState.metaData().settings())) {
+                        blocks.addGlobalBlock(MetaData.CLUSTER_READ_ONLY_ALLOW_DELETE_BLOCK);
+                    }
 
                     for (IndexMetaData indexMetaData : recoveredState.metaData()) {
                         metaDataBuilder.put(indexMetaData, false);

+ 1 - 1
core/src/main/java/org/elasticsearch/rest/RestStatus.java

@@ -479,7 +479,7 @@ public enum RestStatus {
      * is considered to be temporary. If the request that received this status code was the result of a user action,
      * the request MUST NOT be repeated until it is requested by a separate user action.
      */
-    INSUFFICIENT_STORAGE(506);
+    INSUFFICIENT_STORAGE(507);
 
     private static final Map<Integer, RestStatus> CODE_TO_STATUS;
     static {

+ 2 - 4
core/src/main/java/org/elasticsearch/tribe/TribeService.java

@@ -61,7 +61,6 @@ import org.elasticsearch.discovery.DiscoveryModule;
 import org.elasticsearch.discovery.DiscoverySettings;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
-import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.transport.TransportSettings;
@@ -74,7 +73,6 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.Function;
@@ -103,9 +101,9 @@ import static java.util.Collections.unmodifiableMap;
 public class TribeService extends AbstractLifecycleComponent {
 
     public static final ClusterBlock TRIBE_METADATA_BLOCK = new ClusterBlock(10, "tribe node, metadata not allowed", false, false,
-            RestStatus.BAD_REQUEST, EnumSet.of(ClusterBlockLevel.METADATA_READ, ClusterBlockLevel.METADATA_WRITE));
+        false, RestStatus.BAD_REQUEST, EnumSet.of(ClusterBlockLevel.METADATA_READ, ClusterBlockLevel.METADATA_WRITE));
     public static final ClusterBlock TRIBE_WRITE_BLOCK = new ClusterBlock(11, "tribe node, write not allowed", false, false,
-            RestStatus.BAD_REQUEST, EnumSet.of(ClusterBlockLevel.WRITE));
+        false, RestStatus.BAD_REQUEST, EnumSet.of(ClusterBlockLevel.WRITE));
 
     public static Settings processSettings(Settings settings) {
         if (TRIBE_NAME_SETTING.exists(settings)) {

+ 9 - 0
core/src/test/java/org/elasticsearch/action/admin/cluster/settings/SettingsUpdaterTests.java

@@ -122,5 +122,14 @@ public class SettingsUpdaterTests extends ESTestCase {
             Settings.builder().put(MetaData.SETTING_READ_ONLY_SETTING.getKey(), false).build());
         assertEquals(clusterState.blocks().global().size(), 0);
 
+
+        clusterState = updater.updateSettings(build, Settings.builder().put(MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING.getKey(), true).build(),
+            Settings.builder().put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), 1.6).put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), 1.0f).build());
+        assertEquals(clusterState.blocks().global().size(), 1);
+        assertEquals(clusterState.blocks().global().iterator().next(), MetaData.CLUSTER_READ_ONLY_ALLOW_DELETE_BLOCK);
+        clusterState = updater.updateSettings(build, Settings.EMPTY,
+            Settings.builder().put(MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING.getKey(), false).build());
+        assertEquals(clusterState.blocks().global().size(), 0);
+
     }
 }

+ 3 - 1
core/src/test/java/org/elasticsearch/action/admin/cluster/tasks/PendingTasksBlocksIT.java

@@ -28,6 +28,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_ME
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_READ;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_WRITE;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE;
 
 @ClusterScope(scope = ESIntegTestCase.Scope.TEST)
 public class PendingTasksBlocksIT extends ESIntegTestCase {
@@ -36,7 +37,8 @@ public class PendingTasksBlocksIT extends ESIntegTestCase {
         ensureGreen("test");
 
         // This test checks that the Pending Cluster Tasks operation is never blocked, even if an index is read only or whatever.
-        for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY, SETTING_BLOCKS_METADATA)) {
+        for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY, SETTING_BLOCKS_METADATA,
+            SETTING_READ_ONLY_ALLOW_DELETE)) {
             try {
                 enableIndexBlock("test", blockSetting);
                 PendingClusterTasksResponse response = client().admin().cluster().preparePendingClusterTasks().execute().actionGet();

+ 2 - 1
core/src/test/java/org/elasticsearch/action/admin/indices/cache/clear/ClearIndicesCacheBlocksIT.java

@@ -28,6 +28,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_ME
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_READ;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_WRITE;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.hamcrest.Matchers.equalTo;
@@ -52,7 +53,7 @@ public class ClearIndicesCacheBlocksIT extends ESIntegTestCase {
             }
         }
         // Request is blocked
-        for (String blockSetting : Arrays.asList(SETTING_READ_ONLY, SETTING_BLOCKS_METADATA)) {
+        for (String blockSetting : Arrays.asList(SETTING_READ_ONLY, SETTING_BLOCKS_METADATA, SETTING_READ_ONLY_ALLOW_DELETE)) {
             try {
                 enableIndexBlock("test", blockSetting);
                 assertBlocked(client().admin().indices().prepareClearCache("test").setFieldDataCache(true).setQueryCache(true).setFieldDataCache(true));

+ 50 - 4
core/src/test/java/org/elasticsearch/action/admin/indices/delete/DeleteIndexBlocksIT.java

@@ -19,22 +19,68 @@
 
 package org.elasticsearch.action.admin.indices.delete;
 
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
 
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits;
 
-@ClusterScope(scope = ESIntegTestCase.Scope.TEST)
 public class DeleteIndexBlocksIT extends ESIntegTestCase {
     public void testDeleteIndexWithBlocks() {
         createIndex("test");
         ensureGreen("test");
-
         try {
             setClusterReadOnly(true);
-            assertBlocked(client().admin().indices().prepareDelete("test"));
+            assertBlocked(client().admin().indices().prepareDelete("test"), MetaData.CLUSTER_READ_ONLY_BLOCK);
         } finally {
             setClusterReadOnly(false);
         }
     }
+
+    public void testDeleteIndexOnIndexReadOnlyAllowDeleteSetting() {
+        createIndex("test");
+        ensureGreen("test");
+        client().prepareIndex().setIndex("test").setType("doc").setId("1").setSource("foo", "bar").get();
+        refresh();
+        try {
+            Settings settings = Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build();
+            assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get());
+            assertSearchHits(client().prepareSearch().get(), "1");
+            assertBlocked(client().prepareIndex().setIndex("test").setType("doc").setId("2").setSource("foo", "bar"),
+                IndexMetaData.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK);
+            assertBlocked(client().admin().indices().prepareUpdateSettings("test")
+                    .setSettings(Settings.builder().put("index.number_of_replicas", 2)), IndexMetaData.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK);
+            assertSearchHits(client().prepareSearch().get(), "1");
+            assertAcked(client().admin().indices().prepareDelete("test"));
+        } finally {
+            Settings settings = Settings.builder().putNull(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE).build();
+            assertAcked(client().admin().indices().prepareUpdateSettings("test").setIndicesOptions(IndicesOptions.lenientExpandOpen()).
+                setSettings(settings).get());
+        }
+    }
+
+    public void testDeleteIndexOnReadOnlyAllowDeleteSetting() {
+        createIndex("test");
+        ensureGreen("test");
+        client().prepareIndex().setIndex("test").setType("doc").setId("1").setSource("foo", "bar").get();
+        refresh();
+        try {
+            Settings settings = Settings.builder().put(MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING.getKey(), true).build();
+            assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings).get());
+            assertSearchHits(client().prepareSearch().get(), "1");
+            assertBlocked(client().prepareIndex().setIndex("test").setType("doc").setId("2").setSource("foo", "bar"),
+                MetaData.CLUSTER_READ_ONLY_ALLOW_DELETE_BLOCK);
+            assertBlocked(client().admin().indices().prepareUpdateSettings("test")
+                .setSettings(Settings.builder().put("index.number_of_replicas", 2)), MetaData.CLUSTER_READ_ONLY_ALLOW_DELETE_BLOCK);
+            assertSearchHits(client().prepareSearch().get(), "1");
+            assertAcked(client().admin().indices().prepareDelete("test"));
+        } finally {
+            Settings settings = Settings.builder().putNull(MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING.getKey()).build();
+            assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings).get());
+        }
+    }
 }

+ 3 - 1
core/src/test/java/org/elasticsearch/action/admin/indices/flush/FlushBlocksIT.java

@@ -28,6 +28,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_ME
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_READ;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_WRITE;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.hamcrest.Matchers.equalTo;
@@ -46,7 +47,8 @@ public class FlushBlocksIT extends ESIntegTestCase {
         }
 
         // Request is not blocked
-        for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY, SETTING_BLOCKS_METADATA)) {
+        for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY, SETTING_BLOCKS_METADATA,
+            SETTING_READ_ONLY_ALLOW_DELETE)) {
             try {
                 enableIndexBlock("test", blockSetting);
                 FlushResponse response = client().admin().indices().prepareFlush("test").execute().actionGet();

+ 2 - 1
core/src/test/java/org/elasticsearch/action/admin/indices/forcemerge/ForceMergeBlocksIT.java

@@ -28,6 +28,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_ME
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_READ;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_WRITE;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.hamcrest.Matchers.equalTo;
@@ -59,7 +60,7 @@ public class ForceMergeBlocksIT extends ESIntegTestCase {
         }
 
         // Request is blocked
-        for (String blockSetting : Arrays.asList(SETTING_READ_ONLY, SETTING_BLOCKS_METADATA)) {
+        for (String blockSetting : Arrays.asList(SETTING_READ_ONLY, SETTING_BLOCKS_METADATA, SETTING_READ_ONLY_ALLOW_DELETE)) {
             try {
                 enableIndexBlock("test", blockSetting);
                 assertBlocked(client().admin().indices().prepareForceMerge("test"));

+ 2 - 1
core/src/test/java/org/elasticsearch/action/admin/indices/get/GetIndexIT.java

@@ -38,6 +38,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_ME
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_READ;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_WRITE;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
 import static org.hamcrest.Matchers.anyOf;
@@ -178,7 +179,7 @@ public class GetIndexIT extends ESIntegTestCase {
     }
 
     public void testGetIndexWithBlocks() {
-        for (String block : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY)) {
+        for (String block : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY, SETTING_READ_ONLY_ALLOW_DELETE)) {
             try {
                 enableIndexBlock("idx", block);
                 GetIndexResponse response = client().admin().indices().prepareGetIndex().addIndices("idx")

+ 3 - 2
core/src/test/java/org/elasticsearch/action/admin/indices/refresh/RefreshBlocksIT.java

@@ -29,7 +29,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_ME
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_READ;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_WRITE;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.hamcrest.Matchers.equalTo;
 
@@ -42,7 +42,8 @@ public class RefreshBlocksIT extends ESIntegTestCase {
         NumShards numShards = getNumShards("test");
 
         // Request is not blocked
-        for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY, SETTING_BLOCKS_METADATA)) {
+        for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY, SETTING_BLOCKS_METADATA,
+            SETTING_READ_ONLY_ALLOW_DELETE)) {
             try {
                 enableIndexBlock("test", blockSetting);
                 RefreshResponse response = client().admin().indices().prepareRefresh("test").execute().actionGet();

+ 3 - 1
core/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentsBlocksIT.java

@@ -28,6 +28,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_ME
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_READ;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_WRITE;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 
@@ -44,7 +45,8 @@ public class IndicesSegmentsBlocksIT extends ESIntegTestCase {
         client().admin().indices().prepareFlush("test-blocks").get();
 
         // Request is not blocked
-        for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY)) {
+        for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY,
+            SETTING_READ_ONLY_ALLOW_DELETE)) {
             try {
                 enableIndexBlock("test-blocks", blockSetting);
                 IndicesSegmentResponse response = client().admin().indices().prepareSegments("test-blocks").execute().actionGet();

+ 3 - 1
core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsBlocksIT.java

@@ -29,6 +29,7 @@ import java.util.Arrays;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_READ;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_WRITE;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE;
 
 @ClusterScope(scope = ESIntegTestCase.Scope.TEST)
 public class IndicesStatsBlocksIT extends ESIntegTestCase {
@@ -37,7 +38,8 @@ public class IndicesStatsBlocksIT extends ESIntegTestCase {
         ensureGreen("ro");
 
         // Request is not blocked
-        for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY)) {
+        for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY,
+            SETTING_READ_ONLY_ALLOW_DELETE)) {
             try {
                 enableIndexBlock("ro", blockSetting);
                 IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats("ro").execute().actionGet();

+ 2 - 2
core/src/test/java/org/elasticsearch/action/main/MainActionTests.java

@@ -108,13 +108,13 @@ public class MainActionTests extends ESTestCase {
             } else {
                 blocks = ClusterBlocks.builder()
                     .addGlobalBlock(new ClusterBlock(randomIntBetween(1, 16), "test global block 400", randomBoolean(), randomBoolean(),
-                        RestStatus.BAD_REQUEST, ClusterBlockLevel.ALL))
+                        false, RestStatus.BAD_REQUEST, ClusterBlockLevel.ALL))
                     .build();
             }
         } else {
             blocks = ClusterBlocks.builder()
                 .addGlobalBlock(new ClusterBlock(randomIntBetween(1, 16), "test global block 503", randomBoolean(), randomBoolean(),
-                    RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL))
+                    false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL))
                 .build();
         }
         ClusterState state = ClusterState.builder(clusterName).blocks(blocks).build();

+ 2 - 2
core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java

@@ -262,7 +262,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
         PlainActionFuture<Response> listener = new PlainActionFuture<>();
 
         ClusterBlocks.Builder block = ClusterBlocks.builder()
-                .addGlobalBlock(new ClusterBlock(1, "test-block", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
+                .addGlobalBlock(new ClusterBlock(1, "test-block", false, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
         setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
         try {
             action.new AsyncAction(null, request, listener).start();
@@ -277,7 +277,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
         PlainActionFuture<Response> listener = new PlainActionFuture<>();
 
         ClusterBlocks.Builder block = ClusterBlocks.builder()
-                .addIndexBlock(TEST_INDEX, new ClusterBlock(1, "test-block", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
+                .addIndexBlock(TEST_INDEX, new ClusterBlock(1, "test-block", false, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
         setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
         try {
             action.new AsyncAction(null, request, listener).start();

+ 1 - 1
core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java

@@ -205,7 +205,7 @@ public class TransportMasterNodeActionTests extends ESTestCase {
         PlainActionFuture<Response> listener = new PlainActionFuture<>();
 
         ClusterBlock block = new ClusterBlock(1, "", retryableBlock, true,
-                randomFrom(RestStatus.values()), ClusterBlockLevel.ALL);
+            false, randomFrom(RestStatus.values()), ClusterBlockLevel.ALL);
         ClusterState stateWithBlock = ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes))
                 .blocks(ClusterBlocks.builder().addGlobalBlock(block)).build();
         setState(clusterService, stateWithBlock);

+ 5 - 5
core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

@@ -194,8 +194,8 @@ public class TransportReplicationActionTests extends ESTestCase {
             }
         };
 
-        ClusterBlocks.Builder block = ClusterBlocks.builder()
-            .addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
+        ClusterBlocks.Builder block = ClusterBlocks.builder().addGlobalBlock(new ClusterBlock(1, "non retryable", false, true,
+            false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
         setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
         TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
         reroutePhase.run();
@@ -203,7 +203,7 @@ public class TransportReplicationActionTests extends ESTestCase {
         assertPhase(task, "failed");
 
         block = ClusterBlocks.builder()
-            .addGlobalBlock(new ClusterBlock(1, "retryable", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
+            .addGlobalBlock(new ClusterBlock(1, "retryable", true, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
         setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
         listener = new PlainActionFuture<>();
         reroutePhase = action.new ReroutePhase(task, new Request().timeout("5ms"), listener);
@@ -219,8 +219,8 @@ public class TransportReplicationActionTests extends ESTestCase {
         assertPhase(task, "waiting_for_retry");
         assertTrue(request.isRetrySet.get());
 
-        block = ClusterBlocks.builder()
-            .addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
+        block = ClusterBlocks.builder().addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, false,
+            RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
         setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
         assertListenerThrows("primary phase should fail operation when moving from a retryable block to a non-retryable one", listener,
             ClusterBlockException.class);

+ 1 - 1
core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java

@@ -176,7 +176,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
         Request request = new Request();
         PlainActionFuture<Response> listener = new PlainActionFuture<>();
         ClusterBlocks.Builder block = ClusterBlocks.builder()
-                .addGlobalBlock(new ClusterBlock(1, "", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
+                .addGlobalBlock(new ClusterBlock(1, "", false, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
         setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
         try {
             action.new AsyncSingleAction(request, listener).start();

+ 3 - 1
core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java

@@ -59,6 +59,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_ME
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_READ;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_WRITE;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE;
 import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
@@ -327,7 +328,8 @@ public class ClusterRerouteIT extends ESIntegTestCase {
         int toggle = nodesIds.indexOf(node.getName());
 
         // Rerouting shards is not blocked
-        for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY, SETTING_BLOCKS_METADATA)) {
+        for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY, SETTING_BLOCKS_METADATA,
+            SETTING_READ_ONLY_ALLOW_DELETE)) {
             try {
                 enableIndexBlock("test-blocks", blockSetting);
                 assertAcked(client().admin().cluster().prepareReroute()

+ 3 - 3
core/src/test/java/org/elasticsearch/cluster/block/ClusterBlockTests.java

@@ -49,7 +49,7 @@ public class ClusterBlockTests extends ESTestCase {
             }
 
             ClusterBlock clusterBlock = new ClusterBlock(randomInt(), "cluster block #" + randomInt(), randomBoolean(),
-                    randomBoolean(), randomFrom(RestStatus.values()), levels);
+                    randomBoolean(), false, randomFrom(RestStatus.values()), levels);
 
             BytesStreamOutput out = new BytesStreamOutput();
             out.setVersion(version);
@@ -75,7 +75,7 @@ public class ClusterBlockTests extends ESTestCase {
             levels.add(randomFrom(ClusterBlockLevel.values()));
         }
         ClusterBlock clusterBlock = new ClusterBlock(randomInt(), "cluster block #" + randomInt(), randomBoolean(),
-                randomBoolean(), randomFrom(RestStatus.values()), levels);
+                randomBoolean(), false, randomFrom(RestStatus.values()), levels);
         assertThat(clusterBlock.toString(), not(endsWith(",")));
     }
 
@@ -86,7 +86,7 @@ public class ClusterBlockTests extends ESTestCase {
             levels.add(randomFrom(ClusterBlockLevel.values()));
         }
         ClusterBlock globalBlock = new ClusterBlock(randomInt(), "cluster block #" + randomInt(), randomBoolean(),
-            randomBoolean(), randomFrom(RestStatus.values()), levels);
+            randomBoolean(), false, randomFrom(RestStatus.values()), levels);
         ClusterBlocks clusterBlocks = new ClusterBlocks(Collections.singleton(globalBlock), ImmutableOpenMap.of());
         ClusterBlockException exception = clusterBlocks.indicesBlockedException(randomFrom(globalBlock.levels()), new String[0]);
         assertNotNull(exception);

+ 23 - 2
core/src/test/java/org/elasticsearch/cluster/settings/ClusterSettingsIT.java

@@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResp
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
 import org.elasticsearch.common.logging.ESLoggerFactory;
+import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.discovery.Discovery;
@@ -33,6 +34,7 @@ import org.elasticsearch.discovery.zen.ZenDiscovery;
 import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
+import org.junit.After;
 
 import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -42,8 +44,15 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 
-@ClusterScope(scope = TEST)
 public class ClusterSettingsIT extends ESIntegTestCase {
+
+    @After
+    public void cleanup() throws Exception {
+        assertAcked(client().admin().cluster().prepareUpdateSettings()
+            .setPersistentSettings(Settings.builder().putNull("*"))
+            .setTransientSettings(Settings.builder().putNull("*")));
+    }
+
     public void testClusterNonExistingSettingsUpdate() {
         String key1 = "no_idea_what_you_are_talking_about";
         int value1 = 10;
@@ -302,13 +311,25 @@ public class ClusterSettingsIT extends ESIntegTestCase {
             assertBlocked(request, MetaData.CLUSTER_READ_ONLY_BLOCK);
 
             // But it's possible to update the settings to update the "cluster.blocks.read_only" setting
-            Settings settings = Settings.builder().put(MetaData.SETTING_READ_ONLY_SETTING.getKey(), false).build();
+            Settings settings = Settings.builder().putNull(MetaData.SETTING_READ_ONLY_SETTING.getKey()).build();
             assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings).get());
 
         } finally {
             setClusterReadOnly(false);
         }
 
+        // Cluster settings updates are blocked when the cluster is read only
+        try {
+            // But it's possible to update the settings to update the "cluster.blocks.read_only" setting
+            Settings settings = Settings.builder().put(MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING.getKey(), true).build();
+            assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings).get());
+            assertBlocked(request, MetaData.CLUSTER_READ_ONLY_ALLOW_DELETE_BLOCK);
+        } finally {
+            // But it's possible to update the settings to update the "cluster.blocks.read_only" setting
+            Settings s = Settings.builder().putNull(MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING.getKey()).build();
+            assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(s).get());
+        }
+
         // It should work now
         ClusterUpdateSettingsResponse response = request.execute().actionGet();
 

+ 3 - 1
core/src/test/java/org/elasticsearch/cluster/shards/ClusterSearchShardsIT.java

@@ -33,6 +33,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_ME
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_READ;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_WRITE;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
 import static org.hamcrest.Matchers.equalTo;
 
@@ -136,7 +137,8 @@ public class ClusterSearchShardsIT extends ESIntegTestCase {
         ensureGreen("test-blocks");
 
         // Request is not blocked
-        for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY)) {
+        for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY,
+            SETTING_READ_ONLY_ALLOW_DELETE)) {
             try {
                 enableIndexBlock("test-blocks", blockSetting);
                 ClusterSearchShardsResponse response = client().admin().cluster().prepareSearchShards("test-blocks").execute().actionGet();

+ 3 - 1
core/src/test/java/org/elasticsearch/indices/exists/indices/IndicesExistsIT.java

@@ -30,6 +30,7 @@ import java.util.Arrays;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_READ;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_WRITE;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.equalTo;
 
@@ -66,7 +67,8 @@ public class IndicesExistsIT extends ESIntegTestCase {
         createIndex("ro");
 
         // Request is not blocked
-        for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY)) {
+        for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY,
+            SETTING_READ_ONLY_ALLOW_DELETE)) {
             try {
                 enableIndexBlock("ro", blockSetting);
                 assertThat(client().admin().indices().prepareExists("ro").execute().actionGet().isExists(), equalTo(true));

+ 2 - 1
core/src/test/java/org/elasticsearch/indices/settings/GetSettingsBlocksIT.java

@@ -30,6 +30,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_ME
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_READ;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_WRITE;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
 import static org.hamcrest.Matchers.equalTo;
@@ -43,7 +44,7 @@ public class GetSettingsBlocksIT extends ESIntegTestCase {
                         .put("index.merge.policy.expunge_deletes_allowed", "30")
                         .put(MapperService.INDEX_MAPPER_DYNAMIC_SETTING.getKey(), false)));
 
-        for (String block : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY)) {
+        for (String block : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY, SETTING_READ_ONLY_ALLOW_DELETE)) {
             try {
                 enableIndexBlock("test", block);
                 GetSettingsResponse response = client().admin().indices().prepareGetSettings("test").get();

+ 3 - 2
core/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexIT.java

@@ -44,6 +44,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_ME
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_READ;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_WRITE;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@@ -190,7 +191,7 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
         assertThat(openIndexResponse.isAcknowledged(), equalTo(true));
         assertIndexIsOpened("test1", "test2", "test3");
     }
-    
+
     public void testCloseNoIndex() {
         Client client = client();
         Exception e = expectThrows(ActionRequestValidationException.class, () ->
@@ -380,7 +381,7 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
         assertIndexIsClosed("test");
 
         // Opening an index is blocked
-        for (String blockSetting : Arrays.asList(SETTING_READ_ONLY, SETTING_BLOCKS_METADATA)) {
+        for (String blockSetting : Arrays.asList(SETTING_READ_ONLY, SETTING_READ_ONLY_ALLOW_DELETE, SETTING_BLOCKS_METADATA)) {
             try {
                 enableIndexBlock("test", blockSetting);
                 assertBlocked(client().admin().indices().prepareOpen("test"));

+ 4 - 0
docs/reference/index-modules.asciidoc

@@ -130,6 +130,10 @@ specific index module:
     Set to `true` to make the index and index metadata read only, `false` to
     allow writes and metadata changes.
 
+`index.blocks.read_only_allow_delete`::
+    Identical to `index.blocks.read_only` but allows deleting the index to free
+     up resources.
+
 `index.blocks.read`::
 
     Set to `true` to disable read operations against the index.

+ 5 - 0
docs/reference/modules/cluster/misc.asciidoc

@@ -12,6 +12,11 @@ An entire cluster may be set to read-only with the following _dynamic_ setting:
       operations), metadata is not allowed to be modified (create or delete
       indices).
 
+`cluster.blocks.read_only_allow_delete`::
+
+      Identical to `cluster.blocks.read_only` but allows to delete indices
+      to free up resources.
+
 WARNING: Don't rely on this setting to prevent changes to your cluster. Any
 user with access to the <<cluster-update-settings,cluster-update-settings>>
 API can make the cluster read-write again.

+ 5 - 2
modules/reindex/src/test/java/org/elasticsearch/index/reindex/DeleteByQueryBasicTests.java

@@ -29,6 +29,8 @@ import org.elasticsearch.search.sort.SortOrder;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE;
 import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
 import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;
@@ -200,12 +202,13 @@ public class DeleteByQueryBasicTests extends ReindexTestCase {
         }
         indexRandom(true, true, true, builders);
 
+        String block = randomFrom(SETTING_READ_ONLY, SETTING_READ_ONLY_ALLOW_DELETE);
         try {
-            enableIndexBlock("test", IndexMetaData.SETTING_READ_ONLY);
+            enableIndexBlock("test", block);
             assertThat(deleteByQuery().source("test").filter(QueryBuilders.matchAllQuery()).refresh(true).get(),
                     matcher().deleted(0).failures(docs));
         } finally {
-            disableIndexBlock("test", IndexMetaData.SETTING_READ_ONLY);
+            disableIndexBlock("test", block);
         }
 
         assertHitCount(client().prepareSearch("test").setSize(0).get(), docs);

+ 2 - 1
test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

@@ -1482,7 +1482,8 @@ public abstract class ESIntegTestCase extends ESTestCase {
 
     /** Sets or unsets the cluster read_only mode **/
     public static void setClusterReadOnly(boolean value) {
-        Settings settings = Settings.builder().put(MetaData.SETTING_READ_ONLY_SETTING.getKey(), value).build();
+        Settings settings = value ? Settings.builder().put(MetaData.SETTING_READ_ONLY_SETTING.getKey(), value).build() :
+            Settings.builder().putNull(MetaData.SETTING_READ_ONLY_SETTING.getKey()).build()  ;
         assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings).get());
     }