Browse Source

Dry up Translog.Operation classes (#95106)

Pulling some common logic into an abstract base class,
removing error-prone source getters from the implementations
without a source and simplifying serialization by implementing
Writeable.
Armin Braun 2 years ago
parent
commit
93479992ab

+ 1 - 1
server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java

@@ -65,7 +65,7 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest<Resyn
         super.writeTo(out);
         out.writeZLong(trimAboveSeqNo);
         out.writeZLong(maxSeenAutoIdTimestampOnPrimary);
-        out.writeArray(Translog.Operation::writeOperation, operations);
+        out.writeArray(operations);
     }
 
     @Override

+ 65 - 107
server/src/main/java/org/elasticsearch/index/translog/Translog.java

@@ -18,6 +18,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
@@ -1064,8 +1065,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
      * A generic interface representing an operation performed on the transaction log.
      * Each is associated with a type.
      */
-    public interface Operation {
-        enum Type {
+    public abstract static class Operation implements Writeable {
+        public enum Type {
             @Deprecated
             CREATE((byte) 1),
             INDEX((byte) 2),
@@ -1093,47 +1094,50 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
             }
         }
 
-        Type opType();
+        protected final long seqNo;
 
-        long estimateSize();
+        protected final long primaryTerm;
 
-        BytesReference source();
+        protected Operation(long seqNo, long primaryTerm) {
+            this.seqNo = seqNo;
+            this.primaryTerm = primaryTerm;
+        }
+
+        public abstract Type opType();
 
-        long seqNo();
+        public abstract long estimateSize();
+
+        public final long seqNo() {
+            return seqNo;
+        }
 
-        long primaryTerm();
+        public final long primaryTerm() {
+            return primaryTerm;
+        }
 
         /**
-         * Reads the type and the operation from the given stream. The operation must be written with
-         * {@link Operation#writeOperation(StreamOutput, Operation)}
+         * Reads the type and the operation from the given stream.
          */
-        static Operation readOperation(final StreamInput input) throws IOException {
+        public static Operation readOperation(final StreamInput input) throws IOException {
             final Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte());
             return switch (type) {
                 // the de-serialization logic in Index was identical to that of Create when create was deprecated
-                case CREATE, INDEX -> new Index(input);
-                case DELETE -> new Delete(input);
+                case CREATE, INDEX -> Index.readFrom(input);
+                case DELETE -> Delete.readFrom(input);
                 case NO_OP -> new NoOp(input);
             };
         }
 
-        /**
-         * Writes the type and translog operation to the given stream
-         */
-        static void writeOperation(final StreamOutput output, final Operation operation) throws IOException {
-            output.writeByte(operation.opType().id());
-            switch (operation.opType()) {
-                // the serialization logic in Index was identical to that of Create when create was deprecated
-                case CREATE, INDEX -> ((Index) operation).write(output);
-                case DELETE -> ((Delete) operation).write(output);
-                case NO_OP -> ((NoOp) operation).write(output);
-                default -> throw new AssertionError("no case for [" + operation.opType() + "]");
-            }
+        @Override
+        public final void writeTo(StreamOutput out) throws IOException {
+            out.writeByte(opType().id());
+            writeBody(out);
         }
 
+        protected abstract void writeBody(StreamOutput out) throws IOException;
     }
 
-    public static class Index implements Operation {
+    public static class Index extends Operation {
 
         public static final int FORMAT_NO_PARENT = 9; // since 7.0
         public static final int FORMAT_NO_VERSION_TYPE = FORMAT_NO_PARENT + 1;
@@ -1142,39 +1146,40 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 
         private final String id;
         private final long autoGeneratedIdTimestamp;
-        private final long seqNo;
-        private final long primaryTerm;
         private final long version;
         private final BytesReference source;
         private final String routing;
 
-        private Index(final StreamInput in) throws IOException {
+        private static Index readFrom(StreamInput in) throws IOException {
             final int format = in.readVInt(); // SERIALIZATION_FORMAT
             assert format >= FORMAT_NO_PARENT : "format was: " + format;
-            id = in.readString();
+            String id = in.readString();
             if (format < FORMAT_NO_DOC_TYPE) {
                 in.readString();
                 // can't assert that this is _doc because pre-8.0 indexes can have any name for a type
             }
-            source = in.readBytesReference();
-            routing = in.readOptionalString();
-            this.version = in.readLong();
+            BytesReference source = in.readBytesReference();
+            String routing = in.readOptionalString();
+            long version = in.readLong();
             if (format < FORMAT_NO_VERSION_TYPE) {
                 in.readByte(); // _version_type
             }
-            this.autoGeneratedIdTimestamp = in.readLong();
-            seqNo = in.readLong();
-            primaryTerm = in.readLong();
+            long autoGeneratedIdTimestamp = in.readLong();
+            long seqNo = in.readLong();
+            long primaryTerm = in.readLong();
+            return new Index(id, seqNo, primaryTerm, version, source, routing, autoGeneratedIdTimestamp);
         }
 
         public Index(Engine.Index index, Engine.IndexResult indexResult) {
-            this.id = index.id();
-            this.source = index.source();
-            this.routing = index.routing();
-            this.seqNo = indexResult.getSeqNo();
-            this.primaryTerm = index.primaryTerm();
-            this.version = indexResult.getVersion();
-            this.autoGeneratedIdTimestamp = index.getAutoGeneratedIdTimestamp();
+            this(
+                index.id(),
+                indexResult.getSeqNo(),
+                index.primaryTerm(),
+                indexResult.getVersion(),
+                index.source(),
+                index.routing(),
+                index.getAutoGeneratedIdTimestamp()
+            );
         }
 
         public Index(
@@ -1186,10 +1191,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
             String routing,
             long autoGeneratedIdTimestamp
         ) {
+            super(seqNo, primaryTerm);
             this.id = id;
             this.source = source;
-            this.seqNo = seqNo;
-            this.primaryTerm = primaryTerm;
             this.version = version;
             this.routing = routing;
             this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
@@ -1216,26 +1220,16 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
             return this.routing;
         }
 
-        @Override
         public BytesReference source() {
             return this.source;
         }
 
-        @Override
-        public long seqNo() {
-            return seqNo;
-        }
-
-        @Override
-        public long primaryTerm() {
-            return primaryTerm;
-        }
-
         public long version() {
             return this.version;
         }
 
-        private void write(final StreamOutput out) throws IOException {
+        @Override
+        public void writeBody(final StreamOutput out) throws IOException {
             final int format = out.getTransportVersion().onOrAfter(TransportVersion.V_8_0_0)
                 ? SERIALIZATION_FORMAT
                 : FORMAT_NO_VERSION_TYPE;
@@ -1309,7 +1303,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 
     }
 
-    public static class Delete implements Operation {
+    public static class Delete extends Operation {
 
         private static final int FORMAT_6_0 = 4; // 6.0 - *
         public static final int FORMAT_NO_PARENT = FORMAT_6_0 + 1; // since 7.0
@@ -1318,29 +1312,28 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         public static final int SERIALIZATION_FORMAT = FORMAT_NO_DOC_TYPE;
 
         private final String id;
-        private final long seqNo;
-        private final long primaryTerm;
         private final long version;
 
-        private Delete(final StreamInput in) throws IOException {
+        private static Delete readFrom(StreamInput in) throws IOException {
             final int format = in.readVInt();// SERIALIZATION_FORMAT
             assert format >= FORMAT_6_0 : "format was: " + format;
             if (format < FORMAT_NO_DOC_TYPE) {
                 in.readString();
                 // Can't assert that this is _doc because pre-8.0 indexes can have any name for a type
             }
-            id = in.readString();
+            String id = in.readString();
             if (format < FORMAT_NO_DOC_TYPE) {
                 final String docType = in.readString();
                 assert docType.equals(IdFieldMapper.NAME) : docType + " != " + IdFieldMapper.NAME;
                 in.readBytesRef(); // uid
             }
-            this.version = in.readLong();
+            long version = in.readLong();
             if (format < FORMAT_NO_VERSION_TYPE) {
                 in.readByte(); // versionType
             }
-            seqNo = in.readLong();
-            primaryTerm = in.readLong();
+            long seqNo = in.readLong();
+            long primaryTerm = in.readLong();
+            return new Delete(id, seqNo, primaryTerm, version);
         }
 
         public Delete(Engine.Delete delete, Engine.DeleteResult deleteResult) {
@@ -1353,9 +1346,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         }
 
         public Delete(String id, long seqNo, long primaryTerm, long version) {
+            super(seqNo, primaryTerm);
             this.id = Objects.requireNonNull(id);
-            this.seqNo = seqNo;
-            this.primaryTerm = primaryTerm;
             this.version = version;
         }
 
@@ -1373,26 +1365,12 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
             return id;
         }
 
-        @Override
-        public long seqNo() {
-            return seqNo;
-        }
-
-        @Override
-        public long primaryTerm() {
-            return primaryTerm;
-        }
-
         public long version() {
             return this.version;
         }
 
         @Override
-        public BytesReference source() {
-            throw new IllegalStateException("trying to read doc source from delete operation");
-        }
-
-        private void write(final StreamOutput out) throws IOException {
+        public void writeBody(final StreamOutput out) throws IOException {
             final int format = out.getTransportVersion().onOrAfter(TransportVersion.V_8_0_0)
                 ? SERIALIZATION_FORMAT
                 : FORMAT_NO_VERSION_TYPE;
@@ -1439,42 +1417,27 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         }
     }
 
-    public static class NoOp implements Operation {
-
-        private final long seqNo;
-        private final long primaryTerm;
+    public static class NoOp extends Operation {
         private final String reason;
 
-        @Override
-        public long seqNo() {
-            return seqNo;
-        }
-
-        @Override
-        public long primaryTerm() {
-            return primaryTerm;
-        }
-
         public String reason() {
             return reason;
         }
 
         private NoOp(final StreamInput in) throws IOException {
-            seqNo = in.readLong();
-            primaryTerm = in.readLong();
-            reason = in.readString();
+            this(in.readLong(), in.readLong(), in.readString());
         }
 
         public NoOp(final long seqNo, final long primaryTerm, final String reason) {
+            super(seqNo, primaryTerm);
             assert seqNo > SequenceNumbers.NO_OPS_PERFORMED;
             assert primaryTerm >= 0;
             assert reason != null;
-            this.seqNo = seqNo;
-            this.primaryTerm = primaryTerm;
             this.reason = reason;
         }
 
-        private void write(final StreamOutput out) throws IOException {
+        @Override
+        public void writeBody(final StreamOutput out) throws IOException {
             out.writeLong(seqNo);
             out.writeLong(primaryTerm);
             out.writeString(reason);
@@ -1490,11 +1453,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
             return 2 * reason.length() + 2 * Long.BYTES;
         }
 
-        @Override
-        public BytesReference source() {
-            throw new UnsupportedOperationException("source does not exist for a no-op");
-        }
-
         @Override
         public boolean equals(Object obj) {
             if (this == obj) {
@@ -1629,7 +1587,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         // because closing it closes the underlying stream, which we don't
         // want to do here.
         out.resetDigest();
-        Translog.Operation.writeOperation(out, op);
+        op.writeTo(out);
         long checksum = out.getChecksum();
         out.writeInt((int) checksum);
     }

+ 1 - 1
server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java

@@ -392,7 +392,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
                     translogOperations++;
                     assertThat("unexpected op: " + next, (int) next.seqNo(), lessThan(initialDocs + extraDocs));
                     assertThat("unexpected primaryTerm: " + next.primaryTerm(), next.primaryTerm(), is(oldPrimary.getPendingPrimaryTerm()));
-                    assertThat(next.source().utf8ToString(), is("{ \"f\": \"normal\"}"));
+                    assertThat(((Translog.Index) next).source().utf8ToString(), is("{ \"f\": \"normal\"}"));
                 }
             }
             assertThat(translogOperations, either(equalTo(initialDocs + extraDocs)).or(equalTo(task.getResyncedOperations())));

+ 25 - 17
server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

@@ -1250,7 +1250,7 @@ public class TranslogTests extends ESTestCase {
                 maxOp = next;
             }
             assertNotNull(maxOp);
-            assertEquals(maxOp.source().utf8ToString(), Integer.toString(count));
+            assertEquals(((Translog.Index) maxOp).source().utf8ToString(), Integer.toString(count));
         }
     }
 
@@ -1293,7 +1293,7 @@ public class TranslogTests extends ESTestCase {
             for (int op = 0; op < translogOperations; op++) {
                 if (op <= lastSynced) {
                     final Translog.Operation read = snapshot.next();
-                    assertEquals(Integer.toString(op), read.source().utf8ToString());
+                    assertEquals(Integer.toString(op), ((Translog.Index) read).source().utf8ToString());
                 } else {
                     Translog.Operation next = snapshot.next();
                     assertNull(next);
@@ -1718,7 +1718,7 @@ public class TranslogTests extends ESTestCase {
                     );
                     Translog.Operation next = snapshot.next();
                     assertNotNull("operation " + i + " must be non-null", next);
-                    assertEquals(i, Integer.parseInt(next.source().utf8ToString()));
+                    assertEquals(i, Integer.parseInt(((Translog.Index) next).source().utf8ToString()));
                 }
             }
         }
@@ -1773,7 +1773,7 @@ public class TranslogTests extends ESTestCase {
                 for (int i = 0; i < upTo; i++) {
                     Translog.Operation next = snapshot.next();
                     assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
-                    assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.source().utf8ToString()));
+                    assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(((Translog.Index) next).source().utf8ToString()));
                 }
             }
         }
@@ -1800,7 +1800,11 @@ public class TranslogTests extends ESTestCase {
                     for (int i = 0; i < upTo; i++) {
                         Translog.Operation next = snapshot.next();
                         assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
-                        assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.source().utf8ToString()));
+                        assertEquals(
+                            "payload mismatch, synced: " + sync,
+                            i,
+                            Integer.parseInt(((Translog.Index) next).source().utf8ToString())
+                        );
                     }
                 }
             }
@@ -1860,7 +1864,7 @@ public class TranslogTests extends ESTestCase {
                 for (int i = 0; i < upTo; i++) {
                     Translog.Operation next = snapshot.next();
                     assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
-                    assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.source().utf8ToString()));
+                    assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(((Translog.Index) next).source().utf8ToString()));
                 }
             }
         }
