Browse Source

Refactor translog reading and writing functions (#95110)

And a test function that is useful

Relates ES-5058
Iraklis Psaroudakis 2 years ago
parent
commit
313a72f141

+ 13 - 9
server/src/main/java/org/elasticsearch/index/translog/Translog.java

@@ -576,14 +576,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
     public Location add(final Operation operation) throws IOException {
     public Location add(final Operation operation) throws IOException {
         final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
         final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
         try {
         try {
-            final long start = out.position();
-            out.skip(Integer.BYTES);
-            writeOperationNoSize(new BufferedChecksumStreamOutput(out), operation);
-            final long end = out.position();
-            final int operationSize = (int) (end - Integer.BYTES - start);
-            out.seek(start);
-            out.writeInt(operationSize);
-            out.seek(end);
+            writeOperationWithSize(out, operation);
             final BytesReference bytes = out.bytes();
             final BytesReference bytes = out.bytes();
             try (ReleasableLock ignored = readLock.acquire()) {
             try (ReleasableLock ignored = readLock.acquire()) {
                 ensureOpen();
                 ensureOpen();
@@ -1525,7 +1518,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         return operations;
         return operations;
     }
     }
 
 
-    static Translog.Operation readOperation(BufferedChecksumStreamInput in) throws IOException {
+    public static Translog.Operation readOperation(BufferedChecksumStreamInput in) throws IOException {
         final Translog.Operation operation;
         final Translog.Operation operation;
         try {
         try {
             final int opSize = in.readInt();
             final int opSize = in.readInt();
@@ -1592,6 +1585,17 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         out.writeInt((int) checksum);
         out.writeInt((int) checksum);
     }
     }
 
 
+    public static void writeOperationWithSize(BytesStreamOutput out, Translog.Operation op) throws IOException {
+        final long start = out.position();
+        out.skip(Integer.BYTES);
+        writeOperationNoSize(new BufferedChecksumStreamOutput(out), op);
+        final long end = out.position();
+        final int operationSize = (int) (end - Integer.BYTES - start);
+        out.seek(start);
+        out.writeInt(operationSize);
+        out.seek(end);
+    }
+
     /**
     /**
      * Gets the minimum generation that could contain any sequence number after the specified sequence number, or the current generation if
      * Gets the minimum generation that could contain any sequence number after the specified sequence number, or the current generation if
      * there is no generation that could any such sequence number.
      * there is no generation that could any such sequence number.

+ 22 - 17
server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

@@ -132,6 +132,7 @@ public class RecoverySourceHandlerTests extends MapperServiceTestCase {
         "index",
         "index",
         Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT).build()
         Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT).build()
     );
     );
+    private static final BytesArray TRANSLOG_OPERATION_SOURCE = new BytesArray("{}".getBytes(StandardCharsets.UTF_8));
     private final ShardId shardId = new ShardId(INDEX_SETTINGS.getIndex(), 1);
     private final ShardId shardId = new ShardId(INDEX_SETTINGS.getIndex(), 1);
     private final ClusterSettings service = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
     private final ClusterSettings service = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
     private final RecoveryPlannerService recoveryPlannerService = PeerOnlyRecoveryPlannerService.INSTANCE;
     private final RecoveryPlannerService recoveryPlannerService = PeerOnlyRecoveryPlannerService.INSTANCE;
@@ -1955,29 +1956,33 @@ public class RecoverySourceHandlerTests extends MapperServiceTestCase {
         };
         };
     }
     }
 
 
+    public static Translog.Operation generateOperation(long seqNo) {
+        final Translog.Operation op;
+        if (randomBoolean()) {
+            op = new Translog.Index(
+                "id",
+                seqNo,
+                randomNonNegativeLong(),
+                randomNonNegativeLong(),
+                TRANSLOG_OPERATION_SOURCE,
+                randomBoolean() ? randomAlphaOfLengthBetween(1, 5) : null,
+                randomNonNegativeLong()
+            );
+        } else if (randomBoolean()) {
+            op = new Translog.Delete("id", seqNo, randomNonNegativeLong(), randomNonNegativeLong());
+        } else {
+            op = new Translog.NoOp(seqNo, randomNonNegativeLong(), "test");
+        }
+        return op;
+    }
+
     private static List<Translog.Operation> generateOperations(int numOps) {
     private static List<Translog.Operation> generateOperations(int numOps) {
         final List<Translog.Operation> operations = new ArrayList<>(numOps);
         final List<Translog.Operation> operations = new ArrayList<>(numOps);
         final BytesArray source = new BytesArray("{}".getBytes(StandardCharsets.UTF_8));
         final BytesArray source = new BytesArray("{}".getBytes(StandardCharsets.UTF_8));
         final Set<Long> seqNos = new HashSet<>();
         final Set<Long> seqNos = new HashSet<>();
         for (int i = 0; i < numOps; i++) {
         for (int i = 0; i < numOps; i++) {
             final long seqNo = randomValueOtherThanMany(n -> seqNos.add(n) == false, ESTestCase::randomNonNegativeLong);
             final long seqNo = randomValueOtherThanMany(n -> seqNos.add(n) == false, ESTestCase::randomNonNegativeLong);
-            final Translog.Operation op;
-            if (randomBoolean()) {
-                op = new Translog.Index(
-                    "id",
-                    seqNo,
-                    randomNonNegativeLong(),
-                    randomNonNegativeLong(),
-                    source,
-                    randomBoolean() ? randomAlphaOfLengthBetween(1, 5) : null,
-                    randomNonNegativeLong()
-                );
-            } else if (randomBoolean()) {
-                op = new Translog.Delete("id", seqNo, randomNonNegativeLong(), randomNonNegativeLong());
-            } else {
-                op = new Translog.NoOp(seqNo, randomNonNegativeLong(), "test");
-            }
-            operations.add(op);
+            operations.add(generateOperation(seqNo));
         }
         }
         return operations;
         return operations;
     }
     }