|  | @@ -14,6 +14,7 @@ import org.elasticsearch.client.Request;
 | 
	
		
			
				|  |  |  import org.elasticsearch.client.Response;
 | 
	
		
			
				|  |  |  import org.elasticsearch.client.ResponseException;
 | 
	
		
			
				|  |  |  import org.elasticsearch.client.RestClient;
 | 
	
		
			
				|  |  | +import org.elasticsearch.common.CheckedRunnable;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.xcontent.XContentHelper;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.xcontent.json.JsonXContent;
 | 
	
		
			
				|  |  |  import org.elasticsearch.test.ESTestCase;
 | 
	
	
		
			
				|  | @@ -29,7 +30,9 @@ import java.util.Arrays;
 | 
	
		
			
				|  |  |  import java.util.List;
 | 
	
		
			
				|  |  |  import java.util.Map;
 | 
	
		
			
				|  |  |  import java.util.concurrent.atomic.AtomicReference;
 | 
	
		
			
				|  |  | +import java.util.function.Predicate;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +import static org.elasticsearch.test.ESTestCase.assertBusy;
 | 
	
		
			
				|  |  |  import static org.junit.Assert.assertEquals;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  public final class XPackRestTestHelper {
 | 
	
	
		
			
				|  | @@ -84,34 +87,54 @@ public final class XPackRestTestHelper {
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      /**
 | 
	
		
			
				|  |  | -     * Waits for pending tasks to complete
 | 
	
		
			
				|  |  | +     * Wait for outstanding tasks to complete. The specified admin client is used to check the outstanding tasks and this is done using
 | 
	
		
			
				|  |  | +     * {@link ESTestCase#assertBusy(CheckedRunnable)} to give a chance to any outstanding tasks to complete.
 | 
	
		
			
				|  |  | +     *
 | 
	
		
			
				|  |  | +     * @param adminClient the admin client
 | 
	
		
			
				|  |  | +     * @throws Exception if an exception is thrown while checking the outstanding tasks
 | 
	
		
			
				|  |  |       */
 | 
	
		
			
				|  |  | -    public static void waitForPendingTasks(RestClient adminClient) throws Exception {
 | 
	
		
			
				|  |  | -        ESTestCase.assertBusy(() -> {
 | 
	
		
			
				|  |  | +    public static void waitForPendingTasks(final RestClient adminClient) throws Exception {
 | 
	
		
			
				|  |  | +        waitForPendingTasks(adminClient, taskName -> false);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * Wait for outstanding tasks to complete. The specified admin client is used to check the outstanding tasks and this is done using
 | 
	
		
			
				|  |  | +     * {@link ESTestCase#assertBusy(CheckedRunnable)} to give a chance to any outstanding tasks to complete. The specified filter is used
 | 
	
		
			
				|  |  | +     * to filter out outstanding tasks that are expected to be there.
 | 
	
		
			
				|  |  | +     *
 | 
	
		
			
				|  |  | +     * @param adminClient the admin client
 | 
	
		
			
				|  |  | +     * @param taskFilter  predicate used to filter tasks that are expected to be there
 | 
	
		
			
				|  |  | +     * @throws Exception if an exception is thrown while checking the outstanding tasks
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    public static void waitForPendingTasks(final RestClient adminClient, final Predicate<String> taskFilter) throws Exception {
 | 
	
		
			
				|  |  | +        assertBusy(() -> {
 | 
	
		
			
				|  |  |              try {
 | 
	
		
			
				|  |  | -                Request request = new Request("GET", "/_cat/tasks");
 | 
	
		
			
				|  |  | +                final Request request = new Request("GET", "/_cat/tasks");
 | 
	
		
			
				|  |  |                  request.addParameter("detailed", "true");
 | 
	
		
			
				|  |  | -                Response response = adminClient.performRequest(request);
 | 
	
		
			
				|  |  | -                // Check to see if there are tasks still active. We exclude the
 | 
	
		
			
				|  |  | -                // list tasks
 | 
	
		
			
				|  |  | -                // actions tasks form this otherwise we will always fail
 | 
	
		
			
				|  |  | +                final Response response = adminClient.performRequest(request);
 | 
	
		
			
				|  |  | +                /*
 | 
	
		
			
				|  |  | +                 * Check to see if there are outstanding tasks; we exclude the list task itself, and any expected outstanding tasks using
 | 
	
		
			
				|  |  | +                 * the specified task filter.
 | 
	
		
			
				|  |  | +                 */
 | 
	
		
			
				|  |  |                  if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
 | 
	
		
			
				|  |  |                      try (BufferedReader responseReader = new BufferedReader(
 | 
	
		
			
				|  |  |                              new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) {
 | 
	
		
			
				|  |  |                          int activeTasks = 0;
 | 
	
		
			
				|  |  |                          String line;
 | 
	
		
			
				|  |  | -                        StringBuilder tasksListString = new StringBuilder();
 | 
	
		
			
				|  |  | +                        final StringBuilder tasksListString = new StringBuilder();
 | 
	
		
			
				|  |  |                          while ((line = responseReader.readLine()) != null) {
 | 
	
		
			
				|  |  | -                            if (line.startsWith(ListTasksAction.NAME) == false) {
 | 
	
		
			
				|  |  | -                                activeTasks++;
 | 
	
		
			
				|  |  | -                                tasksListString.append(line);
 | 
	
		
			
				|  |  | -                                tasksListString.append('\n');
 | 
	
		
			
				|  |  | +                            final String taskName = line.split("\\s+")[0];
 | 
	
		
			
				|  |  | +                            if (taskName.startsWith(ListTasksAction.NAME) || taskFilter.test(taskName)) {
 | 
	
		
			
				|  |  | +                                continue;
 | 
	
		
			
				|  |  |                              }
 | 
	
		
			
				|  |  | +                            activeTasks++;
 | 
	
		
			
				|  |  | +                            tasksListString.append(line);
 | 
	
		
			
				|  |  | +                            tasksListString.append('\n');
 | 
	
		
			
				|  |  |                          }
 | 
	
		
			
				|  |  |                          assertEquals(activeTasks + " active tasks found:\n" + tasksListString, 0, activeTasks);
 | 
	
		
			
				|  |  |                      }
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  | -            } catch (IOException e) {
 | 
	
		
			
				|  |  | +            } catch (final IOException e) {
 | 
	
		
			
				|  |  |                  throw new AssertionError("Error getting active tasks list", e);
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |          });
 |