瀏覽代碼

Removing the use of Stream::peek from GeoIpDownloader::cleanDatabases (#110666)

Keith Massey 1 年之前
父節點
當前提交
c37fa227bd

+ 5 - 0
docs/changelog/110666.yaml

@@ -0,0 +1,5 @@
+pr: 110666
+summary: Removing the use of Stream::peek from `GeoIpDownloader::cleanDatabases`
+area: Ingest Node
+type: bug
+issues: []

+ 10 - 13
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java

@@ -318,22 +318,19 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
     }
 
     private void cleanDatabases() {
-        long expiredDatabases = state.getDatabases()
+        List<Map.Entry<String, Metadata>> expiredDatabases = state.getDatabases()
             .entrySet()
             .stream()
             .filter(e -> e.getValue().isValid(clusterService.state().metadata().settings()) == false)
-            .peek(e -> {
-                String name = e.getKey();
-                Metadata meta = e.getValue();
-                deleteOldChunks(name, meta.lastChunk() + 1);
-                state = state.put(
-                    name,
-                    new Metadata(meta.lastUpdate(), meta.firstChunk(), meta.lastChunk(), meta.md5(), meta.lastCheck() - 1)
-                );
-                updateTaskState();
-            })
-            .count();
-        stats = stats.expiredDatabases((int) expiredDatabases);
+            .toList();
+        expiredDatabases.forEach(e -> {
+            String name = e.getKey();
+            Metadata meta = e.getValue();
+            deleteOldChunks(name, meta.lastChunk() + 1);
+            state = state.put(name, new Metadata(meta.lastUpdate(), meta.firstChunk(), meta.lastChunk(), meta.md5(), meta.lastCheck() - 1));
+            updateTaskState();
+        });
+        stats = stats.expiredDatabases(expiredDatabases.size());
     }
 
     @Override

+ 1 - 2
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java

@@ -580,7 +580,6 @@ public class GeoIpDownloaderTests extends ESTestCase {
         client.addHandler(
             UpdatePersistentTaskStatusAction.INSTANCE,
             (UpdatePersistentTaskStatusAction.Request request, ActionListener<PersistentTaskResponse> taskResponseListener) -> {
-
                 PersistentTasksCustomMetadata.Assignment assignment = mock(PersistentTasksCustomMetadata.Assignment.class);
                 PersistentTasksCustomMetadata.PersistentTask<?> persistentTask = new PersistentTasksCustomMetadata.PersistentTask<>(
                     GeoIpDownloader.GEOIP_DOWNLOADER,
@@ -589,8 +588,8 @@ public class GeoIpDownloaderTests extends ESTestCase {
                     request.getAllocationId(),
                     assignment
                 );
-                taskResponseListener.onResponse(new PersistentTaskResponse(new PersistentTask<>(persistentTask, request.getState())));
                 updatePersistentTaskStateCount.incrementAndGet();
+                taskResponseListener.onResponse(new PersistentTaskResponse(new PersistentTask<>(persistentTask, request.getState())));
             }
         );
         client.addHandler(