Sfoglia il codice sorgente

Improve stability of UpdateIT

With this commit, we reduce the amount of work
that UpdateIT does and add progress logging.

Closes #14877
Daniel Mitterdorfer 10 anni fa
parent
commit
b804a0c4a6

+ 2 - 0
core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java

@@ -190,6 +190,8 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
                         e = ExceptionsHelper.unwrapCause(e);
                         if (e instanceof VersionConflictEngineException) {
                             if (retryCount < request.retryOnConflict()) {
+                                logger.trace("Retry attempt [{}] of [{}] on version conflict on [{}][{}][{}]",
+                                        retryCount + 1, request.retryOnConflict(), request.index(), request.shardId(), request.id());
                                 threadPool.executor(executor()).execute(new ActionRunnable<UpdateResponse>(listener) {
                                     @Override
                                     protected void doRun() {

+ 14 - 3
core/src/test/java/org/elasticsearch/update/UpdateIT.java

@@ -936,16 +936,19 @@ public class UpdateIT extends ESIntegTestCase {
         int numberOfThreads = scaledRandomIntBetween(2,5);
         final CountDownLatch latch = new CountDownLatch(numberOfThreads);
         final CountDownLatch startLatch = new CountDownLatch(1);
-        final int numberOfUpdatesPerThread = scaledRandomIntBetween(100, 10000);
+        final int numberOfUpdatesPerThread = scaledRandomIntBetween(100, 500);
         final List<Throwable> failures = new CopyOnWriteArrayList<>();
+
         for (int i = 0; i < numberOfThreads; i++) {
             Runnable r = new Runnable() {
-
                 @Override
                 public void run() {
                     try {
                         startLatch.await();
                         for (int i = 0; i < numberOfUpdatesPerThread; i++) {
+                            if (i % 100 == 0) {
+                                logger.debug("Client [{}] issued [{}] of [{}] requests", Thread.currentThread().getName(), i, numberOfUpdatesPerThread);
+                            }
                             if (useBulkApi) {
                                 UpdateRequestBuilder updateRequestBuilder = client().prepareUpdate(indexOrAlias(), "type1", Integer.toString(i))
                                         .setScript(new Script("field", ScriptService.ScriptType.INLINE, "field_inc", null))
@@ -960,6 +963,12 @@ public class UpdateIT extends ESIntegTestCase {
                                         .execute().actionGet();
                             }
                         }
+                        logger.info("Client [{}] issued all [{}] requests.", Thread.currentThread().getName(), numberOfUpdatesPerThread);
+                    } catch (InterruptedException e) {
+                        // test infrastructure kills long-running tests by interrupting them, thus we handle this case separately
+                        logger.warn("Test was forcefully stopped. Client [{}] may still have outstanding requests.", Thread.currentThread().getName());
+                        failures.add(e);
+                        Thread.currentThread().interrupt();
                     } catch (Throwable e) {
                         failures.add(e);
                     } finally {
@@ -968,7 +977,9 @@ public class UpdateIT extends ESIntegTestCase {
                 }
 
             };
-            new Thread(r).start();
+            Thread updater = new Thread(r);
+            updater.setName("UpdateIT-Client-" + i);
+            updater.start();
         }
         startLatch.countDown();
         latch.await();