|
@@ -38,6 +38,8 @@ import org.elasticsearch.discovery.AbstractDisruptionTestCase;
|
|
|
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
|
|
import org.elasticsearch.test.ESIntegTestCase;
|
|
|
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
|
|
|
+import org.elasticsearch.threadpool.Scheduler;
|
|
|
+import org.elasticsearch.threadpool.ThreadPool;
|
|
|
|
|
|
import java.io.FileInputStream;
|
|
|
import java.io.IOException;
|
|
@@ -50,8 +52,10 @@ import java.util.Optional;
|
|
|
import java.util.Random;
|
|
|
import java.util.concurrent.BrokenBarrierException;
|
|
|
import java.util.concurrent.CyclicBarrier;
|
|
|
+import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.TimeoutException;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
import java.util.function.Consumer;
|
|
|
import java.util.function.Function;
|
|
@@ -437,13 +441,24 @@ public class ConcurrentSeqNoVersioningIT extends AbstractDisruptionTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public boolean isLinearizable() {
|
|
|
+ public void assertLinearizable() {
|
|
|
logger.info("--> Linearizability checking history of size: {} for key: {} and initialVersion: {}: {}", history.size(),
|
|
|
id, initialVersion, history);
|
|
|
LinearizabilityChecker.SequentialSpec spec = new CASSequentialSpec(initialVersion);
|
|
|
boolean linearizable = false;
|
|
|
try {
|
|
|
- linearizable = new LinearizabilityChecker().isLinearizable(spec, history, missingResponseGenerator());
|
|
|
+ final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);
|
|
|
+ final AtomicBoolean abort = new AtomicBoolean();
|
|
|
+ // Large histories can be problematic and have the linearizability checker run OOM
|
|
|
+ // Bound the time how long the checker can run on such histories (Values empirically determined)
|
|
|
+ if (history.size() > 300) {
|
|
|
+ scheduler.schedule(() -> abort.set(true), 10, TimeUnit.SECONDS);
|
|
|
+ }
|
|
|
+ linearizable = new LinearizabilityChecker().isLinearizable(spec, history, missingResponseGenerator(), abort::get);
|
|
|
+ ThreadPool.terminate(scheduler, 1, TimeUnit.SECONDS);
|
|
|
+ if (abort.get() && linearizable == false) {
|
|
|
+ linearizable = true; // let the test pass
|
|
|
+ }
|
|
|
} finally {
|
|
|
// implicitly test that we can serialize all histories.
|
|
|
String serializedHistory = base64Serialize(history);
|
|
@@ -453,11 +468,7 @@ public class ConcurrentSeqNoVersioningIT extends AbstractDisruptionTestCase {
|
|
|
spec, initialVersion, serializedHistory);
|
|
|
}
|
|
|
}
|
|
|
- return linearizable;
|
|
|
- }
|
|
|
-
|
|
|
- public void assertLinearizable() {
|
|
|
- assertTrue("Must be linearizable", isLinearizable());
|
|
|
+ assertTrue("Must be linearizable", linearizable);
|
|
|
}
|
|
|
}
|
|
|
|