Browse Source

Add DisruptibleHeartbeatStore (#95420)

We use the same notion of disruptibility in both main and stateless test
suites, so may as well abstract it out (and also fix a double-completion
bug).
David Turner 2 years ago
parent
commit
fa8418de3a

+ 14 - 13
server/src/test/java/org/elasticsearch/cluster/coordination/AtomicRegisterCoordinatorTests.java

@@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.coordination.stateless.AtomicRegisterPreVoteCollector;
 import org.elasticsearch.cluster.coordination.stateless.AtomicRegisterPreVoteCollector;
+import org.elasticsearch.cluster.coordination.stateless.DisruptibleHeartbeatStore;
 import org.elasticsearch.cluster.coordination.stateless.Heartbeat;
 import org.elasticsearch.cluster.coordination.stateless.Heartbeat;
 import org.elasticsearch.cluster.coordination.stateless.HeartbeatStore;
 import org.elasticsearch.cluster.coordination.stateless.HeartbeatStore;
 import org.elasticsearch.cluster.coordination.stateless.SingleNodeReconfigurator;
 import org.elasticsearch.cluster.coordination.stateless.SingleNodeReconfigurator;
@@ -145,8 +146,12 @@ public class AtomicRegisterCoordinatorTests extends CoordinatorTests {
         ) {
         ) {
             final TimeValue heartbeatFrequency = HEARTBEAT_FREQUENCY.get(settings);
             final TimeValue heartbeatFrequency = HEARTBEAT_FREQUENCY.get(settings);
             final var atomicRegister = new AtomicRegister(currentTermRef, isDisruptedSupplier);
             final var atomicRegister = new AtomicRegister(currentTermRef, isDisruptedSupplier);
-            final var atomicHeartbeat = new StoreHeartbeatService(
-                new SharedHeartbeatStore(heartBeatRef, isDisruptedSupplier),
+            final var atomicHeartbeat = new StoreHeartbeatService(new DisruptibleHeartbeatStore(new SharedHeartbeatStore(heartBeatRef)) {
+                @Override
+                protected boolean isDisrupted() {
+                    return isDisruptedSupplier.getAsBoolean();
+                }
+            },
                 threadPool,
                 threadPool,
                 heartbeatFrequency,
                 heartbeatFrequency,
                 TimeValue.timeValueMillis(heartbeatFrequency.millis() * MAX_MISSED_HEARTBEATS.get(settings)),
                 TimeValue.timeValueMillis(heartbeatFrequency.millis() * MAX_MISSED_HEARTBEATS.get(settings)),
@@ -329,27 +334,19 @@ public class AtomicRegisterCoordinatorTests extends CoordinatorTests {
     private static class SharedHeartbeatStore implements HeartbeatStore {
     private static class SharedHeartbeatStore implements HeartbeatStore {
 
 
         private final AtomicReference<Heartbeat> hearbeatRef;
         private final AtomicReference<Heartbeat> hearbeatRef;
-        private final BooleanSupplier isDisruptedSupplier;
 
 
-        SharedHeartbeatStore(AtomicReference<Heartbeat> hearbeatRef, BooleanSupplier isDisruptedSupplier) {
+        SharedHeartbeatStore(AtomicReference<Heartbeat> hearbeatRef) {
             this.hearbeatRef = hearbeatRef;
             this.hearbeatRef = hearbeatRef;
-            this.isDisruptedSupplier = isDisruptedSupplier;
         }
         }
 
 
         @Override
         @Override
         public void writeHeartbeat(Heartbeat newHeartbeat, ActionListener<Void> listener) {
         public void writeHeartbeat(Heartbeat newHeartbeat, ActionListener<Void> listener) {
-            if (isDisruptedSupplier.getAsBoolean()) {
-                listener.onFailure(new IOException("simulating disrupted access to shared store"));
-            }
             hearbeatRef.set(newHeartbeat);
             hearbeatRef.set(newHeartbeat);
             listener.onResponse(null);
             listener.onResponse(null);
         }
         }
 
 
         @Override
         @Override
         public void readLatestHeartbeat(ActionListener<Heartbeat> listener) {
         public void readLatestHeartbeat(ActionListener<Heartbeat> listener) {
-            if (isDisruptedSupplier.getAsBoolean()) {
-                listener.onFailure(new IOException("simulating disrupted access to shared store"));
-            }
             listener.onResponse(hearbeatRef.get());
             listener.onResponse(hearbeatRef.get());
         }
         }
     }
     }
@@ -363,15 +360,19 @@ public class AtomicRegisterCoordinatorTests extends CoordinatorTests {
             this.isDisruptedSupplier = isDisruptedSupplier;
             this.isDisruptedSupplier = isDisruptedSupplier;
         }
         }
 
 
+        private boolean isDisrupted() {
+            return isDisruptedSupplier.getAsBoolean();
+        }
+
         long readCurrentTerm() throws IOException {
         long readCurrentTerm() throws IOException {
-            if (isDisruptedSupplier.getAsBoolean()) {
+            if (isDisrupted()) {
                 throw new IOException("simulating disrupted access to shared store");
                 throw new IOException("simulating disrupted access to shared store");
             }
             }
             return currentTermRef.get();
             return currentTermRef.get();
         }
         }
 
 
         long compareAndExchange(long expected, long updated) throws IOException {
         long compareAndExchange(long expected, long updated) throws IOException {
-            if (isDisruptedSupplier.getAsBoolean()) {
+            if (isDisrupted()) {
                 throw new IOException("simulating disrupted access to shared store");
                 throw new IOException("simulating disrupted access to shared store");
             }
             }
             return currentTermRef.compareAndExchange(expected, updated);
             return currentTermRef.compareAndExchange(expected, updated);

+ 41 - 0
test/framework/src/main/java/org/elasticsearch/cluster/coordination/stateless/DisruptibleHeartbeatStore.java

@@ -0,0 +1,41 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.cluster.coordination.stateless;
+
+import org.elasticsearch.action.ActionListener;
+
+import java.io.IOException;
+
+public abstract class DisruptibleHeartbeatStore implements HeartbeatStore {
+    private final HeartbeatStore delegate;
+
+    protected DisruptibleHeartbeatStore(HeartbeatStore delegate) {
+        this.delegate = delegate;
+    }
+
+    protected abstract boolean isDisrupted();
+
+    @Override
+    public final void writeHeartbeat(Heartbeat newHeartbeat, ActionListener<Void> listener) {
+        if (isDisrupted()) {
+            listener.onFailure(new IOException("simulating disrupted access to shared store"));
+        } else {
+            delegate.writeHeartbeat(newHeartbeat, listener);
+        }
+    }
+
+    @Override
+    public final void readLatestHeartbeat(ActionListener<Heartbeat> listener) {
+        if (isDisrupted()) {
+            listener.onFailure(new IOException("simulating disrupted access to shared store"));
+        } else {
+            delegate.readLatestHeartbeat(listener);
+        }
+    }
+}