|
|
@@ -14,8 +14,10 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
|
|
|
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
|
|
|
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.AutodetectControlMsgWriter;
|
|
|
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
|
|
|
+import org.elasticsearch.xpack.ml.process.ProcessPipes;
|
|
|
import org.elasticsearch.xpack.ml.process.ProcessResultsParser;
|
|
|
import org.elasticsearch.xpack.ml.process.NativeController;
|
|
|
+import org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Before;
|
|
|
|
|
|
@@ -39,7 +41,10 @@ import static org.hamcrest.Matchers.equalTo;
|
|
|
import static org.hamcrest.Matchers.greaterThan;
|
|
|
import static org.hamcrest.Matchers.is;
|
|
|
import static org.mockito.Matchers.any;
|
|
|
+import static org.mockito.Matchers.eq;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
+import static org.mockito.Mockito.times;
|
|
|
+import static org.mockito.Mockito.verify;
|
|
|
import static org.mockito.Mockito.when;
|
|
|
|
|
|
public class NativeAutodetectProcessTests extends ESTestCase {
|
|
|
@@ -47,25 +52,38 @@ public class NativeAutodetectProcessTests extends ESTestCase {
|
|
|
private static final int NUMBER_FIELDS = 5;
|
|
|
|
|
|
private ExecutorService executorService;
|
|
|
+ private CppLogMessageHandler cppLogHandler;
|
|
|
+ private ByteArrayOutputStream inputStream;
|
|
|
+ private InputStream outputStream;
|
|
|
+ private OutputStream restoreStream;
|
|
|
+ private InputStream persistStream;
|
|
|
+ private ProcessPipes processPipes;
|
|
|
|
|
|
@Before
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public void initialize() {
|
|
|
executorService = mock(ExecutorService.class);
|
|
|
when(executorService.submit(any(Runnable.class))).thenReturn(mock(Future.class));
|
|
|
+ cppLogHandler = mock(CppLogMessageHandler.class);
|
|
|
+ when(cppLogHandler.getErrors()).thenReturn("");
|
|
|
+ inputStream = new ByteArrayOutputStream(AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH + 1024);
|
|
|
+ outputStream = new ByteArrayInputStream("some string of data".getBytes(StandardCharsets.UTF_8));
|
|
|
+ restoreStream = mock(OutputStream.class);
|
|
|
+ persistStream = mock(InputStream.class);
|
|
|
+ processPipes = mock(ProcessPipes.class);
|
|
|
+ when(processPipes.getLogStreamHandler()).thenReturn(cppLogHandler);
|
|
|
+ when(processPipes.getProcessInStream()).thenReturn(Optional.of(inputStream));
|
|
|
+ when(processPipes.getProcessOutStream()).thenReturn(Optional.of(outputStream));
|
|
|
+ when(processPipes.getRestoreStream()).thenReturn(Optional.of(restoreStream));
|
|
|
+ when(processPipes.getPersistStream()).thenReturn(Optional.of(persistStream));
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public void testProcessStartTime() throws Exception {
|
|
|
- InputStream logStream = mock(InputStream.class);
|
|
|
- when(logStream.read(new byte[1024])).thenReturn(-1);
|
|
|
- InputStream outputStream = mock(InputStream.class);
|
|
|
- when(outputStream.read(new byte[512])).thenReturn(-1);
|
|
|
- try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
|
|
|
- mock(OutputStream.class), outputStream, mock(OutputStream.class),
|
|
|
- NUMBER_FIELDS, null,
|
|
|
+ try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class),
|
|
|
+ processPipes, NUMBER_FIELDS, null,
|
|
|
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
|
|
|
- process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
|
|
|
+ process.start(executorService, mock(IndexingStateProcessor.class));
|
|
|
|
|
|
ZonedDateTime startTime = process.getProcessStartTime();
|
|
|
Thread.sleep(500);
|
|
|
@@ -79,21 +97,16 @@ public class NativeAutodetectProcessTests extends ESTestCase {
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public void testWriteRecord() throws IOException {
|
|
|
- InputStream logStream = mock(InputStream.class);
|
|
|
- when(logStream.read(new byte[1024])).thenReturn(-1);
|
|
|
- InputStream outputStream = mock(InputStream.class);
|
|
|
- when(outputStream.read(new byte[512])).thenReturn(-1);
|
|
|
String[] record = {"r1", "r2", "r3", "r4", "r5"};
|
|
|
- ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
|
|
|
- try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
|
|
|
- bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
|
|
|
+ try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class),
|
|
|
+ processPipes, NUMBER_FIELDS, Collections.emptyList(),
|
|
|
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
|
|
|
- process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
|
|
|
+ process.start(executorService, mock(IndexingStateProcessor.class));
|
|
|
|
|
|
process.writeRecord(record);
|
|
|
process.flushStream();
|
|
|
|
|
|
- ByteBuffer bb = ByteBuffer.wrap(bos.toByteArray());
|
|
|
+ ByteBuffer bb = ByteBuffer.wrap(inputStream.toByteArray());
|
|
|
|
|
|
// read header
|
|
|
int numFields = bb.getInt();
|
|
|
@@ -115,20 +128,15 @@ public class NativeAutodetectProcessTests extends ESTestCase {
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public void testFlush() throws IOException {
|
|
|
- InputStream logStream = mock(InputStream.class);
|
|
|
- when(logStream.read(new byte[1024])).thenReturn(-1);
|
|
|
- InputStream outputStream = mock(InputStream.class);
|
|
|
- when(outputStream.read(new byte[512])).thenReturn(-1);
|
|
|
- ByteArrayOutputStream bos = new ByteArrayOutputStream(AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH + 1024);
|
|
|
- try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
|
|
|
- bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
|
|
|
+ try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class),
|
|
|
+ processPipes, NUMBER_FIELDS, Collections.emptyList(),
|
|
|
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
|
|
|
- process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
|
|
|
+ process.start(executorService, mock(IndexingStateProcessor.class));
|
|
|
|
|
|
FlushJobParams params = FlushJobParams.builder().build();
|
|
|
process.flushJob(params);
|
|
|
|
|
|
- ByteBuffer bb = ByteBuffer.wrap(bos.toByteArray());
|
|
|
+ ByteBuffer bb = ByteBuffer.wrap(inputStream.toByteArray());
|
|
|
assertThat(bb.remaining(), is(greaterThan(AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH)));
|
|
|
}
|
|
|
}
|
|
|
@@ -143,44 +151,52 @@ public class NativeAutodetectProcessTests extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testPersistJob() throws IOException {
|
|
|
- testWriteMessage(p -> p.persistState(), AutodetectControlMsgWriter.BACKGROUND_PERSIST_MESSAGE_CODE);
|
|
|
+ testWriteMessage(NativeAutodetectProcess::persistState, AutodetectControlMsgWriter.BACKGROUND_PERSIST_MESSAGE_CODE);
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public void testConsumeAndCloseOutputStream() throws IOException {
|
|
|
- InputStream logStream = mock(InputStream.class);
|
|
|
- when(logStream.read(new byte[1024])).thenReturn(-1);
|
|
|
- OutputStream processInStream = mock(OutputStream.class);
|
|
|
- String json = "some string of data";
|
|
|
- ByteArrayInputStream processOutStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
|
|
|
-
|
|
|
- try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
|
|
|
- processInStream, processOutStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
|
|
|
- new ProcessResultsParser<AutodetectResult>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class),
|
|
|
+
|
|
|
+ try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class),
|
|
|
+ processPipes, NUMBER_FIELDS, Collections.emptyList(),
|
|
|
+ new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class),
|
|
|
Duration.ZERO)) {
|
|
|
|
|
|
+ process.start(executorService);
|
|
|
process.consumeAndCloseOutputStream();
|
|
|
- assertThat(processOutStream.available(), equalTo(0));
|
|
|
+ assertThat(outputStream.available(), equalTo(0));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ public void testPipeConnectTimeout() throws IOException {
|
|
|
+
|
|
|
+ int timeoutSeconds = randomIntBetween(5, 100);
|
|
|
+
|
|
|
+ try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class),
|
|
|
+ processPipes, NUMBER_FIELDS, Collections.emptyList(),
|
|
|
+ new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class),
|
|
|
+ Duration.ofSeconds(timeoutSeconds))) {
|
|
|
+
|
|
|
+ process.start(executorService);
|
|
|
+ }
|
|
|
+
|
|
|
+ verify(processPipes, times(1)).connectLogStream(eq(Duration.ofSeconds(timeoutSeconds)));
|
|
|
+ verify(processPipes, times(1)).connectOtherStreams(eq(Duration.ofSeconds(timeoutSeconds)));
|
|
|
+ }
|
|
|
+
|
|
|
@SuppressWarnings("unchecked")
|
|
|
private void testWriteMessage(CheckedConsumer<NativeAutodetectProcess> writeFunction, String expectedMessageCode) throws IOException {
|
|
|
- InputStream logStream = mock(InputStream.class);
|
|
|
- when(logStream.read(new byte[1024])).thenReturn(-1);
|
|
|
- InputStream outputStream = mock(InputStream.class);
|
|
|
- when(outputStream.read(new byte[512])).thenReturn(-1);
|
|
|
- ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
|
|
|
- try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
|
|
|
- bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
|
|
|
+ try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class),
|
|
|
+ processPipes, NUMBER_FIELDS, Collections.emptyList(),
|
|
|
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
|
|
|
- process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
|
|
|
+ process.start(executorService, mock(IndexingStateProcessor.class));
|
|
|
|
|
|
writeFunction.accept(process);
|
|
|
process.writeUpdateModelPlotMessage(new ModelPlotConfig());
|
|
|
process.flushStream();
|
|
|
|
|
|
- String message = new String(bos.toByteArray(), StandardCharsets.UTF_8);
|
|
|
+ String message = new String(inputStream.toByteArray(), StandardCharsets.UTF_8);
|
|
|
assertTrue(message.contains(expectedMessageCode));
|
|
|
}
|
|
|
}
|