Browse Source

[ML] Resolve duplicate key exception in GetDatafeedRunningStateAction (#125477)

조혜온 6 months ago
parent
commit
89adec154c

+ 6 - 0
docs/changelog/125477.yaml

@@ -0,0 +1,6 @@
+pr: 125477
+summary: Prevent get datafeeds stats API returning an error when local tasks are slow to stop
+area: Machine Learning
+type: bug
+issues:
+  - 104160

+ 17 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateAction.java

@@ -146,12 +146,28 @@ public class GetDatafeedRunningStateAction extends ActionType<GetDatafeedRunning
 
         private final Map<String, RunningState> datafeedRunningState;
 
+        private static RunningState selectMostRecentState(RunningState state1, RunningState state2) {
+
+            if (state1.searchInterval != null && state2.searchInterval != null) {
+                return state1.searchInterval.startMs() > state2.searchInterval.startMs() ? state1 : state2;
+            }
+
+            if (state1.searchInterval != null) {
+                return state1;
+            }
+            if (state2.searchInterval != null) {
+                return state2;
+            }
+
+            return state2;
+        }
+
         /**
          * Combines several responses into a single one.
          * <p>
          * Every entry of each response's {@code datafeedRunningState} map is collected into one map;
          * entries whose value is {@code null} are dropped first. If the same key appears in more than
          * one response, {@code selectMostRecentState} decides which value is kept, so duplicate keys
          * do not make the collection step fail.
          *
          * @param responses the responses to merge
          * @return a new response holding the merged map
          */
         public static Response fromResponses(List<Response> responses) {
             return new Response(
                 responses.stream()
                     .flatMap(r -> r.datafeedRunningState.entrySet().stream())
                     .filter(entry -> entry.getValue() != null)
-                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
+                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Response::selectMostRecentState))
             );
         }
 

+ 66 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateActionResponseTests.java

@@ -9,6 +9,8 @@ package org.elasticsearch.xpack.core.ml.action;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.test.AbstractWireSerializingTestCase;
 import org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction.Response;
+import org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction.Response.RunningState;
+import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
 import org.elasticsearch.xpack.core.ml.datafeed.SearchIntervalTests;
 
 import java.util.function.Function;
@@ -41,4 +43,68 @@ public class GetDatafeedRunningStateActionResponseTests extends AbstractWireSeri
         return Response::new;
     }
 
+    /**
+     * Two responses report the same datafeed ID and both running states carry a search interval.
+     * Merging must keep the state whose interval has the larger {@code startMs}, whichever order
+     * the responses are supplied in.
+     */
+    public void testMergeWithDuplicateKeysAndDifferentSearchIntervals() {
+        String datafeedId = "test-datafeed";
+
+        RunningState olderState = new RunningState(true, true, new SearchInterval(1000L, 2000L));
+        RunningState newerState = new RunningState(false, false, new SearchInterval(3000L, 4000L));
+
+        Response olderResponse = Response.fromTaskAndState(datafeedId, olderState);
+        Response newerResponse = Response.fromTaskAndState(datafeedId, newerState);
+
+        // Older response first, newer response second.
+        Response merged = Response.fromResponses(java.util.List.of(olderResponse, newerResponse));
+        assertEquals(newerState, merged.getRunningState(datafeedId).orElse(null));
+
+        // The same inputs in the opposite order must produce the same winner.
+        merged = Response.fromResponses(java.util.List.of(newerResponse, olderResponse));
+        assertEquals(newerState, merged.getRunningState(datafeedId).orElse(null));
+    }
+
+    /**
+     * Two responses report the same datafeed ID but only one running state carries a search
+     * interval. Merging must keep the state that has the interval, whichever order the responses
+     * are supplied in.
+     */
+    public void testMergeWithDuplicateKeysWhenOnlyOneHasSearchInterval() {
+        String datafeedId = "test-datafeed";
+
+        RunningState stateWithInterval = new RunningState(true, true, new SearchInterval(1000L, 2000L));
+        RunningState stateWithoutInterval = new RunningState(false, false, null);
+
+        Response withInterval = Response.fromTaskAndState(datafeedId, stateWithInterval);
+        Response withoutInterval = Response.fromTaskAndState(datafeedId, stateWithoutInterval);
+
+        // State with an interval comes first.
+        Response merged = Response.fromResponses(java.util.List.of(withInterval, withoutInterval));
+        assertEquals(stateWithInterval, merged.getRunningState(datafeedId).orElse(null));
+
+        // State with an interval comes second.
+        merged = Response.fromResponses(java.util.List.of(withoutInterval, withInterval));
+        assertEquals(stateWithInterval, merged.getRunningState(datafeedId).orElse(null));
+    }
+
+    /**
+     * Two responses report the same datafeed ID and neither running state carries a search
+     * interval. There is nothing to compare, so the state from the response that appears later
+     * in the list is expected to be kept.
+     * <p>
+     * Both orderings are checked so that the test pins the positional rule itself; a single
+     * ordering could not tell it apart from a rule that always favours one particular state.
+     */
+    public void testMergeWithDuplicateKeysWhenNeitherHasSearchInterval() {
+        RunningState state1 = new RunningState(true, true, null);
+        RunningState state2 = new RunningState(false, false, null);
+
+        String datafeedId = "test-datafeed";
+        Response response1 = Response.fromTaskAndState(datafeedId, state1);
+        Response response2 = Response.fromTaskAndState(datafeedId, state2);
+
+        // response2 is last, so its state is kept.
+        Response mergedResponse = Response.fromResponses(java.util.List.of(response1, response2));
+        assertEquals(state2, mergedResponse.getRunningState(datafeedId).orElse(null));
+
+        // With the order reversed response1 is last, so its state is kept instead.
+        mergedResponse = Response.fromResponses(java.util.List.of(response2, response1));
+        assertEquals(state1, mergedResponse.getRunningState(datafeedId).orElse(null));
+    }
 }