ソースを参照

Merge pull request #18357 from mikemccand/imc_bytes_tracking

Fix concurrency bug in IMC that could cause it to check too infrequently
Michael McCandless 9 年 前
コミット
d865910c13

+ 16 - 8
core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java

@@ -251,21 +251,29 @@ public class IndexingMemoryController extends AbstractComponent implements Index
         /** Shard calls this on each indexing/delete op */
         public void bytesWritten(int bytes) {
             long totalBytes = bytesWrittenSinceCheck.addAndGet(bytes);
+            assert totalBytes >= 0;
             while (totalBytes > indexingBuffer.bytes()/30) {
+
                 if (runLock.tryLock()) {
                     try {
-                        bytesWrittenSinceCheck.addAndGet(-totalBytes);
-                        // NOTE: this is only an approximate check, because bytes written is to the translog, vs indexing memory buffer which is
-                        // typically smaller but can be larger in extreme cases (many unique terms).  This logic is here only as a safety against
-                        // thread starvation or too infrequent checking, to ensure we are still checking periodically, in proportion to bytes
-                        // processed by indexing:
-                        runUnlocked();
+                        // Must pull this again because it may have changed since we first checked:
+                        totalBytes = bytesWrittenSinceCheck.get();
+                        if (totalBytes > indexingBuffer.bytes()/30) {
+                            bytesWrittenSinceCheck.addAndGet(-totalBytes);
+                            // NOTE: this is only an approximate check, because bytes written is to the translog, vs indexing memory buffer which is
+                            // typically smaller but can be larger in extreme cases (many unique terms).  This logic is here only as a safety against
+                            // thread starvation or too infrequent checking, to ensure we are still checking periodically, in proportion to bytes
+                            // processed by indexing:
+                            runUnlocked();
+                        }
                     } finally {
                         runLock.unlock();
                     }
-                    // Could be while we were checking, more bytes arrived:
-                    totalBytes = bytesWrittenSinceCheck.addAndGet(bytes);
+
+                    // Must get it again since other threads could have increased it while we were in runUnlocked
+                    totalBytes = bytesWrittenSinceCheck.get();
                 } else {
+                    // Another thread beat us to it: let them do all the work, yay!
                     break;
                 }
             }