Sfoglia il codice sorgente

Replace stream().map().iterator() with Iterators#map (#97964)

This is a common pattern, particularly in chunked XContent encoding, but
working with streams is surprisingly expensive. This commit replaces it
with a much simpler utility that works directly on iterators.

It also simplifies the cases where we avoid creating a stream by using
`Iterators.flatMap(..., x -> Iterators.single(...))`.
David Turner 2 anni fa
parent
commit
f12070e9e9
35 ha cambiato i file con 184 aggiunte e 169 eliminazioni
  1. 5 9
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/ExplainDataStreamLifecycleAction.java
  2. 12 16
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/GetDataStreamLifecycleAction.java
  3. 6 6
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java
  4. 24 27
      server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/GetSnapshotsResponse.java
  5. 5 6
      server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/TransportGetShardSnapshotAction.java
  6. 2 2
      server/src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksResponse.java
  7. 2 2
      server/src/main/java/org/elasticsearch/action/admin/indices/get/GetIndexResponse.java
  8. 2 2
      server/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/GetMappingsResponse.java
  9. 2 1
      server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java
  10. 2 2
      server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java
  11. 9 12
      server/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoresResponse.java
  12. 2 2
      server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java
  13. 1 1
      server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java
  14. 16 23
      server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
  15. 2 2
      server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java
  16. 2 2
      server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java
  17. 2 2
      server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java
  18. 1 1
      server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java
  19. 2 1
      server/src/main/java/org/elasticsearch/cluster/coordination/JoinTask.java
  20. 4 10
      server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
  21. 2 3
      server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetadata.java
  22. 28 0
      server/src/main/java/org/elasticsearch/common/collect/Iterators.java
  23. 1 1
      server/src/main/java/org/elasticsearch/common/xcontent/ChunkedToXContentHelper.java
  24. 3 3
      server/src/main/java/org/elasticsearch/health/Diagnosis.java
  25. 5 8
      server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java
  26. 3 3
      server/src/main/java/org/elasticsearch/ingest/IngestStats.java
  27. 2 2
      server/src/main/java/org/elasticsearch/rest/action/cat/RestTable.java
  28. 3 2
      server/src/main/java/org/elasticsearch/script/ScriptMetadata.java
  29. 2 1
      server/src/main/java/org/elasticsearch/search/aggregations/Aggregations.java
  30. 3 2
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregatorFactory.java
  31. 2 2
      server/src/main/java/org/elasticsearch/transport/TransportStats.java
  32. 10 0
      server/src/test/java/org/elasticsearch/common/collect/IteratorsTests.java
  33. 3 4
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/action/GetLifecycleAction.java
  34. 9 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/CategorizeTextAggregator.java
  35. 5 4
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentMetadata.java

+ 5 - 9
modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/ExplainDataStreamLifecycleAction.java

@@ -183,19 +183,15 @@ public class ExplainDataStreamLifecycleAction extends ActionType<ExplainDataStre
 
         @Override
         public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
-            final Iterator<? extends ToXContent> indicesIterator = indices.stream()
-                .map(explainIndexDataLifecycle -> (ToXContent) (builder, params) -> {
-                    builder.field(explainIndexDataLifecycle.getIndex());
-                    explainIndexDataLifecycle.toXContent(builder, params, rolloverConfiguration);
-                    return builder;
-                })
-                .iterator();
-
             return Iterators.concat(Iterators.single((builder, params) -> {
                 builder.startObject();
                 builder.startObject(INDICES_FIELD.getPreferredName());
                 return builder;
-            }), indicesIterator, Iterators.single((ToXContent) (builder, params) -> {
+            }), Iterators.map(indices.iterator(), explainIndexDataLifecycle -> (builder, params) -> {
+                builder.field(explainIndexDataLifecycle.getIndex());
+                explainIndexDataLifecycle.toXContent(builder, params, rolloverConfiguration);
+                return builder;
+            }), Iterators.single((builder, params) -> {
                 builder.endObject();
                 builder.endObject();
                 return builder;

+ 12 - 16
modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/GetDataStreamLifecycleAction.java

@@ -210,26 +210,22 @@ public class GetDataStreamLifecycleAction extends ActionType<GetDataStreamLifecy
         }
 
         @Override
-        public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
-            final Iterator<? extends ToXContent> lifecyclesIterator = dataStreamLifecycles.stream()
-                .map(
-                    dataStreamLifecycle -> (ToXContent) (builder, params) -> dataStreamLifecycle.toXContent(
-                        builder,
-                        params,
-                        rolloverConfiguration
-                    )
-                )
-                .iterator();
-
+        public Iterator<ToXContent> toXContentChunked(ToXContent.Params outerParams) {
             return Iterators.concat(Iterators.single((builder, params) -> {
                 builder.startObject();
                 builder.startArray(DATA_STREAMS_FIELD.getPreferredName());
                 return builder;
-            }), lifecyclesIterator, Iterators.single((ToXContent) (builder, params) -> {
-                builder.endArray();
-                builder.endObject();
-                return builder;
-            }));
+            }),
+                Iterators.map(
+                    dataStreamLifecycles.iterator(),
+                    dataStreamLifecycle -> (builder, params) -> dataStreamLifecycle.toXContent(builder, params, rolloverConfiguration)
+                ),
+                Iterators.single((builder, params) -> {
+                    builder.endArray();
+                    builder.endObject();
+                    return builder;
+                })
+            );
         }
 
         @Override

