|
@@ -84,6 +84,7 @@ import java.util.ArrayList;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
import java.util.HashSet;
|
|
import java.util.HashSet;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
+import java.util.Locale;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
@@ -770,18 +771,13 @@ public class PersistedClusterStateService {
|
|
final TimeValue finalSlowWriteLoggingThreshold = slowWriteLoggingThresholdSupplier.get();
|
|
final TimeValue finalSlowWriteLoggingThreshold = slowWriteLoggingThresholdSupplier.get();
|
|
if (durationMillis >= finalSlowWriteLoggingThreshold.getMillis()) {
|
|
if (durationMillis >= finalSlowWriteLoggingThreshold.getMillis()) {
|
|
logger.warn(
|
|
logger.warn(
|
|
- "writing cluster state took [{}ms] which is above the warn threshold of [{}]; "
|
|
|
|
- + "wrote full state with [{}] indices",
|
|
|
|
|
|
+ "writing full cluster state took [{}ms] which is above the warn threshold of [{}]; {}",
|
|
durationMillis,
|
|
durationMillis,
|
|
finalSlowWriteLoggingThreshold,
|
|
finalSlowWriteLoggingThreshold,
|
|
- stats.numIndicesUpdated
|
|
|
|
|
|
+ stats
|
|
);
|
|
);
|
|
} else {
|
|
} else {
|
|
- logger.debug(
|
|
|
|
- "writing cluster state took [{}ms]; " + "wrote full state with [{}] indices",
|
|
|
|
- durationMillis,
|
|
|
|
- stats.numIndicesUpdated
|
|
|
|
- );
|
|
|
|
|
|
+ logger.debug("writing full cluster state took [{}ms]; {}", durationMillis, stats);
|
|
}
|
|
}
|
|
} finally {
|
|
} finally {
|
|
closeIfAnyIndexWriterHasTragedyOrIsClosed();
|
|
closeIfAnyIndexWriterHasTragedyOrIsClosed();
|
|
@@ -809,23 +805,13 @@ public class PersistedClusterStateService {
|
|
final TimeValue finalSlowWriteLoggingThreshold = slowWriteLoggingThresholdSupplier.get();
|
|
final TimeValue finalSlowWriteLoggingThreshold = slowWriteLoggingThresholdSupplier.get();
|
|
if (durationMillis >= finalSlowWriteLoggingThreshold.getMillis()) {
|
|
if (durationMillis >= finalSlowWriteLoggingThreshold.getMillis()) {
|
|
logger.warn(
|
|
logger.warn(
|
|
- "writing cluster state took [{}ms] which is above the warn threshold of [{}]; "
|
|
|
|
- + "wrote global metadata [{}] and metadata for [{}] indices and skipped [{}] unchanged indices",
|
|
|
|
|
|
+ "writing cluster state took [{}ms] which is above the warn threshold of [{}]; {}",
|
|
durationMillis,
|
|
durationMillis,
|
|
finalSlowWriteLoggingThreshold,
|
|
finalSlowWriteLoggingThreshold,
|
|
- stats.globalMetaUpdated,
|
|
|
|
- stats.numIndicesUpdated,
|
|
|
|
- stats.numIndicesUnchanged
|
|
|
|
|
|
+ stats
|
|
);
|
|
);
|
|
} else {
|
|
} else {
|
|
- logger.debug(
|
|
|
|
- "writing cluster state took [{}ms]; "
|
|
|
|
- + "wrote global metadata [{}] and metadata for [{}] indices and skipped [{}] unchanged indices",
|
|
|
|
- durationMillis,
|
|
|
|
- stats.globalMetaUpdated,
|
|
|
|
- stats.numIndicesUpdated,
|
|
|
|
- stats.numIndicesUnchanged
|
|
|
|
- );
|
|
|
|
|
|
+ logger.debug("writing cluster state took [{}ms]; {}", durationMillis, stats);
|
|
}
|
|
}
|
|
} finally {
|
|
} finally {
|
|
closeIfAnyIndexWriterHasTragedyOrIsClosed();
|
|
closeIfAnyIndexWriterHasTragedyOrIsClosed();
|
|
@@ -864,7 +850,9 @@ public class PersistedClusterStateService {
|
|
assert previousValue == null : indexMetadata.getIndexUUID() + " already mapped to " + previousValue;
|
|
assert previousValue == null : indexMetadata.getIndexUUID() + " already mapped to " + previousValue;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ int numIndicesAdded = 0;
|
|
int numIndicesUpdated = 0;
|
|
int numIndicesUpdated = 0;
|
|
|
|
+ int numIndicesRemoved = 0;
|
|
int numIndicesUnchanged = 0;
|
|
int numIndicesUnchanged = 0;
|
|
for (IndexMetadata indexMetadata : metadata.indices().values()) {
|
|
for (IndexMetadata indexMetadata : metadata.indices().values()) {
|
|
final Long previousVersion = indexMetadataVersionByUUID.get(indexMetadata.getIndexUUID());
|
|
final Long previousVersion = indexMetadataVersionByUUID.get(indexMetadata.getIndexUUID());
|
|
@@ -875,7 +863,11 @@ public class PersistedClusterStateService {
|
|
previousVersion,
|
|
previousVersion,
|
|
indexMetadata.getVersion()
|
|
indexMetadata.getVersion()
|
|
);
|
|
);
|
|
- numIndicesUpdated++;
|
|
|
|
|
|
+ if (previousVersion == null) {
|
|
|
|
+ numIndicesAdded++;
|
|
|
|
+ } else {
|
|
|
|
+ numIndicesUpdated++;
|
|
|
|
+ }
|
|
|
|
|
|
for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
|
|
for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
|
|
metadataIndexWriter.deleteIndexMetadata(indexMetadata.getIndexUUID());
|
|
metadataIndexWriter.deleteIndexMetadata(indexMetadata.getIndexUUID());
|
|
@@ -891,6 +883,7 @@ public class PersistedClusterStateService {
|
|
|
|
|
|
for (String removedIndexUUID : indexMetadataVersionByUUID.keySet()) {
|
|
for (String removedIndexUUID : indexMetadataVersionByUUID.keySet()) {
|
|
for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
|
|
for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
|
|
|
|
+ numIndicesRemoved++;
|
|
metadataIndexWriter.deleteIndexMetadata(removedIndexUUID);
|
|
metadataIndexWriter.deleteIndexMetadata(removedIndexUUID);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -901,7 +894,7 @@ public class PersistedClusterStateService {
|
|
metadataIndexWriter.flush();
|
|
metadataIndexWriter.flush();
|
|
}
|
|
}
|
|
|
|
|
|
- return new WriterStats(updateGlobalMeta, numIndicesUpdated, numIndicesUnchanged);
|
|
|
|
|
|
+ return new WriterStats(false, updateGlobalMeta, numIndicesUnchanged, numIndicesAdded, numIndicesUpdated, numIndicesRemoved);
|
|
}
|
|
}
|
|
|
|
|
|
private static int lastPageValue(boolean isLastPage) {
|
|
private static int lastPageValue(boolean isLastPage) {
|
|
@@ -977,7 +970,7 @@ public class PersistedClusterStateService {
|
|
metadataIndexWriter.flush();
|
|
metadataIndexWriter.flush();
|
|
}
|
|
}
|
|
|
|
|
|
- return new WriterStats(true, metadata.indices().size(), 0);
|
|
|
|
|
|
+ return new WriterStats(true, true, 0, 0, metadata.indices().size(), 0);
|
|
}
|
|
}
|
|
|
|
|
|
public void writeIncrementalTermUpdateAndCommit(long currentTerm, long lastAcceptedVersion, Version oldestIndexVersion)
|
|
public void writeIncrementalTermUpdateAndCommit(long currentTerm, long lastAcceptedVersion, Version oldestIndexVersion)
|
|
@@ -1056,15 +1049,31 @@ public class PersistedClusterStateService {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- static class WriterStats {
|
|
|
|
- final boolean globalMetaUpdated;
|
|
|
|
- final long numIndicesUpdated;
|
|
|
|
- final long numIndicesUnchanged;
|
|
|
|
-
|
|
|
|
- WriterStats(boolean globalMetaUpdated, long numIndicesUpdated, long numIndicesUnchanged) {
|
|
|
|
- this.globalMetaUpdated = globalMetaUpdated;
|
|
|
|
- this.numIndicesUpdated = numIndicesUpdated;
|
|
|
|
- this.numIndicesUnchanged = numIndicesUnchanged;
|
|
|
|
|
|
+ private record WriterStats(
|
|
|
|
+ boolean isFullWrite,
|
|
|
|
+ boolean globalMetaUpdated,
|
|
|
|
+ int numIndicesUnchanged,
|
|
|
|
+ int numIndicesAdded,
|
|
|
|
+ int numIndicesUpdated,
|
|
|
|
+ int numIndicesRemoved
|
|
|
|
+ ) {
|
|
|
|
+ @Override
|
|
|
|
+ public String toString() {
|
|
|
|
+ if (isFullWrite) {
|
|
|
|
+ return String.format(Locale.ROOT, "wrote global metadata and metadata for [%d] indices", numIndicesUpdated);
|
|
|
|
+ } else {
|
|
|
|
+ return String.format(
|
|
|
|
+ Locale.ROOT,
|
|
|
|
+ """
|
|
|
|
+ [%s] global metadata, wrote metadata for [%d] new indices and [%d] existing indices, \
|
|
|
|
+ removed metadata for [%d] indices and skipped [%d] unchanged indices""",
|
|
|
|
+ globalMetaUpdated ? "wrote" : "skipped writing",
|
|
|
|
+ numIndicesAdded,
|
|
|
|
+ numIndicesUpdated,
|
|
|
|
+ numIndicesRemoved,
|
|
|
|
+ numIndicesUnchanged
|
|
|
|
+ );
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|