|
|
@@ -33,6 +33,7 @@ import org.elasticsearch.action.update.UpdateRequest;
|
|
|
import org.elasticsearch.action.delete.DeleteRequest;
|
|
|
import org.elasticsearch.action.update.UpdateRequestBuilder;
|
|
|
import org.elasticsearch.action.update.UpdateResponse;
|
|
|
+import org.elasticsearch.client.transport.NoNodeAvailableException;
|
|
|
import org.elasticsearch.common.inject.Inject;
|
|
|
import org.elasticsearch.common.settings.ImmutableSettings;
|
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
|
@@ -570,7 +571,7 @@ public class UpdateTests extends ElasticsearchIntegrationTest {
|
|
|
|
|
|
final int numberOfThreads = scaledRandomIntBetween(3,5);
|
|
|
final int numberOfIdsPerThread = scaledRandomIntBetween(3,10);
|
|
|
- final int numberOfUpdatesPerId = scaledRandomIntBetween(100,200);
|
|
|
+ final int numberOfUpdatesPerId = scaledRandomIntBetween(10,100);
|
|
|
final int retryOnConflict = randomIntBetween(0,1);
|
|
|
final CountDownLatch latch = new CountDownLatch(numberOfThreads);
|
|
|
final CountDownLatch startLatch = new CountDownLatch(1);
|
|
|
@@ -637,22 +638,49 @@ public class UpdateTests extends ElasticsearchIntegrationTest {
|
|
|
public void run(){
|
|
|
try {
|
|
|
startLatch.await();
|
|
|
+ boolean hasWaitedForNoNode = false;
|
|
|
for (int j = 0; j < numberOfIds; j++) {
|
|
|
for (int k = 0; k < numberOfUpdatesPerId; ++k) {
|
|
|
updateRequestsOutstanding.acquire();
|
|
|
- UpdateRequest ur = client().prepareUpdate("test", "type1", Integer.toString(j))
|
|
|
- .setScript("ctx._source.field += 1", ScriptService.ScriptType.INLINE)
|
|
|
- .setRetryOnConflict(retryOnConflict)
|
|
|
- .setUpsert(jsonBuilder().startObject().field("field", 1).endObject())
|
|
|
- .setListenerThreaded(false)
|
|
|
- .request();
|
|
|
- client().update(ur, new UpdateListener(j) );
|
|
|
-
|
|
|
- deleteRequestsOutstanding.acquire();
|
|
|
- DeleteRequest dr = client().prepareDelete("test", "type1", Integer.toString(j))
|
|
|
- .setListenerThreaded(false)
|
|
|
- .setOperationThreaded(false).request();
|
|
|
- client().delete(dr, new DeleteListener(j));
|
|
|
+ try {
|
|
|
+ UpdateRequest ur = client().prepareUpdate("test", "type1", Integer.toString(j))
|
|
|
+ .setScript("ctx._source.field += 1", ScriptService.ScriptType.INLINE)
|
|
|
+ .setRetryOnConflict(retryOnConflict)
|
|
|
+ .setUpsert(jsonBuilder().startObject().field("field", 1).endObject())
|
|
|
+ .setListenerThreaded(false)
|
|
|
+ .request();
|
|
|
+ client().update(ur, new UpdateListener(j));
|
|
|
+ } catch (NoNodeAvailableException nne) {
|
|
|
+ updateRequestsOutstanding.release();
|
|
|
+ synchronized (failedMap) {
|
|
|
+ incrementMapValue(j, failedMap);
|
|
|
+ }
|
|
|
+ if (hasWaitedForNoNode) {
|
|
|
+ throw nne;
|
|
|
+ }
|
|
|
+ logger.warn("Got NoNodeException waiting for 1 second for things to recover.");
|
|
|
+ hasWaitedForNoNode = true;
|
|
|
+ Thread.sleep(1000);
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ deleteRequestsOutstanding.acquire();
|
|
|
+ DeleteRequest dr = client().prepareDelete("test", "type1", Integer.toString(j))
|
|
|
+ .setListenerThreaded(false)
|
|
|
+ .setOperationThreaded(false).request();
|
|
|
+ client().delete(dr, new DeleteListener(j));
|
|
|
+ } catch (NoNodeAvailableException nne) {
|
|
|
+ deleteRequestsOutstanding.release();
|
|
|
+ synchronized (failedMap) {
|
|
|
+ incrementMapValue(j, failedMap);
|
|
|
+ }
|
|
|
+ if (hasWaitedForNoNode) {
|
|
|
+ throw nne;
|
|
|
+ }
|
|
|
+ logger.warn("Got NoNodeException waiting for 1 second for things to recover.");
|
|
|
+ hasWaitedForNoNode = true;
|
|
|
+ Thread.sleep(1000); //Wait for no-node to clear
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
} catch (Throwable e) {
|