@@ -1888,7 +1892,11 @@ public class TranslogTests extends ESTestCase {
                     for (int i = 0; i < upTo; i++) {
                         Translog.Operation next = snapshot.next();
                         assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
-                        assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.source().utf8ToString()));
+                        assertEquals(
+                            "payload mismatch, synced: " + sync,
+                            i,
+                            Integer.parseInt(((Translog.Index) next).source().utf8ToString())
+                        );
                     }
                 }
             }
@@ -1972,7 +1980,7 @@ public class TranslogTests extends ESTestCase {
                 for (int i = 0; i < upTo; i++) {
                     Translog.Operation next = snapshot.next();
                     assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
-                    assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.source().utf8ToString()));
+                    assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(((Translog.Index) next).source().utf8ToString()));
                 }
             }
         }
@@ -2258,7 +2266,7 @@ public class TranslogTests extends ESTestCase {
             for (int i = firstUncommitted; i < translogOperations; i++) {
                 Translog.Operation next = snapshot.next();
                 assertNotNull("" + i, next);
-                assertEquals(Integer.parseInt(next.source().utf8ToString()), i);
+                assertEquals(Integer.parseInt(((Translog.Index) next).source().utf8ToString()), i);
             }
             assertNull(snapshot.next());
         }
