Browse Source

remove usages of #readOptionalStreamable, #readStreamableList. (#44578)

This commit removes references to Streamable from StreamInput.

This is all a part of the effort to remove Streamable usage.

relates #34389.
Tal Levy 6 years ago
parent
commit
bbe97b03a5

+ 1 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/RestoreSnapshotResponse.java

@@ -69,7 +69,7 @@ public class RestoreSnapshotResponse extends ActionResponse implements ToXConten
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
-        out.writeOptionalStreamable(restoreInfo);
+        out.writeOptionalWriteable(restoreInfo);
     }
 
     public RestStatus status() {

+ 1 - 1
server/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java

@@ -134,7 +134,7 @@ public class ShardStats implements Streamable, Writeable, ToXContentFragment {
     public void writeTo(StreamOutput out) throws IOException {
         shardRouting.writeTo(out);
         commonStats.writeTo(out);
-        out.writeOptionalStreamable(commitStats);
+        out.writeOptionalWriteable(commitStats);
         out.writeString(statePath);
         out.writeString(dataPath);
         out.writeBoolean(isCustomDataPath);

+ 0 - 38
server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java

@@ -75,7 +75,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.IntFunction;
-import java.util.function.Supplier;
 
 import static org.elasticsearch.ElasticsearchException.readStackTrace;
 
@@ -825,20 +824,6 @@ public abstract class StreamInput extends InputStream {
         return readBoolean() ? readArray(reader, arraySupplier) : null;
     }
 
-    /**
-     * Serializes a potential null value.
-     */
-    @Nullable
-    public <T extends Streamable> T readOptionalStreamable(Supplier<T> supplier) throws IOException {
-        if (readBoolean()) {
-            T streamable = supplier.get();
-            streamable.readFrom(this);
-            return streamable;
-        } else {
-            return null;
-        }
-    }
-
     @Nullable
     public <T extends Writeable> T readOptionalWriteable(Writeable.Reader<T> reader) throws IOException {
         if (readBoolean()) {
@@ -991,29 +976,6 @@ public abstract class StreamInput extends InputStream {
         return null;
     }
 
-    /**
-     * Read a {@link List} of {@link Streamable} objects, using the {@code constructor} to instantiate each instance.
-     * <p>
-     * This is expected to take the form:
-     * <code>
-     * List&lt;MyStreamableClass&gt; list = in.readStreamList(MyStreamableClass::new);
-     * </code>
-     *
-     * @param constructor Streamable instance creator
-     * @return Never {@code null}.
-     * @throws IOException if any step fails
-     */
-    public <T extends Streamable> List<T> readStreamableList(Supplier<T> constructor) throws IOException {
-        int count = readArraySize();
-        List<T> builder = new ArrayList<>(count);
-        for (int i=0; i<count; i++) {
-            T instance = constructor.get();
-            instance.readFrom(this);
-            builder.add(instance);
-        }
-        return builder;
-    }
-
     /**
      * Reads a list of objects. The list is expected to have been written using {@link StreamOutput#writeList(List)} or
      * {@link StreamOutput#writeStreamableList(List)}.

+ 17 - 21
server/src/main/java/org/elasticsearch/index/engine/CommitStats.java

@@ -21,7 +21,7 @@ package org.elasticsearch.index.engine;
 import org.apache.lucene.index.SegmentInfos;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.io.stream.Streamable;
+import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.common.xcontent.ToXContentFragment;
@@ -35,12 +35,12 @@ import java.util.Map;
 import static java.util.Map.entry;
 
 /** a class the returns dynamic information with respect to the last commit point of this shard */
-public final class CommitStats implements Streamable, ToXContentFragment {
+public final class CommitStats implements Writeable, ToXContentFragment {
 
-    private Map<String, String> userData;
-    private long generation;
-    private String id; // lucene commit id in base 64;
-    private int numDocs;
+    private final Map<String, String> userData;
+    private final long generation;
+    private final String id; // lucene commit id in base 64;
+    private final int numDocs;
 
     public CommitStats(SegmentInfos segmentInfos) {
         // clone the map to protect against concurrent changes
@@ -51,11 +51,20 @@ public final class CommitStats implements Streamable, ToXContentFragment {
         numDocs = Lucene.getNumDocs(segmentInfos);
     }
 
-    private CommitStats() {
+    CommitStats(StreamInput in) throws IOException {
+        final int length = in.readVInt();
+        final var entries = new ArrayList<Map.Entry<String, String>>(length);
+        for (int i = length; i > 0; i--) {
+            entries.add(entry(in.readString(), in.readString()));
+        }
+        userData = Maps.ofEntries(entries);
+        generation = in.readLong();
+        id = in.readOptionalString();
+        numDocs = in.readInt();
     }
 
     public static CommitStats readOptionalCommitStatsFrom(StreamInput in) throws IOException {
-        return in.readOptionalStreamable(CommitStats::new);
+        return in.readOptionalWriteable(CommitStats::new);
     }
 
 
@@ -93,19 +102,6 @@ public final class CommitStats implements Streamable, ToXContentFragment {
         return numDocs;
     }
 
-    @Override
-    public void readFrom(StreamInput in) throws IOException {
-        final int length = in.readVInt();
-        final var entries = new ArrayList<Map.Entry<String, String>>(length);
-        for (int i = length; i > 0; i--) {
-            entries.add(entry(in.readString(), in.readString()));
-        }
-        userData = Maps.ofEntries(entries);
-        generation = in.readLong();
-        id = in.readOptionalString();
-        numDocs = in.readInt();
-    }
-
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeVInt(userData.size());

+ 15 - 16
server/src/main/java/org/elasticsearch/snapshots/RestoreInfo.java

@@ -22,7 +22,7 @@ import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.io.stream.Streamable;
+import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.xcontent.ObjectParser;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -39,7 +39,7 @@ import java.util.Objects;
  * <p>
  * Returned as part of {@link org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse}
  */
-public class RestoreInfo implements ToXContentObject, Streamable {
+public class RestoreInfo implements ToXContentObject, Writeable {
 
     private String name;
 
@@ -59,6 +59,18 @@ public class RestoreInfo implements ToXContentObject, Streamable {
         this.successfulShards = successfulShards;
     }
 
+    public RestoreInfo(StreamInput in) throws IOException {
+        name = in.readString();
+        int size = in.readVInt();
+        List<String> indicesListBuilder = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+            indicesListBuilder.add(in.readString());
+        }
+        indices = Collections.unmodifiableList(indicesListBuilder);
+        totalShards = in.readVInt();
+        successfulShards = in.readVInt();
+    }
+
     /**
      * Snapshot name
      *
@@ -149,19 +161,6 @@ public class RestoreInfo implements ToXContentObject, Streamable {
         return PARSER.parse(parser, null);
     }
 
-    @Override
-    public void readFrom(StreamInput in) throws IOException {
-        name = in.readString();
-        int size = in.readVInt();
-        List<String> indicesListBuilder = new ArrayList<>();
-        for (int i = 0; i < size; i++) {
-            indicesListBuilder.add(in.readString());
-        }
-        indices = Collections.unmodifiableList(indicesListBuilder);
-        totalShards = in.readVInt();
-        successfulShards = in.readVInt();
-    }
-
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeString(name);
@@ -180,7 +179,7 @@ public class RestoreInfo implements ToXContentObject, Streamable {
      * @return restore info
      */
     public static RestoreInfo readOptionalRestoreInfo(StreamInput in) throws IOException {
-        return in.readOptionalStreamable(RestoreInfo::new);
+        return in.readOptionalWriteable(RestoreInfo::new);
     }
 
     @Override

+ 7 - 10
server/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java

@@ -432,18 +432,18 @@ public class BytesStreamsTests extends ESTestCase {
 
     public void testWriteStreamableList() throws IOException {
         final int size = randomIntBetween(0, 5);
-        final List<TestStreamable> expected = new ArrayList<>(size);
+        final List<TestWriteable> expected = new ArrayList<>(size);
 
         for (int i = 0; i < size; ++i) {
-            expected.add(new TestStreamable(randomBoolean()));
+            expected.add(new TestWriteable(randomBoolean()));
         }
 
         final BytesStreamOutput out = new BytesStreamOutput();
-        out.writeStreamableList(expected);
+        out.writeList(expected);
 
         final StreamInput in = StreamInput.wrap(BytesReference.toBytes(out.bytes()));
 
-        final List<TestStreamable> loaded = in.readStreamableList(TestStreamable::new);
+        final List<TestWriteable> loaded = in.readList(TestWriteable::new);
 
         assertThat(loaded, hasSize(expected.size()));
 
@@ -587,18 +587,15 @@ public class BytesStreamsTests extends ESTestCase {
         }
     }
 
-    private static class TestStreamable implements Streamable {
+    private static class TestWriteable implements Writeable {
 
         private boolean value;
 
-        TestStreamable() { }
-
-        TestStreamable(boolean value) {
+        TestWriteable(boolean value) {
             this.value = value;
         }
 
-        @Override
-        public void readFrom(StreamInput in) throws IOException {
+        TestWriteable(StreamInput in) throws IOException {
             value = in.readBoolean();
         }
 

+ 9 - 13
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/execution/QueuedWatch.java

@@ -7,7 +7,7 @@ package org.elasticsearch.xpack.core.watcher.execution;
 
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.io.stream.Streamable;
+import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 
@@ -16,16 +16,13 @@ import java.time.Instant;
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
 
-public class QueuedWatch implements Streamable, ToXContentObject {
+public class QueuedWatch implements Writeable, ToXContentObject {
 
     private String watchId;
     private String watchRecordId;
     private ZonedDateTime triggeredTime;
     private ZonedDateTime executionTime;
 
-    public QueuedWatch() {
-    }
-
     public QueuedWatch(WatchExecutionContext ctx) {
         this.watchId = ctx.id().watchId();
         this.watchRecordId = ctx.id().value();
@@ -33,6 +30,13 @@ public class QueuedWatch implements Streamable, ToXContentObject {
         this.executionTime = ctx.executionTime();
     }
 
+    public QueuedWatch(StreamInput in) throws IOException {
+        watchId = in.readString();
+        watchRecordId = in.readString();
+        triggeredTime = Instant.ofEpochMilli(in.readVLong()).atZone(ZoneOffset.UTC);
+        executionTime = Instant.ofEpochMilli(in.readVLong()).atZone(ZoneOffset.UTC);
+    }
+
     public String watchId() {
         return watchId;
     }
@@ -53,14 +57,6 @@ public class QueuedWatch implements Streamable, ToXContentObject {
         this.executionTime = executionTime;
     }
 
-    @Override
-    public void readFrom(StreamInput in) throws IOException {
-        watchId = in.readString();
-        watchRecordId = in.readString();
-        triggeredTime = Instant.ofEpochMilli(in.readVLong()).atZone(ZoneOffset.UTC);
-        executionTime = Instant.ofEpochMilli(in.readVLong()).atZone(ZoneOffset.UTC);
-    }
-
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeString(watchId);

+ 25 - 29
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/execution/WatchExecutionSnapshot.java

@@ -7,7 +7,7 @@ package org.elasticsearch.xpack.core.watcher.execution;
 
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.io.stream.Streamable;
+import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult;
@@ -18,18 +18,15 @@ import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
 import java.util.Map;
 
-public class WatchExecutionSnapshot implements Streamable, ToXContentObject {
+public class WatchExecutionSnapshot implements Writeable, ToXContentObject {
 
-    private String watchId;
-    private String watchRecordId;
-    private ZonedDateTime triggeredTime;
-    private ZonedDateTime executionTime;
-    private ExecutionPhase phase;
+    private final String watchId;
+    private final String watchRecordId;
+    private final ZonedDateTime triggeredTime;
+    private final ZonedDateTime executionTime;
+    private final ExecutionPhase phase;
+    private final StackTraceElement[] executionStackTrace;
     private String[] executedActions;
-    private StackTraceElement[] executionStackTrace;
-
-    public WatchExecutionSnapshot() {
-    }
 
     public WatchExecutionSnapshot(WatchExecutionContext context, StackTraceElement[] executionStackTrace) {
         watchId = context.id().watchId();
@@ -48,6 +45,23 @@ public class WatchExecutionSnapshot implements Streamable, ToXContentObject {
         this.executionStackTrace = executionStackTrace;
     }
 
+    public WatchExecutionSnapshot(StreamInput in) throws IOException {
+        watchId = in.readString();
+        watchRecordId = in.readString();
+        triggeredTime = Instant.ofEpochMilli(in.readVLong()).atZone(ZoneOffset.UTC);
+        executionTime = Instant.ofEpochMilli(in.readVLong()).atZone(ZoneOffset.UTC);
+        phase = ExecutionPhase.resolve(in.readString());
+        int size = in.readVInt();
+        executionStackTrace = new StackTraceElement[size];
+        for (int i = 0; i < size; i++) {
+            String declaringClass = in.readString();
+            String methodName = in.readString();
+            String fileName = in.readOptionalString();
+            int lineNumber = in.readInt();
+            executionStackTrace[i] = new StackTraceElement(declaringClass, methodName, fileName, lineNumber);
+        }
+    }
+
     public String watchId() {
         return watchId;
     }
@@ -72,24 +86,6 @@ public class WatchExecutionSnapshot implements Streamable, ToXContentObject {
         return executionStackTrace;
     }
 
-    @Override
-    public void readFrom(StreamInput in) throws IOException {
-        watchId = in.readString();
-        watchRecordId = in.readString();
-        triggeredTime = Instant.ofEpochMilli(in.readVLong()).atZone(ZoneOffset.UTC);
-        executionTime = Instant.ofEpochMilli(in.readVLong()).atZone(ZoneOffset.UTC);
-        phase = ExecutionPhase.resolve(in.readString());
-        int size = in.readVInt();
-        executionStackTrace = new StackTraceElement[size];
-        for (int i = 0; i < size; i++) {
-            String declaringClass = in.readString();
-            String methodName = in.readString();
-            String fileName = in.readOptionalString();
-            int lineNumber = in.readInt();
-            executionStackTrace[i] = new StackTraceElement(declaringClass, methodName, fileName, lineNumber);
-        }
-    }
-
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeString(watchId);

+ 5 - 5
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/stats/WatcherStatsResponse.java

@@ -100,10 +100,10 @@ public class WatcherStatsResponse extends BaseNodesResponse<WatcherStatsResponse
             watcherState = WatcherState.fromId(in.readByte());
 
             if (in.readBoolean()) {
-                snapshots = in.readStreamableList(WatchExecutionSnapshot::new);
+                snapshots = in.readList(WatchExecutionSnapshot::new);
             }
             if (in.readBoolean()) {
-                queuedWatches = in.readStreamableList(QueuedWatch::new);
+                queuedWatches = in.readList(QueuedWatch::new);
             }
             if (in.readBoolean()) {
                 stats = Counters.read(in);
@@ -194,11 +194,11 @@ public class WatcherStatsResponse extends BaseNodesResponse<WatcherStatsResponse
 
             out.writeBoolean(snapshots != null);
             if (snapshots != null) {
-                out.writeStreamableList(snapshots);
+                out.writeList(snapshots);
             }
             out.writeBoolean(queuedWatches != null);
             if (queuedWatches != null) {
-                out.writeStreamableList(queuedWatches);
+                out.writeList(queuedWatches);
             }
             out.writeBoolean(stats != null);
             if (stats != null) {
@@ -240,4 +240,4 @@ public class WatcherStatsResponse extends BaseNodesResponse<WatcherStatsResponse
             return builder;
         }
     }
-}
+}