+ 6 - 6
server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java

@@ -192,12 +192,12 @@ public class ListTasksResponse extends BaseTasksResponse {
                     }
                     builder.startObject(TASKS);
                     return builder;
-                }), Iterators.flatMap(entry.getValue().iterator(), task -> Iterators.<ToXContent>single((builder, params) -> {
+                }), Iterators.map(entry.getValue().iterator(), task -> (builder, params) -> {
                     builder.startObject(task.taskId().toString());
                     task.toXContent(builder, params);
                     builder.endObject();
                     return builder;
-                })), Iterators.<ToXContent>single((builder, params) -> {
+                }), Iterators.<ToXContent>single((builder, params) -> {
                     builder.endObject();
                     builder.endObject();
                     return builder;
@@ -219,11 +219,11 @@ public class ListTasksResponse extends BaseTasksResponse {
             toXContentCommon(builder, params);
             builder.startObject(TASKS);
             return builder;
-        }), getTaskGroups().stream().<ToXContent>map(group -> (builder, params) -> {
+        }), Iterators.map(getTaskGroups().iterator(), group -> (builder, params) -> {
             builder.field(group.taskInfo().taskId().toString());
             group.toXContent(builder, params);
             return builder;
-        }).iterator(), Iterators.single((builder, params) -> {
+        }), Iterators.single((builder, params) -> {
             builder.endObject();
             builder.endObject();
             return builder;
@@ -239,12 +239,12 @@ public class ListTasksResponse extends BaseTasksResponse {
             toXContentCommon(builder, params);
             builder.startArray(TASKS);
             return builder;
-        }), getTasks().stream().<ToXContent>map(taskInfo -> (builder, params) -> {
+        }), Iterators.map(getTasks().iterator(), taskInfo -> (builder, params) -> {
             builder.startObject();
             taskInfo.toXContent(builder, params);
             builder.endObject();
             return builder;
-        }).iterator(), Iterators.single((builder, params) -> {
+        }), Iterators.single((builder, params) -> {
             builder.endArray();
             builder.endObject();
             return builder;

+ 24 - 27
server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/GetSnapshotsResponse.java

@@ -170,35 +170,32 @@ public class GetSnapshotsResponse extends ActionResponse implements ChunkedToXCo
             b.startObject();
             b.startArray("snapshots");
             return b;
-        }),
-            getSnapshots().stream().map(snapshotInfo -> (ToXContent) snapshotInfo::toXContentExternal).iterator(),
-            Iterators.single((b, p) -> {
-                b.endArray();
-                if (failures.isEmpty() == false) {
-                    b.startObject("failures");
-                    for (Map.Entry<String, ElasticsearchException> error : failures.entrySet()) {
-                        b.field(error.getKey(), (bb, pa) -> {
-                            bb.startObject();
-                            error.getValue().toXContent(bb, pa);
-                            bb.endObject();
-                            return bb;
-                        });
-                    }
-                    b.endObject();
-                }
-                if (next != null) {
-                    b.field("next", next);
-                }
-                if (total >= 0) {
-                    b.field("total", total);
-                }
-                if (remaining >= 0) {
-                    b.field("remaining", remaining);
+        }), Iterators.map(getSnapshots().iterator(), snapshotInfo -> snapshotInfo::toXContentExternal), Iterators.single((b, p) -> {
+            b.endArray();
+            if (failures.isEmpty() == false) {
+                b.startObject("failures");
+                for (Map.Entry<String, ElasticsearchException> error : failures.entrySet()) {
+                    b.field(error.getKey(), (bb, pa) -> {
+                        bb.startObject();
+                        error.getValue().toXContent(bb, pa);
+                        bb.endObject();
+                        return bb;
+                    });
                 }
                 b.endObject();
-                return b;
-            })
-        );
+            }
+            if (next != null) {
+                b.field("next", next);
+            }
+            if (total >= 0) {
+                b.field("total", total);
+            }
+            if (remaining >= 0) {
+                b.field("remaining", remaining);
+            }
+            b.endObject();
+            return b;
+        }));
     }
 
     public static GetSnapshotsResponse fromXContent(XContentParser parser) throws IOException {

+ 5 - 6
server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/TransportGetShardSnapshotAction.java

@@ -18,6 +18,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.repositories.IndexSnapshotsService;
@@ -135,12 +136,10 @@ public class TransportGetShardSnapshotAction extends TransportMasterNodeAction<G
 
     private static Iterator<String> getRequestedRepositories(GetShardSnapshotRequest request, ClusterState state) {
         if (request.getFromAllRepositories()) {
-            return state.metadata()
-                .custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY)
-                .repositories()
-                .stream()
-                .map(RepositoryMetadata::name)
-                .iterator();
+            return Iterators.map(
+                state.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY).repositories().iterator(),
+                RepositoryMetadata::name
+            );
         } else {
             return request.getRepositories().iterator();
         }

