|
@@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.datafeed;
|
|
|
|
|
|
import org.elasticsearch.ElasticsearchException;
|
|
|
import org.elasticsearch.Version;
|
|
|
+import org.elasticsearch.action.search.SearchRequest;
|
|
|
import org.elasticsearch.cluster.ClusterName;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
|
@@ -75,10 +76,19 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
|
|
|
|
|
|
givenClusterState("foo", 1, 0);
|
|
|
|
|
|
- PersistentTasksCustomMetaData.Assignment result =
|
|
|
- new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode();
|
|
|
+ PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState,
|
|
|
+ resolver,
|
|
|
+ df.getId(),
|
|
|
+ df.getJobId(),
|
|
|
+ df.getIndices(),
|
|
|
+ SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode();
|
|
|
assertEquals("node_id", result.getExecutorNode());
|
|
|
- new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated();
|
|
|
+ new DatafeedNodeSelector(clusterState,
|
|
|
+ resolver,
|
|
|
+ df.getId(),
|
|
|
+ df.getJobId(),
|
|
|
+ df.getIndices(),
|
|
|
+ SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated();
|
|
|
}
|
|
|
|
|
|
public void testSelectNode_GivenJobIsOpening() {
|
|
@@ -91,10 +101,19 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
|
|
|
|
|
|
givenClusterState("foo", 1, 0);
|
|
|
|
|
|
- PersistentTasksCustomMetaData.Assignment result =
|
|
|
- new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode();
|
|
|
+ PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState,
|
|
|
+ resolver,
|
|
|
+ df.getId(),
|
|
|
+ df.getJobId(),
|
|
|
+ df.getIndices(),
|
|
|
+ SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode();
|
|
|
assertEquals("node_id", result.getExecutorNode());
|
|
|
- new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated();
|
|
|
+ new DatafeedNodeSelector(clusterState,
|
|
|
+ resolver,
|
|
|
+ df.getId(),
|
|
|
+ df.getJobId(),
|
|
|
+ df.getIndices(),
|
|
|
+ SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated();
|
|
|
}
|
|
|
|
|
|
public void testNoJobTask() {
|
|
@@ -107,15 +126,23 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
|
|
|
|
|
|
givenClusterState("foo", 1, 0);
|
|
|
|
|
|
- PersistentTasksCustomMetaData.Assignment result =
|
|
|
- new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode();
|
|
|
+ PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState,
|
|
|
+ resolver,
|
|
|
+ df.getId(),
|
|
|
+ df.getJobId(),
|
|
|
+ df.getIndices(),
|
|
|
+ SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode();
|
|
|
assertNull(result.getExecutorNode());
|
|
|
assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id], because the job's [job_id] state is " +
|
|
|
"[closed] while state [opened] is required"));
|
|
|
|
|
|
ElasticsearchException e = expectThrows(ElasticsearchException.class,
|
|
|
- () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices())
|
|
|
- .checkDatafeedTaskCanBeCreated());
|
|
|
+ () -> new DatafeedNodeSelector(clusterState,
|
|
|
+ resolver,
|
|
|
+ df.getId(),
|
|
|
+ df.getJobId(),
|
|
|
+ df.getIndices(),
|
|
|
+ SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated());
|
|
|
assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation "
|
|
|
+ "[cannot start datafeed [datafeed_id], because the job's [job_id] state is [closed] while state [opened] is required]"));
|
|
|
}
|
|
@@ -131,15 +158,23 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
|
|
|
|
|
|
givenClusterState("foo", 1, 0);
|
|
|
|
|
|
- PersistentTasksCustomMetaData.Assignment result =
|
|
|
- new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode();
|
|
|
+ PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState,
|
|
|
+ resolver,
|
|
|
+ df.getId(),
|
|
|
+ df.getJobId(),
|
|
|
+ df.getIndices(),
|
|
|
+ SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode();
|
|
|
assertNull(result.getExecutorNode());
|
|
|
assertEquals("cannot start datafeed [datafeed_id], because the job's [job_id] state is [" + jobState +
|
|
|
"] while state [opened] is required", result.getExplanation());
|
|
|
|
|
|
ElasticsearchException e = expectThrows(ElasticsearchException.class,
|
|
|
- () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices())
|
|
|
- .checkDatafeedTaskCanBeCreated());
|
|
|
+ () -> new DatafeedNodeSelector(clusterState,
|
|
|
+ resolver,
|
|
|
+ df.getId(),
|
|
|
+ df.getJobId(),
|
|
|
+ df.getIndices(),
|
|
|
+ SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated());
|
|
|
assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation "
|
|
|
+ "[cannot start datafeed [datafeed_id], because the job's [job_id] state is [" + jobState
|
|
|
+ "] while state [opened] is required]"));
|
|
@@ -160,13 +195,22 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
|
|
|
|
|
|
givenClusterState("foo", 1, 0, states);
|
|
|
|
|
|
- PersistentTasksCustomMetaData.Assignment result =
|
|
|
- new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode();
|
|
|
+ PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState,
|
|
|
+ resolver,
|
|
|
+ df.getId(),
|
|
|
+ df.getJobId(),
|
|
|
+ df.getIndices(),
|
|
|
+ SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode();
|
|
|
assertNull(result.getExecutorNode());
|
|
|
assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " +
|
|
|
"does not have all primary shards active yet."));
|
|
|
|
|
|
- new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated();
|
|
|
+ new DatafeedNodeSelector(clusterState,
|
|
|
+ resolver,
|
|
|
+ df.getId(),
|
|
|
+ df.getJobId(),
|
|
|
+ df.getIndices(),
|
|
|
+ SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated();
|
|
|
}
|
|
|
|
|
|
public void testShardNotAllActive() {
|
|
@@ -185,13 +229,22 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
|
|
|
|
|
|
givenClusterState("foo", 2, 0, states);
|
|
|
|
|
|
- PersistentTasksCustomMetaData.Assignment result =
|
|
|
- new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode();
|
|
|
+ PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState,
|
|
|
+ resolver,
|
|
|
+ df.getId(),
|
|
|
+ df.getJobId(),
|
|
|
+ df.getIndices(),
|
|
|
+ SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode();
|
|
|
assertNull(result.getExecutorNode());
|
|
|
assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " +
|
|
|
"does not have all primary shards active yet."));
|
|
|
|
|
|
- new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated();
|
|
|
+ new DatafeedNodeSelector(clusterState,
|
|
|
+ resolver,
|
|
|
+ df.getId(),
|
|
|
+ df.getJobId(),
|
|
|
+ df.getIndices(),
|
|
|
+ SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated();
|
|
|
}
|
|
|
|
|
|
public void testIndexDoesntExist() {
|
|
@@ -204,17 +257,32 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
|
|
|
|
|
|
givenClusterState("foo", 1, 0);
|
|
|
|
|
|
- PersistentTasksCustomMetaData.Assignment result =
|
|
|
- new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode();
|
|
|
+ PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState,
|
|
|
+ resolver,
|
|
|
+ df.getId(),
|
|
|
+ df.getJobId(),
|
|
|
+ df.getIndices(),
|
|
|
+ SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode();
|
|
|
assertNull(result.getExecutorNode());
|
|
|
- assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [not_foo] " +
|
|
|
- "does not exist, is closed, or is still initializing."));
|
|
|
+ assertThat(result.getExplanation(),
|
|
|
+ equalTo("cannot start datafeed [datafeed_id] because it failed resolving indices given [not_foo] and " +
|
|
|
+ "indices_options [IndicesOptions[ignore_unavailable=false, allow_no_indices=true, expand_wildcards_open=true, " +
|
|
|
+ "expand_wildcards_closed=false, expand_wildcards_hidden=false, allow_aliases_to_multiple_indices=true, " +
|
|
|
+ "forbid_closed_indices=true, ignore_aliases=false, ignore_throttled=true]] with exception [no such index [not_foo]]"));
|
|
|
|
|
|
ElasticsearchException e = expectThrows(ElasticsearchException.class,
|
|
|
- () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices())
|
|
|
- .checkDatafeedTaskCanBeCreated());
|
|
|
+ () -> new DatafeedNodeSelector(clusterState,
|
|
|
+ resolver,
|
|
|
+ df.getId(),
|
|
|
+ df.getJobId(),
|
|
|
+ df.getIndices(),
|
|
|
+ SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated());
|
|
|
assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation "
|
|
|
- + "[cannot start datafeed [datafeed_id] because index [not_foo] does not exist, is closed, or is still initializing.]"));
|
|
|
+ + "[cannot start datafeed [datafeed_id] because it failed resolving " +
|
|
|
+ "indices given [not_foo] and indices_options [IndicesOptions[ignore_unavailable=false, allow_no_indices=true, " +
|
|
|
+ "expand_wildcards_open=true, expand_wildcards_closed=false, expand_wildcards_hidden=false, " +
|
|
|
+ "allow_aliases_to_multiple_indices=true, forbid_closed_indices=true, ignore_aliases=false, ignore_throttled=true]] " +
|
|
|
+ "with exception [no such index [not_foo]]]"));
|
|
|
}
|
|
|
|
|
|
public void testRemoteIndex() {
|
|
@@ -227,8 +295,12 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
|
|
|
|
|
|
givenClusterState("foo", 1, 0);
|
|
|
|
|
|
- PersistentTasksCustomMetaData.Assignment result =
|
|
|
- new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode();
|
|
|
+ PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState,
|
|
|
+ resolver,
|
|
|
+ df.getId(),
|
|
|
+ df.getJobId(),
|
|
|
+ df.getIndices(),
|
|
|
+ SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode();
|
|
|
assertNotNull(result.getExecutorNode());
|
|
|
}
|
|
|
|
|
@@ -245,15 +317,23 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
|
|
|
|
|
|
givenClusterState("foo", 1, 0);
|
|
|
|
|
|
- PersistentTasksCustomMetaData.Assignment result =
|
|
|
- new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode();
|
|
|
+ PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState,
|
|
|
+ resolver,
|
|
|
+ df.getId(),
|
|
|
+ df.getJobId(),
|
|
|
+ df.getIndices(),
|
|
|
+ SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode();
|
|
|
assertNull(result.getExecutorNode());
|
|
|
assertEquals("cannot start datafeed [datafeed_id], because the job's [job_id] state is stale",
|
|
|
result.getExplanation());
|
|
|
|
|
|
ElasticsearchException e = expectThrows(ElasticsearchException.class,
|
|
|
- () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices())
|
|
|
- .checkDatafeedTaskCanBeCreated());
|
|
|
+ () -> new DatafeedNodeSelector(clusterState,
|
|
|
+ resolver,
|
|
|
+ df.getId(),
|
|
|
+ df.getJobId(),
|
|
|
+ df.getIndices(),
|
|
|
+ SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated());
|
|
|
assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation "
|
|
|
+ "[cannot start datafeed [datafeed_id], because the job's [job_id] state is stale]"));
|
|
|
|
|
@@ -261,9 +341,19 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
|
|
|
addJobTask(job.getId(), "node_id1", JobState.OPENED, tasksBuilder);
|
|
|
tasks = tasksBuilder.build();
|
|
|
givenClusterState("foo", 1, 0);
|
|
|
- result = new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode();
|
|
|
+ result = new DatafeedNodeSelector(clusterState,
|
|
|
+ resolver,
|
|
|
+ df.getId(),
|
|
|
+ df.getJobId(),
|
|
|
+ df.getIndices(),
|
|
|
+ SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode();
|
|
|
assertEquals("node_id1", result.getExecutorNode());
|
|
|
- new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated();
|
|
|
+ new DatafeedNodeSelector(clusterState,
|
|
|
+ resolver,
|
|
|
+ df.getId(),
|
|
|
+ df.getJobId(),
|
|
|
+ df.getIndices(),
|
|
|
+ SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated();
|
|
|
}
|
|
|
|
|
|
public void testSelectNode_GivenJobOpeningAndIndexDoesNotExist() {
|
|
@@ -280,10 +370,17 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
|
|
|
givenClusterState("foo", 1, 0);
|
|
|
|
|
|
ElasticsearchException e = expectThrows(ElasticsearchException.class,
|
|
|
- () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices())
|
|
|
- .checkDatafeedTaskCanBeCreated());
|
|
|
+ () -> new DatafeedNodeSelector(clusterState,
|
|
|
+ resolver,
|
|
|
+ df.getId(),
|
|
|
+ df.getJobId(),
|
|
|
+ df.getIndices(),
|
|
|
+ SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated());
|
|
|
assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation "
|
|
|
- + "[cannot start datafeed [datafeed_id] because index [not_foo] does not exist, is closed, or is still initializing.]"));
|
|
|
+ + "[cannot start datafeed [datafeed_id] because it failed resolving indices given [not_foo] and " +
|
|
|
+ "indices_options [IndicesOptions[ignore_unavailable=false, allow_no_indices=true, expand_wildcards_open=true, " +
|
|
|
+ "expand_wildcards_closed=false, expand_wildcards_hidden=false, allow_aliases_to_multiple_indices=true, " +
|
|
|
+ "forbid_closed_indices=true, ignore_aliases=false, ignore_throttled=true]] with exception [no such index [not_foo]]]"));
|
|
|
}
|
|
|
|
|
|
public void testSelectNode_GivenMlUpgradeMode() {
|
|
@@ -297,8 +394,12 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
|
|
|
|
|
|
givenClusterState("foo", 1, 0);
|
|
|
|
|
|
- PersistentTasksCustomMetaData.Assignment result =
|
|
|
- new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode();
|
|
|
+ PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState,
|
|
|
+ resolver,
|
|
|
+ df.getId(),
|
|
|
+ df.getJobId(),
|
|
|
+ df.getIndices(),
|
|
|
+ SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode();
|
|
|
assertThat(result, equalTo(MlTasks.AWAITING_UPGRADE));
|
|
|
}
|
|
|
|
|
@@ -314,8 +415,12 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
|
|
|
givenClusterState("foo", 1, 0);
|
|
|
|
|
|
ElasticsearchException e = expectThrows(ElasticsearchException.class,
|
|
|
- () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices())
|
|
|
- .checkDatafeedTaskCanBeCreated());
|
|
|
+ () -> new DatafeedNodeSelector(clusterState,
|
|
|
+ resolver,
|
|
|
+ df.getId(),
|
|
|
+ df.getJobId(),
|
|
|
+ df.getIndices(),
|
|
|
+ SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated());
|
|
|
assertThat(e.getMessage(), equalTo("Could not start datafeed [datafeed_id] as indices are being upgraded"));
|
|
|
}
|
|
|
|