
Remove Blocks when disk threshold monitoring is disabled (#87841)

This change ensures that existing read_only_allow_delete blocks, which are
placed on indices when the flood_stage watermark threshold is exceeded, are
removed when disk threshold monitoring is disabled.

This is done by changing how InternalClusterInfoService behaves when
disabled. With this change, it will keep calling the registered
listeners periodically, but with an empty ClusterInfo.

Closes #86383
Pooya Salehi 3 years ago
parent
commit
806d2976aa
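
For context, here is a minimal sketch (not part of this commit; the class and listener body are illustrative) of what the new behaviour means for a consumer registered on the ClusterInfoService: while disk threshold monitoring is disabled it is still invoked on every refresh cycle, but the ClusterInfo it receives carries no disk usage or shard size data.

import java.util.Map;
import java.util.function.Consumer;

import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.DiskUsage;

// Hypothetical consumer: with monitoring disabled it still runs on each cycle,
// but now sees an empty ClusterInfo instead of being skipped entirely.
class EmptyInfoAwareListener {
    static void register(ClusterInfoService clusterInfoService) {
        Consumer<ClusterInfo> listener = info -> {
            Map<String, DiskUsage> usages = info.getNodeLeastAvailableDiskUsages();
            if (usages == null || usages.isEmpty()) {
                return; // monitoring disabled, or nothing collected yet; nothing to act on
            }
            // ... react to the reported disk usages ...
        };
        clusterInfoService.addListener(listener);
    }
}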

+ 6 - 0
docs/changelog/87841.yaml

@@ -0,0 +1,6 @@
+pr: 87841
+summary: Remove any existing `read_only_allow_delete` index blocks when `cluster.routing.allocation.disk.threshold_enabled` is set to `false`
+area: Allocation
+type: bug
+issues:
+  - 86383

+ 1 - 1
docs/reference/modules/cluster/disk_allocator.asciidoc

@@ -65,7 +65,7 @@ You can use the following settings to control disk-based allocation:
 // tag::cluster-routing-disk-threshold-tag[]
 `cluster.routing.allocation.disk.threshold_enabled`::
 (<<dynamic-cluster-setting,Dynamic>>)
-Defaults to `true`. Set to `false` to disable the disk allocation decider.
+Defaults to `true`. Set to `false` to disable the disk allocation decider. Disabling this setting also removes any existing `index.blocks.read_only_allow_delete` index blocks.
 // end::cluster-routing-disk-threshold-tag[]
 
 [[cluster-routing-watermark-low]]
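
As an aside, toggling this setting at runtime from Java might look like the sketch below; the client instance, the persistent (rather than transient) scope, and the exact Client package are assumptions here, not part of this change.

import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.settings.Settings;

// Sketch only: dynamically disables (or re-enables) the disk allocation decider.
final class DiskThresholdToggle {
    static void setEnabled(Client client, boolean enabled) {
        client.admin()
            .cluster()
            .prepareUpdateSettings()
            .setPersistentSettings(
                Settings.builder().put("cluster.routing.allocation.disk.threshold_enabled", enabled)
            )
            .get();
    }
}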

+ 57 - 20
server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorIT.java

@@ -30,15 +30,15 @@ import static org.hamcrest.Matchers.equalTo;
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
 public class DiskThresholdMonitorIT extends DiskUsageIntegTestCase {
 
-    private static final long FLOODSTAGE_BYTES = new ByteSizeValue(10, ByteSizeUnit.KB).getBytes();
+    private static final long FLOOD_STAGE_BYTES = new ByteSizeValue(10, ByteSizeUnit.KB).getBytes();
 
     @Override
     protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
         return Settings.builder()
             .put(super.nodeSettings(nodeOrdinal, otherSettings))
-            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), FLOODSTAGE_BYTES * 2 + "b")
-            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), FLOODSTAGE_BYTES * 2 + "b")
-            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), FLOODSTAGE_BYTES + "b")
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), FLOOD_STAGE_BYTES * 2 + "b")
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), FLOOD_STAGE_BYTES * 2 + "b")
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), FLOOD_STAGE_BYTES + "b")
             .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "0ms")
             .build();
     }
@@ -67,15 +67,7 @@ public class DiskThresholdMonitorIT extends DiskUsageIntegTestCase {
                 client().prepareIndex().setIndex(indexName).setId("1").setSource("f", "g"),
                 IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK
             );
-            assertThat(
-                client().admin()
-                    .indices()
-                    .prepareGetSettings(indexName)
-                    .setNames(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE)
-                    .get()
-                    .getSetting(indexName, IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE),
-                equalTo("true")
-            );
+            assertThat(getIndexBlock(indexName, IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE), equalTo("true"));
         });
 
         // Verify that we can adjust things like allocation filters even while blocked
@@ -112,13 +104,58 @@ public class DiskThresholdMonitorIT extends DiskUsageIntegTestCase {
         // Verify that the block is removed once the shard migration is complete
         refreshClusterInfo();
         assertFalse(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get().isTimedOut());
-        assertNull(
-            client().admin()
-                .indices()
-                .prepareGetSettings(indexName)
-                .setNames(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE)
-                .get()
-                .getSetting(indexName, IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE)
+        assertNull(getIndexBlock(indexName, IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE));
+    }
+
+    public void testRemoveExistingIndexBlocksWhenDiskThresholdMonitorIsDisabled() throws Exception {
+        internalCluster().startMasterOnlyNode();
+        final String dataNodeName = internalCluster().startDataOnlyNode();
+
+        final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        createIndex(
+            indexName,
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms")
+                .put(INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(), dataNodeName)
+                .build()
         );
+        // ensure we have a system index on the data node too.
+        assertAcked(client().admin().indices().prepareCreate(TaskResultsService.TASK_INDEX));
+
+        getTestFileStore(dataNodeName).setTotalSpace(1L);
+        refreshClusterInfo();
+        assertBusy(() -> {
+            assertBlocked(
+                client().prepareIndex().setIndex(indexName).setId("1").setSource("f", "g"),
+                IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK
+            );
+            assertThat(getIndexBlock(indexName, IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE), equalTo("true"));
+        });
+
+        // Disable disk threshold monitoring
+        updateClusterSettings(
+            Settings.builder().put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false)
+        );
+
+        // Verify that the block is removed
+        refreshClusterInfo();
+        assertFalse(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get().isTimedOut());
+        assertNull(getIndexBlock(indexName, IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE));
+
+        // Re-enable and the blocks should be back!
+        updateClusterSettings(
+            Settings.builder().put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
+        );
+        refreshClusterInfo();
+        assertFalse(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get().isTimedOut());
+        assertThat(getIndexBlock(indexName, IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE), equalTo("true"));
     }
+
+    // Retrieves the value of the given block on an index.
+    private static String getIndexBlock(String indexName, String blockName) {
+        return client().admin().indices().prepareGetSettings(indexName).setNames(blockName).get().getSetting(indexName, blockName);
+    }
+
 }

+ 36 - 31
server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java

@@ -57,8 +57,10 @@ import static org.elasticsearch.core.Strings.format;
  * Listens for changes in the number of data nodes and immediately submits a
  * ClusterInfoUpdateJob if a node has been added.
  *
- * Every time the timer runs, gathers information about the disk usage and
- * shard sizes across the cluster.
+ * Every time the timer runs, if <code>cluster.routing.allocation.disk.threshold_enabled</code>
+ * is enabled, gathers information about the disk usage and shard sizes across the cluster,
+ * computes a new cluster info and notifies the registered listeners. If disk threshold
+ * monitoring is disabled, listeners are called with an empty cluster info.
  */
 public class InternalClusterInfoService implements ClusterInfoService, ClusterStateListener {
 
@@ -163,8 +165,16 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
         }
 
         void execute() {
-            assert countDown.isCountedDown() == false;
+            if (enabled == false) {
+                logger.trace("skipping collecting info from cluster, notifying listeners with empty cluster info");
+                leastAvailableSpaceUsages = Map.of();
+                mostAvailableSpaceUsages = Map.of();
+                indicesStatsSummary = IndicesStatsSummary.EMPTY;
+                callListeners();
+                return;
+            }
 
+            assert countDown.isCountedDown() == false;
             logger.trace("starting async refresh");
 
             final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest("data:true");
@@ -283,26 +293,30 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
         private void onStatsProcessed() {
             if (countDown.countDown()) {
                 logger.trace("stats all received, computing cluster info and notifying listeners");
-                try {
-                    final ClusterInfo clusterInfo = getClusterInfo();
-                    boolean anyListeners = false;
-                    for (final Consumer<ClusterInfo> listener : listeners) {
-                        anyListeners = true;
-                        try {
-                            logger.trace("notifying [{}] of new cluster info", listener);
-                            listener.accept(clusterInfo);
-                        } catch (Exception e) {
-                            logger.info(() -> "failed to notify [" + listener + "] of new cluster info", e);
-                        }
-                    }
-                    assert anyListeners : "expected to notify at least one listener";
+                callListeners();
+            }
+        }
 
-                    for (final ActionListener<ClusterInfo> listener : thisRefreshListeners) {
-                        listener.onResponse(clusterInfo);
+        private void callListeners() {
+            try {
+                final ClusterInfo clusterInfo = getClusterInfo();
+                boolean anyListeners = false;
+                for (final Consumer<ClusterInfo> listener : listeners) {
+                    anyListeners = true;
+                    try {
+                        logger.trace("notifying [{}] of new cluster info", listener);
+                        listener.accept(clusterInfo);
+                    } catch (Exception e) {
+                        logger.info(() -> "failed to notify [" + listener + "] of new cluster info", e);
                     }
-                } finally {
-                    onRefreshComplete(this);
                 }
+                assert anyListeners : "expected to notify at least one listener";
+
+                for (final ActionListener<ClusterInfo> listener : thisRefreshListeners) {
+                    listener.onResponse(clusterInfo);
+                }
+            } finally {
+                onRefreshComplete(this);
             }
         }
     }
@@ -335,17 +349,8 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
         final ArrayList<ActionListener<ClusterInfo>> thisRefreshListeners = new ArrayList<>(nextRefreshListeners);
         nextRefreshListeners.clear();
 
-        if (enabled) {
-            currentRefresh = new AsyncRefresh(thisRefreshListeners);
-            return currentRefresh::execute;
-        } else {
-            return () -> {
-                leastAvailableSpaceUsages = Map.of();
-                mostAvailableSpaceUsages = Map.of();
-                indicesStatsSummary = IndicesStatsSummary.EMPTY;
-                thisRefreshListeners.forEach(l -> l.onResponse(ClusterInfo.EMPTY));
-            };
-        }
+        currentRefresh = new AsyncRefresh(thisRefreshListeners);
+        return currentRefresh::execute;
     }
 
     private boolean assertRefreshInvariant() {
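
Condensing the hunks above into one place, a simplified and hypothetical sketch of the new control flow (the class, field, and method names below are stand-ins, not the real AsyncRefresh members): the disabled path and the "all stats received" path now end in the same listener notification step.

import java.util.List;
import java.util.function.Consumer;

// Hypothetical skeleton: when disabled, notify listeners with an empty info object;
// when enabled, gather stats first and then notify through the same method.
final class RefreshSketch<Info> {
    private final boolean enabled;
    private final List<Consumer<Info>> listeners;
    private final Info emptyInfo;

    RefreshSketch(boolean enabled, List<Consumer<Info>> listeners, Info emptyInfo) {
        this.enabled = enabled;
        this.listeners = listeners;
        this.emptyInfo = emptyInfo;
    }

    void execute() {
        if (enabled == false) {
            callListeners(emptyInfo);      // still notify, just with an empty info
            return;
        }
        callListeners(collectStats());     // enabled: gather stats, then notify
    }

    private Info collectStats() {
        return emptyInfo; // placeholder; the real service gathers node and indices stats asynchronously
    }

    private void callListeners(Info info) {
        for (Consumer<Info> listener : listeners) {
            listener.accept(info);
        }
    }
}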

+ 43 - 0
server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java

@@ -70,6 +70,9 @@ public class DiskThresholdMonitor {
     private final RerouteService rerouteService;
     private final AtomicLong lastRunTimeMillis = new AtomicLong(Long.MIN_VALUE);
     private final AtomicBoolean checkInProgress = new AtomicBoolean();
+    // Keeps track of whether the cleanup of existing index blocks (upon disabling
+    // the Disk Threshold Monitor) was successfully done or not.
+    private final AtomicBoolean cleanupUponDisableCalled = new AtomicBoolean();
 
     /**
      * The IDs of the nodes that were over the low threshold in the last check (and maybe over another threshold too). Tracked so that we
@@ -126,6 +129,14 @@ public class DiskThresholdMonitor {
             return;
         }
 
+        if (diskThresholdSettings.isEnabled() == false) {
+            removeExistingIndexBlocks();
+            return;
+        } else {
+            // reset this for the next disable call.
+            cleanupUponDisableCalled.set(false);
+        }
+
         final Map<String, DiskUsage> usages = info.getNodeLeastAvailableDiskUsages();
         if (usages == null) {
             logger.trace("skipping monitor as no disk usage information is available");
@@ -458,6 +469,38 @@ public class DiskThresholdMonitor {
             .execute(wrappedListener.map(r -> null));
     }
 
+    private void removeExistingIndexBlocks() {
+        if (cleanupUponDisableCalled.get()) {
+            checkFinished();
+            return;
+        }
+        ActionListener<Void> wrappedListener = ActionListener.wrap(r -> {
+            cleanupUponDisableCalled.set(true);
+            checkFinished();
+        }, e -> {
+            logger.debug("removing read-only blocks from indices failed", e);
+            checkFinished();
+        });
+        final ClusterState state = clusterStateSupplier.get();
+        final Set<String> indicesToRelease = state.getBlocks()
+            .indices()
+            .keySet()
+            .stream()
+            .filter(index -> state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
+            .collect(Collectors.toUnmodifiableSet());
+        logger.trace("removing read-only block from indices [{}]", indicesToRelease);
+        if (indicesToRelease.isEmpty() == false) {
+            client.admin()
+                .indices()
+                .prepareUpdateSettings(indicesToRelease.toArray(Strings.EMPTY_ARRAY))
+                .setSettings(NOT_READ_ONLY_ALLOW_DELETE_SETTINGS)
+                .origin("disk-threshold-monitor")
+                .execute(wrappedListener.map(r -> null));
+        } else {
+            wrappedListener.onResponse(null);
+        }
+    }
+
     private static void cleanUpRemovedNodes(Set<String> nodesToKeep, Set<String> nodesToCleanUp) {
         for (String node : nodesToCleanUp) {
             if (nodesToKeep.contains(node) == false) {
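
The cleanupUponDisableCalled flag above is a small "run the cleanup once per disable, re-arm on enable" pattern. A self-contained sketch of the idea follows; the names are illustrative, and unlike the real monitor (which flips the flag from the update-settings response listener) this version marks the cleanup done synchronously.

import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative one-shot-per-disable cleanup: run it the first time the feature is seen
// disabled, skip it on later cycles, and re-arm once the feature is enabled again.
final class OneShotCleanup {
    private final AtomicBoolean cleanupDone = new AtomicBoolean();

    void onCheck(boolean featureEnabled) {
        if (featureEnabled) {
            cleanupDone.set(false); // re-arm for the next time the feature is disabled
            // ... normal per-cycle work ...
        } else if (cleanupDone.get() == false) {
            runCleanup();
            cleanupDone.set(true); // the real code sets this only after the async cleanup succeeds
        }
    }

    private void runCleanup() {
        // ... remove the stale index blocks ...
    }
}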