+ 2 - 2
server/src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksResponse.java

@@ -60,7 +60,7 @@ public class PendingClusterTasksResponse extends ActionResponse implements Chunk
             builder.startObject();
             builder.startArray(Fields.TASKS);
             return builder;
-        }), pendingTasks.stream().<ToXContent>map(pendingClusterTask -> (builder, p) -> {
+        }), Iterators.map(pendingTasks.iterator(), pendingClusterTask -> (builder, p) -> {
             builder.startObject();
             builder.field(Fields.INSERT_ORDER, pendingClusterTask.getInsertOrder());
             builder.field(Fields.PRIORITY, pendingClusterTask.getPriority());
@@ -70,7 +70,7 @@ public class PendingClusterTasksResponse extends ActionResponse implements Chunk
             builder.field(Fields.TIME_IN_QUEUE, pendingClusterTask.getTimeInQueue());
             builder.endObject();
             return builder;
-        }).iterator(), Iterators.single((builder, p) -> {
+        }), Iterators.single((builder, p) -> {
             builder.endArray();
             builder.endObject();
             return builder;

+ 2 - 2
server/src/main/java/org/elasticsearch/action/admin/indices/get/GetIndexResponse.java

@@ -183,7 +183,7 @@ public class GetIndexResponse extends ActionResponse implements ChunkedToXConten
     public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params ignored) {
         return Iterators.concat(
             Iterators.single((builder, params) -> builder.startObject()),
-            Arrays.stream(indices).<ToXContent>map(index -> (builder, params) -> {
+            Iterators.map(Iterators.forArray(indices), index -> (builder, params) -> {
                 builder.startObject(index);
 
                 builder.startObject("aliases");
@@ -229,7 +229,7 @@ public class GetIndexResponse extends ActionResponse implements ChunkedToXConten
                 }
 
                 return builder.endObject();
-            }).iterator(),
+            }),
             Iterators.single((builder, params) -> builder.endObject())
         );
     }

+ 2 - 2
server/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/GetMappingsResponse.java

@@ -70,7 +70,7 @@ public class GetMappingsResponse extends ActionResponse implements ChunkedToXCon
     public Iterator<ToXContent> toXContentChunked(ToXContent.Params outerParams) {
         return Iterators.concat(
             Iterators.single((b, p) -> b.startObject()),
-            getMappings().entrySet().stream().map(indexEntry -> (ToXContent) (builder, params) -> {
+            Iterators.map(getMappings().entrySet().iterator(), indexEntry -> (builder, params) -> {
                 builder.startObject(indexEntry.getKey());
                 boolean includeTypeName = params.paramAsBoolean(INCLUDE_TYPE_NAME_PARAMETER, DEFAULT_INCLUDE_TYPE_NAME_POLICY);
                 if (builder.getRestApiVersion() == RestApiVersion.V_7 && includeTypeName && indexEntry.getValue() != null) {
@@ -88,7 +88,7 @@ public class GetMappingsResponse extends ActionResponse implements ChunkedToXCon
                 }
                 builder.endObject();
                 return builder;
-            }).iterator(),
+            }),
             Iterators.single((b, p) -> b.endObject())
         );
     }

+ 2 - 1
server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java