@@ -2470,7 +2478,7 @@ public class TranslogTests extends ESTestCase {
                     );
                     Translog.Operation next = snapshot.next();
                     assertNotNull("operation " + i + " must be non-null", next);
-                    assertEquals(i, Integer.parseInt(next.source().utf8ToString()));
+                    assertEquals(i, Integer.parseInt(((Translog.Index) next).source().utf8ToString()));
                 }
             }
         }
@@ -2999,7 +3007,7 @@ public class TranslogTests extends ESTestCase {
 
             Translog.Operation op = snapshot.next();
             assertNotNull("operation 1 must be non-null", op);
-            assertEquals("payload mismatch for operation 1", 1, Integer.parseInt(op.source().utf8ToString()));
+            assertEquals("payload mismatch for operation 1", 1, Integer.parseInt(((Translog.Index) op).source().utf8ToString()));
 
             tlog.add(indexOp("" + 1, 1, primaryTerm.get(), "2"));
         }
@@ -3009,11 +3017,11 @@ public class TranslogTests extends ESTestCase {
 
             Translog.Operation secondOp = snapshot.next();
             assertNotNull("operation 2 must be non-null", secondOp);
-            assertEquals("payload mismatch for operation 2", Integer.parseInt(secondOp.source().utf8ToString()), 2);
+            assertEquals("payload mismatch for operation 2", Integer.parseInt(((Translog.Index) secondOp).source().utf8ToString()), 2);
 
             Translog.Operation firstOp = snapshot.next();
             assertNotNull("operation 1 must be non-null", firstOp);
-            assertEquals("payload mismatch for operation 1", Integer.parseInt(firstOp.source().utf8ToString()), 1);
+            assertEquals("payload mismatch for operation 1", Integer.parseInt(((Translog.Index) firstOp).source().utf8ToString()), 1);
         }
     }
 
