Browse Source

Fix split package with OneMergeHelper (#78601)

With LUCENE-10118 integrated, we can now remove the package-private
dependency on org.apache.lucene.index.OneMergeHelper, and
intercept the info/log messages coming from merge threads.

This change alters the logging a little, but the fundamental information
captured remains more or less the same. It is worth noting that since
the merges occur asynchronously, the actual post-merge statistics are best
captured when the merge thread completes its operation - which is the
case with the change in this PR.

Relates #78166
Chris Hegarty 4 years ago
parent
commit
0ca3d12330

+ 0 - 1
server/build.gradle

@@ -280,7 +280,6 @@ tasks.named('splitPackagesAudit').configure {
     // These are tricky because Lucene itself splits the index package,
     // but this should be fixed in Lucene 9
     'org.apache.lucene.index.LazySoftDeletesDirectoryReaderWrapper',
-    'org.apache.lucene.index.OneMergeHelper',
     'org.apache.lucene.index.ShuffleForcedMergePolicy',
 
     // Joda should own its own packages! This should be a simple move.

+ 0 - 50
server/src/main/java/org/apache/lucene/index/OneMergeHelper.java

@@ -1,50 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0 and the Server Side Public License, v 1; you may not use this file except
- * in compliance with, at your election, the Elastic License 2.0 or the Server
- * Side Public License, v 1.
- */
-
-package org.apache.lucene.index;
-
-import java.io.IOException;
-
-/**
- * Allows pkg private access
- */
-public class OneMergeHelper {
-    private OneMergeHelper() {}
-    public static String getSegmentName(MergePolicy.OneMerge merge) {
-        return merge.info != null ? merge.info.info.name : "_na_";
-    }
-
-    /**
-     * The current MB per second rate limit for this merge.
-     **/
-    public static double getMbPerSec(Thread thread, MergePolicy.OneMerge merge) {
-        if (thread instanceof ConcurrentMergeScheduler.MergeThread) {
-            return ((ConcurrentMergeScheduler.MergeThread) thread).rateLimiter.getMBPerSec();
-        }
-        assert false: "this is not merge thread";
-        return Double.POSITIVE_INFINITY;
-    }
-
-    /**
-     * Returns total bytes written by this merge.
-     **/
-    public static long getTotalBytesWritten(Thread thread,
-                                            MergePolicy.OneMerge merge) throws IOException {
-        /**
-         * TODO: The number of bytes written during the merge should be accessible in OneMerge.
-         */
-        if (thread instanceof ConcurrentMergeScheduler.MergeThread) {
-            return ((ConcurrentMergeScheduler.MergeThread) thread).rateLimiter
-                .getTotalBytesWritten();
-        }
-        assert false: "this is not merge thread";
-        return merge.totalBytesSize();
-    }
-
-
-}

+ 29 - 11
server/src/main/java/org/elasticsearch/index/engine/ElasticsearchConcurrentMergeScheduler.java

@@ -12,7 +12,6 @@ import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.ConcurrentMergeScheduler;
 import org.apache.lucene.index.MergePolicy;
 import org.apache.lucene.index.MergeScheduler;
-import org.apache.lucene.index.OneMergeHelper;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.metrics.CounterMetric;
 import org.elasticsearch.common.metrics.MeanMetric;
@@ -67,6 +66,31 @@ class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {
         return readOnlyOnGoingMerges;
     }
 
+    /** We're currently only interested in messages with this prefix. */
+    private static final String MERGE_THREAD_MESSAGE_PREFIX = "merge thread";
+
+    @Override
+    /** Overridden to route specific MergeThread messages to our logger. */
+    protected boolean verbose() {
+        if (logger.isTraceEnabled() && Thread.currentThread() instanceof MergeThread) {
+            return true;
+        }
+        return super.verbose();
+    }
+
+    @Override
+    /** Overridden to route specific MergeThread messages to our logger. */
+    protected void message(String message) {
+        if (logger.isTraceEnabled() && Thread.currentThread() instanceof MergeThread && message.startsWith(MERGE_THREAD_MESSAGE_PREFIX)) {
+            logger.trace("{}", message);
+        }
+        super.message(message);
+    }
+
+    private static String getSegmentName(MergePolicy.OneMerge merge) {
+        return merge.getMergeInfo() != null ? merge.getMergeInfo().info.name : "_na_";
+    }
+
     @Override
     protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
         int totalNumDocs = merge.totalNumDocs();
@@ -81,7 +105,7 @@ class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {
 
         if (logger.isTraceEnabled()) {
             logger.trace("merge [{}] starting..., merging [{}] segments, [{}] docs, [{}] size, into [{}] estimated_size",
-                OneMergeHelper.getSegmentName(merge), merge.segments.size(), totalNumDocs, new ByteSizeValue(totalSizeInBytes),
+                getSegmentName(merge), merge.segments.size(), totalNumDocs, new ByteSizeValue(totalSizeInBytes),
                 new ByteSizeValue(merge.estimatedMergeBytes));
         }
         try {
@@ -106,23 +130,18 @@ class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {
             long throttledMS = TimeValue.nsecToMSec(
                 merge.getMergeProgress().getPauseTimes().get(MergePolicy.OneMergeProgress.PauseReason.PAUSED)
             );
-            final Thread thread = Thread.currentThread();
-            long totalBytesWritten = OneMergeHelper.getTotalBytesWritten(thread, merge);
-            double mbPerSec = OneMergeHelper.getMbPerSec(thread, merge);
             totalMergeStoppedTime.inc(stoppedMS);
             totalMergeThrottledTime.inc(throttledMS);
 
             String message = String.format(Locale.ROOT,
                                            "merge segment [%s] done: took [%s], [%,.1f MB], [%,d docs], [%s stopped], " +
-                                               "[%s throttled], [%,.1f MB written], [%,.1f MB/sec throttle]",
-                                           OneMergeHelper.getSegmentName(merge),
+                                               "[%s throttled]",
+                                           getSegmentName(merge),
                                            TimeValue.timeValueMillis(tookMS),
                                            totalSizeInBytes/1024f/1024f,
                                            totalNumDocs,
                                            TimeValue.timeValueMillis(stoppedMS),
-                                           TimeValue.timeValueMillis(throttledMS),
-                                           totalBytesWritten/1024f/1024f,
-                                           mbPerSec);
+                                           TimeValue.timeValueMillis(throttledMS));
 
             if (tookMS > 20000) { // if more than 20 seconds, DEBUG log it
                 logger.debug("{}", message);
@@ -184,5 +203,4 @@ class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {
             disableAutoIOThrottle();
         }
     }
-
 }

+ 59 - 0
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -34,6 +34,7 @@ import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.LiveIndexWriterConfig;
 import org.apache.lucene.index.LogDocMergePolicy;
+import org.apache.lucene.index.LogMergePolicy;
 import org.apache.lucene.index.MergePolicy;
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.NumericDocValues;
@@ -183,6 +184,7 @@ import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.sameInstance;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsInRelativeOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.emptyArray;
@@ -195,6 +197,7 @@ import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.in;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.matchesRegex;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
@@ -2174,6 +2177,62 @@ public class InternalEngineTests extends EngineTestCase {
         }
     }
 
+    private static class MockMTAppender extends AbstractAppender {
+        private final List<String> messages = Collections.synchronizedList(new ArrayList<>());
+
+        List<String> messages () { return messages; }
+
+        MockMTAppender(final String name) throws IllegalAccessException {
+            super(name, RegexFilter.createFilter(".*(\n.*)*", new String[0],
+                false, null, null), null);
+        }
+
+        @Override
+        public void append(LogEvent event) {
+            final String formattedMessage = event.getMessage().getFormattedMessage();
+            if (event.getLevel() == Level.TRACE && formattedMessage.startsWith("merge thread")) {
+                messages.add(formattedMessage);
+            }
+        }
+    }
+
+    public void testMergeThreadLogging() throws IllegalAccessException, IOException {
+        MockMTAppender mockAppender = new MockMTAppender("testMergeThreadLogging");
+        mockAppender.start();
+
+        Logger rootLogger = LogManager.getRootLogger();
+        Level savedLevel = rootLogger.getLevel();
+        Loggers.addAppender(rootLogger, mockAppender);
+        Loggers.setLevel(rootLogger, Level.TRACE);
+
+        LogMergePolicy lmp = newLogMergePolicy();
+        lmp.setMergeFactor(2);
+        try (Store store = createStore()) {
+            InternalEngine engine = createEngine(defaultSettings, store, createTempDir(), lmp); // fmp
+            engine.index(indexForDoc(testParsedDocument("1", null, testDocument(), B_1, null)));
+            engine.index(indexForDoc(testParsedDocument("2", null, testDocument(), B_1, null)));
+            engine.index(indexForDoc(testParsedDocument("3", null, testDocument(), B_1, null)));
+            engine.index(indexForDoc(testParsedDocument("4", null, testDocument(), B_1, null)));
+            engine.forceMerge(true, 1, false, UUIDs.randomBase64UUID());
+            engine.flushAndClose();
+
+            long merges = engine.getMergeStats().getTotal();
+            if (merges > 0) {
+                List<String> threadMsgs =
+                    mockAppender.messages().stream()
+                        .filter(line -> line.startsWith("merge thread"))
+                        .collect(Collectors.toList());
+                assertThat("messages:" + threadMsgs + ", merges=" + merges, threadMsgs.size(), greaterThanOrEqualTo(2));
+                assertThat(threadMsgs,
+                    containsInRelativeOrder(matchesRegex("^merge thread .* start$"), matchesRegex("^merge thread .* merge segment.*$")));
+            }
+        } finally {
+            Loggers.removeAppender(rootLogger, mockAppender);
+            mockAppender.stop();
+            Loggers.setLevel(rootLogger, savedLevel);
+        }
+    }
+
     public void testSeqNoAndCheckpoints() throws IOException, InterruptedException {
         final int opCount = randomIntBetween(1, 256);
         long primarySeqNo = SequenceNumbers.NO_OPS_PERFORMED;