@@ -37,6 +37,7 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.core.Nullable;
@@ -272,7 +273,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
             if (state != batchExecutionContext.initialState()) {
                 var reason = new StringBuilder();
                 Strings.collectionToDelimitedStringWithLimit(
-                    (Iterable<String>) () -> results.stream().map(t -> t.sourceIndexName() + "->" + t.rolloverIndexName()).iterator(),
+                    (Iterable<String>) () -> Iterators.map(results.iterator(), t -> t.sourceIndexName() + "->" + t.rolloverIndexName()),
                     ",",
                     "bulk rollover [",
                     "]",

+ 2 - 2
server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java

@@ -80,7 +80,7 @@ public class IndicesSegmentResponse extends ChunkedBroadcastResponse {
     protected Iterator<ToXContent> customXContentChunks(ToXContent.Params params) {
         return Iterators.concat(
             Iterators.single((builder, p) -> builder.startObject(Fields.INDICES)),
-            getIndices().values().stream().map(indexSegments -> (ToXContent) (builder, p) -> {
+            Iterators.map(getIndices().values().iterator(), indexSegments -> (builder, p) -> {
                 builder.startObject(indexSegments.getIndex());
 
                 builder.startObject(Fields.SHARDS);
@@ -140,7 +140,7 @@ public class IndicesSegmentResponse extends ChunkedBroadcastResponse {
 
                 builder.endObject();
                 return builder;
-            }).iterator(),
+            }),
             Iterators.single((builder, p) -> builder.endObject())
         );
     }

+ 9 - 12
server/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoresResponse.java

@@ -295,18 +295,15 @@ public class IndicesShardStoresResponse extends ActionResponse implements Chunke
                 indexShards -> Iterators.concat(
                     ChunkedToXContentHelper.startObject(indexShards.getKey()),
                     ChunkedToXContentHelper.startObject(Fields.SHARDS),
-                    Iterators.flatMap(
-                        indexShards.getValue().entrySet().iterator(),
-                        shardStatusesEntry -> Iterators.single((ToXContent) (builder, params) -> {
-                            builder.startObject(String.valueOf(shardStatusesEntry.getKey())).startArray(Fields.STORES);
-                            for (StoreStatus storeStatus : shardStatusesEntry.getValue()) {
-                                builder.startObject();
-                                storeStatus.toXContent(builder, params);
-                                builder.endObject();
-                            }
-                            return builder.endArray().endObject();
-                        })
-                    ),
+                    Iterators.map(indexShards.getValue().entrySet().iterator(), shardStatusesEntry -> (builder, params) -> {
+                        builder.startObject(String.valueOf(shardStatusesEntry.getKey())).startArray(Fields.STORES);
+                        for (StoreStatus storeStatus : shardStatusesEntry.getValue()) {
+                            builder.startObject();
+                            storeStatus.toXContent(builder, params);
+                            builder.endObject();
+                        }
+                        return builder.endArray().endObject();
+                    }),
                     ChunkedToXContentHelper.endObject(),
                     ChunkedToXContentHelper.endObject()
                 )

+ 2 - 2
server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java

@@ -184,7 +184,7 @@ public class IndicesStatsResponse extends ChunkedBroadcastResponse {
             return Iterators.concat(Iterators.single(((builder, p) -> {
                 commonStats(builder, p);
                 return builder.startObject(Fields.INDICES);
-            })), getIndices().values().stream().<ToXContent>map(indexStats -> (builder, p) -> {
+            })), Iterators.map(getIndices().values().iterator(), indexStats -> (builder, p) -> {
                 builder.startObject(indexStats.getIndex());
                 builder.field("uuid", indexStats.getUuid());
                 if (indexStats.getHealth() != null) {
@@ -215,7 +215,7 @@ public class IndicesStatsResponse extends ChunkedBroadcastResponse {
                     builder.endObject();
                 }
                 return builder.endObject();
-            }).iterator(), Iterators.single((b, p) -> b.endObject()));
+            }), Iterators.single((b, p) -> b.endObject()));
         }
         return Iterators.single((b, p) -> {
             commonStats(b, p);

+ 1 - 1
server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java

@@ -161,7 +161,7 @@ public class FieldCapabilitiesResponse extends ActionResponse implements Chunked
             Iterators.single(
                 (b, p) -> b.startObject().array(INDICES_FIELD.getPreferredName(), indices).startObject(FIELDS_FIELD.getPreferredName())
             ),
-            responseMap.entrySet().stream().map(r -> (ToXContent) (b, p) -> b.xContentValuesMap(r.getKey(), r.getValue())).iterator(),
+            Iterators.map(responseMap.entrySet().iterator(), r -> (b, p) -> b.xContentValuesMap(r.getKey(), r.getValue())),
             this.failures.size() > 0
                 ? Iterators.concat(
                     Iterators.single(

+ 16 - 23
server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java

@@ -140,7 +140,7 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
 
     @Override
     public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
-        return Iterators.concat(startObject("nodes"), leastAvailableSpaceUsage.entrySet().stream().<ToXContent>map(c -> (builder, p) -> {
+        return Iterators.concat(startObject("nodes"), Iterators.map(leastAvailableSpaceUsage.entrySet().iterator(), c -> (builder, p) -> {
             builder.startObject(c.getKey());
             { // node
                 builder.field("node_name", c.getValue().getNodeName());
@@ -160,45 +160,38 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
             }
             builder.endObject(); // end $nodename
             return builder;
-        }).iterator(),
+        }),
             singleChunk(
                 (builder, p) -> builder.endObject(), // end "nodes"
                 (builder, p) -> builder.startObject("shard_sizes")
             ),
 
-            shardSizes.entrySet()
-                .stream()
-                .<ToXContent>map(
-                    c -> (builder, p) -> builder.humanReadableField(c.getKey() + "_bytes", c.getKey(), ByteSizeValue.ofBytes(c.getValue()))
-                )
-                .iterator(),
+            Iterators.map(
+                shardSizes.entrySet().iterator(),
+                c -> (builder, p) -> builder.humanReadableField(c.getKey() + "_bytes", c.getKey(), ByteSizeValue.ofBytes(c.getValue()))
+            ),
             singleChunk(
                 (builder, p) -> builder.endObject(), // end "shard_sizes"
                 (builder, p) -> builder.startObject("shard_data_set_sizes")
             ),
-            shardDataSetSizes.entrySet()
-                .stream()
-                .<ToXContent>map(
-                    c -> (builder, p) -> builder.humanReadableField(
-                        c.getKey() + "_bytes",
-                        c.getKey().toString(),
-                        ByteSizeValue.ofBytes(c.getValue())
-                    )
+            Iterators.map(
+                shardDataSetSizes.entrySet().iterator(),
+                c -> (builder, p) -> builder.humanReadableField(
+                    c.getKey() + "_bytes",
+                    c.getKey().toString(),
+                    ByteSizeValue.ofBytes(c.getValue())
                 )
-                .iterator(),
+            ),
             singleChunk(
                 (builder, p) -> builder.endObject(), // end "shard_data_set_sizes"
                 (builder, p) -> builder.startObject("shard_paths")
             ),
-            dataPath.entrySet()
-                .stream()
-                .<ToXContent>map(c -> (builder, p) -> builder.field(c.getKey().toString(), c.getValue()))
-                .iterator(),
+            Iterators.map(dataPath.entrySet().iterator(), c -> (builder, p) -> builder.field(c.getKey().toString(), c.getValue())),
             singleChunk(
                 (builder, p) -> builder.endObject(), // end "shard_paths"
                 (builder, p) -> builder.startArray("reserved_sizes")
             ),
-            reservedSpace.entrySet().stream().<ToXContent>map(c -> (builder, p) -> {
+            Iterators.map(reservedSpace.entrySet().iterator(), c -> (builder, p) -> {
                 builder.startObject();
                 {
                     builder.field("node_id", c.getKey().nodeId);
@@ -206,7 +199,7 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
                     c.getValue().toXContent(builder, params);
                 }
                 return builder.endObject(); // NodeAndPath
-            }).iterator(),
+            }),
 
             endArray() // end "reserved_sizes"
 

+ 2 - 2
server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java

@@ -67,12 +67,12 @@ public final class RepositoryCleanupInProgress extends AbstractNamedDiffable<Clu
     public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params ignored) {
         return Iterators.concat(
             Iterators.single((builder, params) -> builder.startArray(TYPE)),
-            entries.stream().<ToXContent>map(entry -> (builder, params) -> {
+            Iterators.map(entries.iterator(), entry -> (builder, params) -> {
                 builder.startObject();
                 builder.field("repository", entry.repository);
                 builder.endObject();
                 return builder;
-            }).iterator(),
+            }),
             Iterators.single((builder, params) -> builder.endArray())
         );
     }

+ 2 - 2
server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java

@@ -401,7 +401,7 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements
     public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params ignored) {
         return Iterators.concat(
             Iterators.single((builder, params) -> builder.startArray("snapshots")),
-            entries.values().stream().<ToXContent>map(entry -> (builder, params) -> {
+            Iterators.map(entries.values().iterator(), entry -> (builder, params) -> {
                 builder.startObject();
                 builder.field("snapshot", entry.snapshot().getSnapshotId().getName());
                 builder.field("repository", entry.snapshot().getRepository());
@@ -431,7 +431,7 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements
                 builder.endArray();
                 builder.endObject();
                 return builder;
-            }).iterator(),
+            }),
             Iterators.single((builder, params) -> builder.endArray())
         );
     }

+ 2 - 2
server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java

@@ -165,7 +165,7 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
     public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params ignored) {
         return Iterators.concat(
             Iterators.single((builder, params) -> builder.startArray(TYPE)),
-            entries.stream().<ToXContent>map(entry -> (builder, params) -> {
+            Iterators.map(entries.iterator(), entry -> (builder, params) -> {
                 builder.startObject();
                 {
                     builder.field("repository", entry.repository());
@@ -180,7 +180,7 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
                 }
                 builder.endObject();
                 return builder;
-            }).iterator(),
+            }),
             Iterators.single((builder, params) -> builder.endArray())
         );
     }

+ 1 - 1
server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

@@ -126,7 +126,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
     }
 
     public Iterable<List<Entry>> entriesByRepo() {
-        return () -> entries.values().stream().map(byRepo -> byRepo.entries).iterator();
+        return () -> Iterators.map(entries.values().iterator(), byRepo -> byRepo.entries);
     }
 
     public Stream<Entry> asStream() {

+ 2 - 1
server/src/main/java/org/elasticsearch/cluster/coordination/JoinTask.java

@@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateTaskListener;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.collect.Iterators;
 
 import java.util.List;
 import java.util.Objects;
@@ -66,7 +67,7 @@ public record JoinTask(List<NodeJoinTask> nodeJoinTasks, boolean isBecomingMaste
     }
 
     public Iterable<DiscoveryNode> nodes() {
-        return () -> nodeJoinTasks.stream().map(j -> j.node).iterator();
+        return () -> Iterators.map(nodeJoinTasks.iterator(), j -> j.node);
     }
 
     public JoinTask alsoRefreshState(ClusterState latestState) {

+ 4 - 10
server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java

@@ -1444,16 +1444,10 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, Ch
             persistentSettings,
             ChunkedToXContentHelper.wrapWithObject(
                 "templates",
-                templates().values()
-                    .stream()
-                    .map(
-                        template -> (ToXContent) (builder, params) -> IndexTemplateMetadata.Builder.toXContentWithTypes(
-                            template,
-                            builder,
-                            params
-                        )
-                    )
-                    .iterator()
+                Iterators.map(
+                    templates().values().iterator(),
+                    template -> (builder, params) -> IndexTemplateMetadata.Builder.toXContentWithTypes(template, builder, params)
+                )
             ),
             indices,
             Iterators.flatMap(

+ 2 - 3
server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetadata.java

@@ -14,6 +14,7 @@ import org.elasticsearch.cluster.AbstractNamedDiffable;
 import org.elasticsearch.cluster.NamedDiff;
 import org.elasticsearch.cluster.metadata.Metadata.Custom;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Settings;
@@ -253,9 +254,7 @@ public class RepositoriesMetadata extends AbstractNamedDiffable<Custom> implemen
 
     @Override
     public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params ignored) {
-        return repositories.stream()
-            .map(repository -> (ToXContent) (builder, params) -> toXContent(repository, builder, params))
-            .iterator();
+        return Iterators.map(repositories.iterator(), repository -> (builder, params) -> toXContent(repository, builder, params));
     }
 
     @Override

+ 28 - 0
server/src/main/java/org/elasticsearch/common/collect/Iterators.java

@@ -116,6 +116,34 @@ public class Iterators {
         }
     }
 
+    public static <T, U> Iterator<U> map(Iterator<? extends T> input, Function<T, U> fn) {
+        if (input.hasNext()) {
+            return new MapIterator<>(input, fn);
+        } else {
+            return Collections.emptyIterator();
+        }
+    }
+
+    private static final class MapIterator<T, U> implements Iterator<U> {
+        private final Iterator<? extends T> input;
+        private final Function<T, U> fn;
+
+        MapIterator(Iterator<? extends T> input, Function<T, U> fn) {
+            this.input = input;
+            this.fn = fn;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return input.hasNext();
+        }
+
+        @Override
+        public U next() {
+            return fn.apply(input.next());
+        }
+    }
+
     public static <T, U> Iterator<? extends U> flatMap(Iterator<? extends T> input, Function<T, Iterator<? extends U>> fn) {
         while (input.hasNext()) {
             final var value = fn.apply(input.next());

+ 1 - 1
server/src/main/java/org/elasticsearch/common/xcontent/ChunkedToXContentHelper.java

@@ -79,7 +79,7 @@ public enum ChunkedToXContentHelper {
     }
 
     private static <T> Iterator<ToXContent> map(String name, Map<String, T> map, Function<Map.Entry<String, T>, ToXContent> toXContent) {
-        return wrapWithObject(name, map.entrySet().stream().map(toXContent).iterator());
+        return wrapWithObject(name, Iterators.map(map.entrySet().iterator(), toXContent));
     }
 
     public static Iterator<ToXContent> singleChunk(ToXContent... contents) {

+ 3 - 3
server/src/main/java/org/elasticsearch/health/Diagnosis.java

@@ -80,7 +80,7 @@ public record Diagnosis(Definition definition, @Nullable List<Resource> affected
         public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
             final Iterator<? extends ToXContent> valuesIterator;
             if (nodes != null) {
-                valuesIterator = nodes.stream().map(node -> (ToXContent) (builder, params) -> {
+                valuesIterator = Iterators.map(nodes.iterator(), node -> (builder, params) -> {
                     builder.startObject();
                     builder.field(ID_FIELD, node.getId());
                     if (node.getName() != null) {
@@ -88,9 +88,9 @@ public record Diagnosis(Definition definition, @Nullable List<Resource> affected
                     }
                     builder.endObject();
                     return builder;
-                }).iterator();
+                });
             } else {
-                valuesIterator = values.stream().map(value -> (ToXContent) (builder, params) -> builder.value(value)).iterator();
+                valuesIterator = Iterators.map(values.iterator(), value -> (builder, params) -> builder.value(value));
             }
             return ChunkedToXContentHelper.array(type.displayValue, valuesIterator);
         }

+ 5 - 8
server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java

@@ -235,14 +235,11 @@ public class NodeIndicesStats implements Writeable, ChunkedToXContent {
 
                 case INDICES -> Iterators.concat(
                     ChunkedToXContentHelper.startObject(Fields.INDICES),
-                    Iterators.flatMap(
-                        createCommonStatsByIndex().entrySet().iterator(),
-                        entry -> Iterators.<ToXContent>single((builder, params) -> {
-                            builder.startObject(entry.getKey().getName());
-                            entry.getValue().toXContent(builder, params);
-                            return builder.endObject();
-                        })
-                    ),
+                    Iterators.map(createCommonStatsByIndex().entrySet().iterator(), entry -> (builder, params) -> {
+                        builder.startObject(entry.getKey().getName());
+                        entry.getValue().toXContent(builder, params);
+                        return builder.endObject();
+                    }),
                     ChunkedToXContentHelper.endObject()
                 );
 

+ 3 - 3
server/src/main/java/org/elasticsearch/ingest/IngestStats.java

@@ -128,9 +128,9 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
                         return builder;
                     }),
 
-                    Iterators.flatMap(
+                    Iterators.map(
                         processorStats.getOrDefault(pipelineStat.pipelineId(), List.of()).iterator(),
-                        processorStat -> Iterators.<ToXContent>single((builder, params) -> {
+                        processorStat -> (builder, params) -> {
                             builder.startObject();
                             builder.startObject(processorStat.name());
                             builder.field("type", processorStat.type());
@@ -140,7 +140,7 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
                             builder.endObject();
                             builder.endObject();
                             return builder;
-                        })
+                        }
                     ),
 
                     Iterators.<ToXContent>single((builder, params) -> builder.endArray().endObject())

+ 2 - 2
server/src/main/java/org/elasticsearch/rest/action/cat/RestTable.java

@@ -76,14 +76,14 @@ public class RestTable {
             ChunkedRestResponseBody.fromXContent(
                 ignored -> Iterators.concat(
                     Iterators.single((builder, params) -> builder.startArray()),
-                    rowOrder.stream().<ToXContent>map(row -> (builder, params) -> {
+                    Iterators.map(rowOrder.iterator(), row -> (builder, params) -> {
                         builder.startObject();
                         for (DisplayHeader header : displayHeaders) {
                             builder.field(header.display, renderValue(request, table.getAsMap().get(header.name).get(row).value));
                         }
                         builder.endObject();
                         return builder;
-                    }).iterator(),
+                    }),
                     Iterators.single((builder, params) -> builder.endArray())
                 ),
                 ToXContent.EMPTY_PARAMS,

+ 3 - 2
server/src/main/java/org/elasticsearch/script/ScriptMetadata.java

@@ -17,6 +17,7 @@ import org.elasticsearch.cluster.DiffableUtils;
 import org.elasticsearch.cluster.NamedDiff;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.ParsingException;
+import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -262,10 +263,10 @@ public final class ScriptMetadata implements Metadata.Custom, Writeable {
 
     @Override
     public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params ignored) {
-        return scripts.entrySet().stream().map(entry -> (ToXContent) (builder, params) -> {
+        return Iterators.map(scripts.entrySet().iterator(), entry -> (builder, params) -> {
             builder.field(entry.getKey());
             return entry.getValue().toXContent(builder, params);
-        }).iterator();
+        });
     }
 
     @Override

+ 2 - 1
server/src/main/java/org/elasticsearch/search/aggregations/Aggregations.java

@@ -9,6 +9,7 @@ package org.elasticsearch.search.aggregations;
 
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.common.ParsingException;
+import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.xcontent.ToXContentFragment;
 import org.elasticsearch.xcontent.XContentBuilder;
@@ -49,7 +50,7 @@ public class Aggregations implements Iterable<Aggregation>, ToXContentFragment {
      */
     @Override
     public final Iterator<Aggregation> iterator() {
-        return aggregations.stream().map((p) -> (Aggregation) p).iterator();
+        return Iterators.map(aggregations.iterator(), p -> (Aggregation) p);
     }
 
     /**

+ 3 - 2
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregatorFactory.java

@@ -16,6 +16,7 @@ import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefBuilder;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.BytesRefHash;
 import org.elasticsearch.common.util.ObjectArray;
@@ -316,7 +317,7 @@ public class SignificantTextAggregatorFactory extends AggregatorFactory {
                     Source source = sourceProvider.getSource(ctx, doc).filter(sourceFilter);
                     try {
                         for (String sourceField : sourceFieldNames) {
-                            Iterator<String> itr = extractRawValues(source, sourceField).stream().map(obj -> {
+                            Iterator<String> itr = Iterators.map(extractRawValues(source, sourceField).iterator(), obj -> {
                                 if (obj == null) {
                                     return null;
                                 }
@@ -324,7 +325,7 @@ public class SignificantTextAggregatorFactory extends AggregatorFactory {
                                     return fieldType.valueForDisplay(obj).toString();
                                 }
                                 return obj.toString();
-                            }).iterator();
+                            });
                             while (itr.hasNext()) {
                                 String text = itr.next();
                                 TokenStream ts = analyzer.tokenStream(fieldType.name(), text);

+ 2 - 2
server/src/main/java/org/elasticsearch/transport/TransportStats.java

@@ -212,11 +212,11 @@ public class TransportStats implements Writeable, ChunkedToXContent {
                 return builder;
             }),
 
-            Iterators.flatMap(transportActionStats.entrySet().iterator(), entry -> Iterators.single((builder, params) -> {
+            Iterators.map(transportActionStats.entrySet().iterator(), entry -> (builder, params) -> {
                 builder.field(entry.getKey());
                 entry.getValue().toXContent(builder, params);
                 return builder;
-            })),
+            }),
 
             Iterators.single((builder, params) -> {
                 if (transportActionStats.isEmpty() == false) {

+ 10 - 0
server/src/test/java/org/elasticsearch/common/collect/IteratorsTests.java

@@ -183,6 +183,16 @@ public class IteratorsTests extends ESTestCase {
         assertEquals(expectedArray.length, index.get());
     }
 
+    public void testMap() {
+        assertEmptyIterator(Iterators.map(Iterators.concat(), i -> "foo"));
+
+        final var array = randomIntegerArray();
+        final var index = new AtomicInteger();
+        Iterators.map(Iterators.forArray(array), i -> i * 2)
+            .forEachRemaining(i -> assertEquals(array[index.getAndIncrement()] * 2, (long) i));
+        assertEquals(array.length, index.get());
+    }
+
     private static Integer[] randomIntegerArray() {
         return Randomness.get().ints(randomIntBetween(0, 1000)).boxed().toArray(Integer[]::new);
     }

+ 3 - 4
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/action/GetLifecycleAction.java

@@ -84,11 +84,10 @@ public class GetLifecycleAction extends ActionType<GetLifecycleAction.Response>
         }
 
         @Override
-        @SuppressWarnings("unchecked")
-        public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
+        public Iterator<ToXContent> toXContentChunked(ToXContent.Params outerParams) {
             return Iterators.concat(
                 Iterators.single((builder, params) -> builder.startObject()),
-                policies.stream().map(policy -> (ToXContent) (b, p) -> {
+                Iterators.map(policies.iterator(), policy -> (b, p) -> {
                     b.startObject(policy.getLifecyclePolicy().getName());
                     b.field("version", policy.getVersion());
                     b.field("modified_date", policy.getModifiedDate());
@@ -96,7 +95,7 @@ public class GetLifecycleAction extends ActionType<GetLifecycleAction.Response>
                     b.field("in_use_by", policy.getUsage());
                     b.endObject();
                     return b;
-                }).iterator(),
+                }),
                 Iterators.single((b, p) -> b.endObject())
             );
         }

+ 9 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/CategorizeTextAggregator.java

@@ -11,6 +11,7 @@ import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.TokenStream;
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.util.BytesRefHash;
 import org.elasticsearch.common.util.ObjectArray;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
@@ -163,12 +164,15 @@ public class CategorizeTextAggregator extends DeferableBucketAggregator {
 
             private void collectFromSource(int doc, long owningBucketOrd, TokenListCategorizer categorizer) throws IOException {
                 Source source = sourceProvider.getSource(aggCtx.getLeafReaderContext(), doc).filter(sourceFilter);
-                Iterator<String> itr = XContentMapValues.extractRawValues(sourceFieldName, source.source()).stream().map(obj -> {
-                    if (obj instanceof BytesRef) {
-                        return fieldType.valueForDisplay(obj).toString();
+                Iterator<String> itr = Iterators.map(
+                    XContentMapValues.extractRawValues(sourceFieldName, source.source()).iterator(),
+                    obj -> {
+                        if (obj instanceof BytesRef) {
+                            return fieldType.valueForDisplay(obj).toString();
+                        }
+                        return (obj == null) ? null : obj.toString();
                     }
-                    return (obj == null) ? null : obj.toString();
-                }).iterator();
+                );
                 while (itr.hasNext()) {
                     String string = itr.next();
                     try (TokenStream ts = analyzer.tokenStream(fieldType.name(), string)) {

+ 5 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentMetadata.java

@@ -17,6 +17,7 @@ import org.elasticsearch.cluster.NamedDiff;
 import org.elasticsearch.cluster.SimpleDiffable;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.xcontent.ToXContent;
@@ -131,10 +132,10 @@ public class TrainedModelAssignmentMetadata implements Metadata.Custom {
 
     @Override
     public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params ignored) {
-        return deploymentRoutingEntries.entrySet()
-            .stream()
-            .map(entry -> (ToXContent) (builder, params) -> entry.getValue().toXContent(builder.field(entry.getKey()), params))
-            .iterator();
+        return Iterators.map(
+            deploymentRoutingEntries.entrySet().iterator(),
+            entry -> (builder, params) -> entry.getValue().toXContent(builder.field(entry.getKey()), params)
+        );
     }
 
     @Override