@@ -3069,7 +3077,7 @@ public class TranslogTests extends ESTestCase {
                 for (int i = 0; i < 1; i++) {
                     Translog.Operation next = snapshot.next();
                     assertNotNull("operation " + i + " must be non-null", next);
-                    assertEquals("payload missmatch", i, Integer.parseInt(next.source().utf8ToString()));
+                    assertEquals("payload missmatch", i, Integer.parseInt(((Translog.Index) next).source().utf8ToString()));
                 }
             }
             tlog.add(indexOp("" + 1, 1, primaryTerm.get(), "1"));
@@ -3210,7 +3218,7 @@ public class TranslogTests extends ESTestCase {
                 assertEquals(syncedDocs.size(), snapshot.totalOperations());
                 for (int i = 0; i < syncedDocs.size(); i++) {
                     Translog.Operation next = snapshot.next();
-                    assertEquals(syncedDocs.get(i), next.source().utf8ToString());
+                    assertEquals(syncedDocs.get(i), ((Translog.Index) next).source().utf8ToString());
                     assertNotNull("operation " + i + " must be non-null", next);
                 }
             }
@@ -3359,7 +3367,7 @@ public class TranslogTests extends ESTestCase {
         );
         BytesStreamOutput out = new BytesStreamOutput();
         out.setTransportVersion(wireVersion);
