|
@@ -19,11 +19,15 @@
|
|
|
|
|
|
package org.elasticsearch.test.disruption;
|
|
|
|
|
|
+import org.elasticsearch.common.Nullable;
|
|
|
import org.elasticsearch.common.SuppressForbidden;
|
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
|
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
|
|
import org.elasticsearch.test.InternalTestCluster;
|
|
|
|
|
|
+import java.lang.management.ManagementFactory;
|
|
|
+import java.lang.management.ThreadInfo;
|
|
|
+import java.lang.management.ThreadMXBean;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Random;
|
|
|
import java.util.Set;
|
|
@@ -41,11 +45,16 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
|
|
// logging has shared JVM locks - we may suspend a thread and block other nodes from doing their thing
|
|
|
Pattern.compile("logging\\.log4j"),
|
|
|
// security manager is shared across all nodes AND it uses synced hashmaps internally
|
|
|
- Pattern.compile("java\\.lang\\.SecurityManager")
|
|
|
+ Pattern.compile("java\\.lang\\.SecurityManager"),
|
|
|
+ // SecureRandom instance from SecureRandomHolder class is shared by all nodes
|
|
|
+ Pattern.compile("java\\.security\\.SecureRandom")
|
|
|
};
|
|
|
|
|
|
+ private static final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
|
|
|
+
|
|
|
protected final String disruptedNode;
|
|
|
private Set<Thread> suspendedThreads;
|
|
|
+ private Thread blockDetectionThread;
|
|
|
|
|
|
public LongGCDisruption(Random random, String disruptedNode) {
|
|
|
super(random);
|
|
@@ -60,7 +69,7 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
|
|
suspendedThreads = ConcurrentHashMap.newKeySet();
|
|
|
|
|
|
final String currentThreadName = Thread.currentThread().getName();
|
|
|
- assert currentThreadName.contains("[" + disruptedNode + "]") == false :
|
|
|
+ assert isDisruptedNodeThread(currentThreadName) == false :
|
|
|
"current thread match pattern. thread name: " + currentThreadName + ", node: " + disruptedNode;
|
|
|
// we spawn a background thread to protect against deadlock which can happen
|
|
|
// if there are shared resources between the caller thread and the suspended threads
|
|
@@ -75,7 +84,7 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
|
|
@Override
|
|
|
protected void doRun() throws Exception {
|
|
|
// keep trying to stop threads, until no new threads are discovered.
|
|
|
- while (stopNodeThreads(disruptedNode, suspendedThreads)) {
|
|
|
+ while (stopNodeThreads(suspendedThreads)) {
|
|
|
if (Thread.interrupted()) {
|
|
|
return;
|
|
|
}
|
|
@@ -95,13 +104,52 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
|
|
}
|
|
|
if (stoppingThread.isAlive()) {
|
|
|
logger.warn("failed to stop node [{}]'s threads within [{}] millis. Stopping thread stack trace:\n {}"
|
|
|
- , disruptedNode, getStoppingTimeoutInMillis(), stackTrace(stoppingThread));
|
|
|
+ , disruptedNode, getStoppingTimeoutInMillis(), stackTrace(stoppingThread.getStackTrace()));
|
|
|
stoppingThread.interrupt(); // best effort;
|
|
|
throw new RuntimeException("stopping node threads took too long");
|
|
|
}
|
|
|
+ // block detection checks if other threads are blocked waiting on an object that is held by one
|
|
|
+ // of the threads that were suspended
|
|
|
+ if (isBlockDetectionSupported()) {
|
|
|
+ blockDetectionThread = new Thread(new AbstractRunnable() {
|
|
|
+ @Override
|
|
|
+ public void onFailure(Exception e) {
|
|
|
+ if (e instanceof InterruptedException == false) {
|
|
|
+ throw new AssertionError("unexpected exception in blockDetectionThread", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void doRun() throws Exception {
|
|
|
+ while (Thread.currentThread().isInterrupted() == false) {
|
|
|
+ ThreadInfo[] threadInfos = threadBean.dumpAllThreads(true, true);
|
|
|
+ for (ThreadInfo threadInfo : threadInfos) {
|
|
|
+ if (isDisruptedNodeThread(threadInfo.getThreadName()) == false &&
|
|
|
+ threadInfo.getLockOwnerName() != null &&
|
|
|
+ isDisruptedNodeThread(threadInfo.getLockOwnerName())) {
|
|
|
+
|
|
|
+ // find ThreadInfo object of the blocking thread (if available)
|
|
|
+ ThreadInfo blockingThreadInfo = null;
|
|
|
+ for (ThreadInfo otherThreadInfo : threadInfos) {
|
|
|
+ if (otherThreadInfo.getThreadId() == threadInfo.getLockOwnerId()) {
|
|
|
+ blockingThreadInfo = otherThreadInfo;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ onBlockDetected(threadInfo, blockingThreadInfo);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Thread.sleep(getBlockDetectionIntervalInMillis());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ blockDetectionThread.setName(currentThreadName + "[LongGCDisruption][blockDetection]");
|
|
|
+ blockDetectionThread.start();
|
|
|
+ }
|
|
|
success = true;
|
|
|
} finally {
|
|
|
if (success == false) {
|
|
|
+ stopBlockDetection();
|
|
|
// resume threads if failed
|
|
|
resumeThreads(suspendedThreads);
|
|
|
suspendedThreads = null;
|
|
@@ -112,18 +160,35 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private String stackTrace(Thread thread) {
|
|
|
- return Arrays.stream(thread.getStackTrace()).map(Object::toString).collect(Collectors.joining("\n"));
|
|
|
+ public boolean isDisruptedNodeThread(String threadName) {
|
|
|
+ return threadName.contains("[" + disruptedNode + "]");
|
|
|
+ }
|
|
|
+
|
|
|
+ private String stackTrace(StackTraceElement[] stackTraceElements) {
|
|
|
+ return Arrays.stream(stackTraceElements).map(Object::toString).collect(Collectors.joining("\n"));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public synchronized void stopDisrupting() {
|
|
|
+ stopBlockDetection();
|
|
|
if (suspendedThreads != null) {
|
|
|
resumeThreads(suspendedThreads);
|
|
|
suspendedThreads = null;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void stopBlockDetection() {
|
|
|
+ if (blockDetectionThread != null) {
|
|
|
+ try {
|
|
|
+ blockDetectionThread.interrupt(); // best effort
|
|
|
+ blockDetectionThread.join(getStoppingTimeoutInMillis());
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ blockDetectionThread = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public void removeAndEnsureHealthy(InternalTestCluster cluster) {
|
|
|
removeFromCluster(cluster);
|
|
@@ -144,7 +209,7 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
|
|
*/
|
|
|
@SuppressWarnings("deprecation") // stops/resumes threads intentionally
|
|
|
@SuppressForbidden(reason = "stops/resumes threads intentionally")
|
|
|
- protected boolean stopNodeThreads(String node, Set<Thread> nodeThreads) {
|
|
|
+ protected boolean stopNodeThreads(Set<Thread> nodeThreads) {
|
|
|
Thread[] allThreads = null;
|
|
|
while (allThreads == null) {
|
|
|
allThreads = new Thread[Thread.activeCount()];
|
|
@@ -154,16 +219,15 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
|
|
}
|
|
|
}
|
|
|
boolean liveThreadsFound = false;
|
|
|
- final String nodeThreadNamePart = "[" + node + "]";
|
|
|
for (Thread thread : allThreads) {
|
|
|
if (thread == null) {
|
|
|
continue;
|
|
|
}
|
|
|
- String name = thread.getName();
|
|
|
- if (name.contains(nodeThreadNamePart)) {
|
|
|
+ String threadName = thread.getName();
|
|
|
+ if (isDisruptedNodeThread(threadName)) {
|
|
|
if (thread.isAlive() && nodeThreads.add(thread)) {
|
|
|
liveThreadsFound = true;
|
|
|
- logger.trace("stopping thread [{}]", name);
|
|
|
+ logger.trace("stopping thread [{}]", threadName);
|
|
|
thread.suspend();
|
|
|
// double check the thread is not in a shared resource like logging. If so, let it go and come back to it later.
|
|
|
boolean safe = true;
|
|
@@ -178,7 +242,7 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
|
|
}
|
|
|
}
|
|
|
if (!safe) {
|
|
|
- logger.trace("resuming thread [{}] as it is in a critical section", name);
|
|
|
+ logger.trace("resuming thread [{}] as it is in a critical section", threadName);
|
|
|
thread.resume();
|
|
|
nodeThreads.remove(thread);
|
|
|
}
|
|
@@ -198,6 +262,28 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
|
|
return TimeValue.timeValueSeconds(30).getMillis();
|
|
|
}
|
|
|
|
|
|
+ public boolean isBlockDetectionSupported() {
|
|
|
+ return threadBean.isObjectMonitorUsageSupported() && threadBean.isSynchronizerUsageSupported();
|
|
|
+ }
|
|
|
+
|
|
|
+ // for testing
|
|
|
+ protected long getBlockDetectionIntervalInMillis() {
|
|
|
+ return 3000L;
|
|
|
+ }
|
|
|
+
|
|
|
+ // for testing
|
|
|
+ protected void onBlockDetected(ThreadInfo blockedThread, @Nullable ThreadInfo blockingThread) {
|
|
|
+ String blockedThreadStackTrace = stackTrace(blockedThread.getStackTrace());
|
|
|
+ String blockingThreadStackTrace = blockingThread != null ?
|
|
|
+ stackTrace(blockingThread.getStackTrace()) : "not available";
|
|
|
+ throw new AssertionError("Thread [" + blockedThread.getThreadName() + "] is blocked waiting on the resource [" +
|
|
|
+ blockedThread.getLockInfo() + "] held by the suspended thread [" + blockedThread.getLockOwnerName() +
|
|
|
+ "] of the disrupted node [" + disruptedNode + "].\n" +
|
|
|
+ "Please add this occurrence to the unsafeClasses list in [" + LongGCDisruption.class.getName() + "].\n" +
|
|
|
+ "Stack trace of blocked thread: " + blockedThreadStackTrace + "\n" +
|
|
|
+ "Stack trace of blocking thread: " + blockingThreadStackTrace);
|
|
|
+ }
|
|
|
+
|
|
|
@SuppressWarnings("deprecation") // stops/resumes threads intentionally
|
|
|
@SuppressForbidden(reason = "stops/resumes threads intentionally")
|
|
|
protected void resumeThreads(Set<Thread> threads) {
|