-        Translog.Operation.writeOperation(out, index);
+        index.writeTo(out);
         StreamInput in = out.bytes().streamInput();
         in.setTransportVersion(wireVersion);
         Translog.Index serializedIndex = (Translog.Index) Translog.Operation.readOperation(in);
@@ -3382,7 +3390,7 @@ public class TranslogTests extends ESTestCase {
 
         out = new BytesStreamOutput();
         out.setTransportVersion(wireVersion);
-        Translog.Operation.writeOperation(out, delete);
+        delete.writeTo(out);
         in = out.bytes().streamInput();
         in.setTransportVersion(wireVersion);
         Translog.Delete serializedDelete = (Translog.Delete) Translog.Operation.readOperation(in);

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

@@ -1341,7 +1341,7 @@ public abstract class EngineTestCase extends ESTestCase {
             assertThat(luceneOp.toString(), luceneOp.primaryTerm(), equalTo(translogOp.primaryTerm()));
             assertThat(luceneOp.opType(), equalTo(translogOp.opType()));
             if (luceneOp.opType() == Translog.Operation.Type.INDEX) {
-                assertThat(luceneOp.source(), equalTo(translogOp.source()));
+                assertThat(((Translog.Index) luceneOp).source(), equalTo(((Translog.Index) translogOp).source()));
             }
         }
     }

+ 1 - 1
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

@@ -308,7 +308,7 @@ public class ShardChangesAction extends ActionType<ShardChangesAction.Response>
             out.writeZLong(globalCheckpoint);
             out.writeZLong(maxSeqNo);
             out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
-            out.writeArray(Translog.Operation::writeOperation, operations);
+            out.writeArray(operations);
             out.writeVLong(tookInMillis);
         }
 

+ 1 - 1
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java

@@ -61,7 +61,7 @@ public final class BulkShardOperationsRequest extends ReplicatedWriteRequest<Bul
         super.writeTo(out);
         out.writeString(historyUUID);
         out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
-        out.writeCollection(operations, Translog.Operation::writeOperation);
+        out.writeCollection(operations);
     }
 
     @Override