Browse Source

Add stall_time_seconds to the error_query DSL testing query (#98114)

This allows simulating long running queries (particularly useful for CCS testing).

In addition a "NONE" exception type was added for cases where you want the stall time
greater than 0 but to not throw an Exception.

Most of the runtime logic for the Query was moved out of the ErrorQueryBuilder
and into a new ErrorQuery class (extending the Lucene Query class), so that this can be
tested in conjunction with search timeout flag to induce query timeouts and allow
testing of that feature as well.
Michael Peterson 2 years ago
parent
commit
1d593b168b

+ 2 - 1
server/src/main/java/org/elasticsearch/TransportVersion.java

@@ -173,9 +173,10 @@ public record TransportVersion(int id) implements VersionId<TransportVersion> {
     public static final TransportVersion V_8_500_048 = registerTransportVersion(8_500_048, "f9658aa5-f066-4edb-bcb9-40bf256c9294");
     public static final TransportVersion V_8_500_049 = registerTransportVersion(8_500_049, "828bb6ce-2fbb-11ee-be56-0242ac120002");
     public static final TransportVersion V_8_500_050 = registerTransportVersion(8_500_050, "69722fa2-7c0a-4227-86fb-6d6a9a0a0321");
+    public static final TransportVersion V_8_500_051 = registerTransportVersion(8_500_051, "a28b43bc-bb5f-4406-afcf-26900aa98a71");
 
     private static class CurrentHolder {
-        private static final TransportVersion CURRENT = findCurrent(V_8_500_050);
+        private static final TransportVersion CURRENT = findCurrent(V_8_500_051);
 
         // finds the pluggable current version, or uses the given fallback
         private static TransportVersion findCurrent(TransportVersion fallback) {

+ 128 - 19
test/external-modules/error-query/src/main/java/org/elasticsearch/test/errorquery/ErrorQueryBuilder.java

@@ -8,8 +8,12 @@
 
 package org.elasticsearch.test.errorquery;
 
+import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.search.QueryVisitor;
+import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.search.Weight;
 import org.elasticsearch.TransportVersion;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -28,7 +32,53 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
 
 /**
  * A test query that can simulate errors and warnings when executing a shard request.
+ * It allows specifying errors or warnings on a per-index basis and on a per-shard basis
+ * within an index. If shards are not specified, then the error or warning action will
+ * occur on all shards.
+ *
+ * To simulate longer running queries a stall (sleep) time can be added to each
+ * indices entry that specifies how long to sleep before doing a search or
+ * throwing an Exception.
+ *
+ * This can also be used for CCS testing. Example:
+ * <pre>
+ *    POST blogs,remote*:blogs/_async_search?ccs_minimize_roundtrips=true
+ *    {
+ *      "size": 0,
+ *      "query": {
+ *        "error_query": {
+ *          "indices": [
+ *          {
+ *            "name": "*",
+ *            "shard_ids": [0],
+ *            "error_type": "exception",
+ *            "message": "local cluster exception"
+ *          },
+ *          {
+ *            "stall_time_seconds": 1,
+ *            "name": "remote2:*",
+ *            "error_type": "exception",
+ *            "message": "remote2 exception"
+ *          },
+ *          {
+ *            "stall_time_seconds": 7,
+ *             "name": "remote1:blogs",
+ *             "error_type": "none"
+ *          }
+ *          ]
+ *        }
+ *      },
+ *      "aggs": {
+ *        "indexgroup": {
+ *          "terms": {
+ *            "field": "_index"
+ *          }
+ *        }
+ *      }
+ *    }
+ *  </pre>
  */
+
 public class ErrorQueryBuilder extends AbstractQueryBuilder<ErrorQueryBuilder> {
     public static final String NAME = "error_query";
 
@@ -68,25 +118,8 @@ public class ErrorQueryBuilder extends AbstractQueryBuilder<ErrorQueryBuilder> {
         if (error == null) {
             return new MatchAllDocsQuery();
         }
-        if (error.getShardIds() != null) {
-            boolean match = false;
-            for (int shardId : error.getShardIds()) {
-                if (context.getShardId() == shardId) {
-                    match = true;
-                    break;
-                }
-            }
-            if (match == false) {
-                return new MatchAllDocsQuery();
-            }
-        }
-        final String header = "[" + context.index().getName() + "][" + context.getShardId() + "]";
-        if (error.getErrorType() == IndexError.ERROR_TYPE.WARNING) {
-            HeaderWarning.addWarning(header + " " + error.getMessage());
-            return new MatchAllDocsQuery();
-        } else {
-            throw new RuntimeException(header + " " + error.getMessage());
-        }
+
+        return new ErrorQuery(error, context);
     }
 
     @SuppressWarnings("unchecked")
@@ -133,4 +166,80 @@ public class ErrorQueryBuilder extends AbstractQueryBuilder<ErrorQueryBuilder> {
     public TransportVersion getMinimalSupportedVersion() {
         return TransportVersion.ZERO;
     }
+
+    static void sleep(long millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException e) {
+            // ignore
+        }
+    }
+
+    /**
+     * ErrorQuery uses MatchAllDocsQuery when doing searches.
+     * It can optionally add warnings, throw exceptions and sleep for specified "stall times"
+     * based on the information in the provided IndexError class.
+     */
+    static class ErrorQuery extends Query {
+        private final IndexError indexError;
+        private volatile boolean sleepCompleted;
+        private final MatchAllDocsQuery matchAllQuery;
+
+        ErrorQuery(IndexError error, SearchExecutionContext context) {
+            this.indexError = error;
+            this.sleepCompleted = false;
+            this.matchAllQuery = new MatchAllDocsQuery();
+
+            if (error.getShardIds() != null) {
+                boolean match = false;
+                for (int shardId : error.getShardIds()) {
+                    if (context.getShardId() == shardId) {
+                        match = true;
+                        break;
+                    }
+                }
+                if (match == false) {
+                    return;
+                }
+            }
+            final String header = "[" + context.index().getName() + "][" + context.getShardId() + "]";
+            if (error.getErrorType() == IndexError.ERROR_TYPE.WARNING) {
+                HeaderWarning.addWarning(header + " " + error.getMessage());
+            } else if (error.getErrorType() == IndexError.ERROR_TYPE.EXCEPTION) {
+                if (indexError.getStallTimeSeconds() > 0) {
+                    sleep(indexError.getStallTimeSeconds() * 1000L);
+                }
+                throw new RuntimeException(header + " " + error.getMessage());
+            }
+        }
+
+        @Override
+        public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) {
+            if (indexError.getStallTimeSeconds() > 0 && sleepCompleted == false) {
+                sleep(indexError.getStallTimeSeconds() * 1000L);
+                sleepCompleted = true;
+            }
+            return matchAllQuery.createWeight(searcher, scoreMode, boost);
+        }
+
+        @Override
+        public String toString(String field) {
+            return "ErrorQuery MatchAll *:*";
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return sameClassAs(o);
+        }
+
+        @Override
+        public int hashCode() {
+            return classHash();
+        }
+
+        @Override
+        public void visit(QueryVisitor visitor) {
+            matchAllQuery.visit(visitor);
+        }
+    }
 }

+ 46 - 6
test/external-modules/error-query/src/main/java/org/elasticsearch/test/errorquery/IndexError.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.test.errorquery;
 
+import org.elasticsearch.TransportVersion;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -29,19 +30,22 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
 public class IndexError implements Writeable, ToXContentFragment {
     enum ERROR_TYPE {
         WARNING,
-        EXCEPTION
+        EXCEPTION,
+        NONE
     }
 
     private final String indexName;
     private final int[] shardIds;
     private final ERROR_TYPE errorType;
     private final String message;
+    private final int stallTimeSeconds;  // how long to wait before returning
 
-    public IndexError(String indexName, int[] shardIds, ERROR_TYPE errorType, String message) {
+    public IndexError(String indexName, int[] shardIds, ERROR_TYPE errorType, String message, int stallTime) {
         this.indexName = indexName;
         this.shardIds = shardIds;
         this.errorType = errorType;
         this.message = message;
+        this.stallTimeSeconds = stallTime;
     }
 
     public IndexError(StreamInput in) throws IOException {
@@ -49,6 +53,11 @@ public class IndexError implements Writeable, ToXContentFragment {
         this.shardIds = in.readBoolean() ? in.readIntArray() : null;
         this.errorType = in.readEnum(ERROR_TYPE.class);
         this.message = in.readString();
+        if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_500_051)) {
+            this.stallTimeSeconds = in.readVInt();
+        } else {
+            this.stallTimeSeconds = 0;
+        }
     }
 
     @Override
@@ -60,6 +69,9 @@ public class IndexError implements Writeable, ToXContentFragment {
         }
         out.writeEnum(errorType);
         out.writeString(message);
+        if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_500_051)) {
+            out.writeVInt(stallTimeSeconds);
+        }
     }
 
     public String getIndexName() {
@@ -79,6 +91,10 @@ public class IndexError implements Writeable, ToXContentFragment {
         return message;
     }
 
+    public int getStallTimeSeconds() {
+        return stallTimeSeconds;
+    }
+
     @SuppressWarnings("unchecked")
     static final ConstructingObjectParser<IndexError, String> PARSER = new ConstructingObjectParser<>(
         "index_error",
@@ -86,11 +102,14 @@ public class IndexError implements Writeable, ToXContentFragment {
         (args, name) -> {
             List<Integer> lst = (List<Integer>) args[1];
             int[] shardIds = lst == null ? null : lst.stream().mapToInt(i -> i).toArray();
+            String message = args[3] == null ? "" : (String) args[3];
+            int stallTime = args[4] == null ? 0 : (int) args[4];
             return new IndexError(
                 (String) args[0],
                 shardIds,
                 ERROR_TYPE.valueOf(((String) args[2]).toUpperCase(Locale.ROOT)),
-                (String) args[3]
+                message,
+                stallTime
             );
         }
     );
@@ -99,7 +118,8 @@ public class IndexError implements Writeable, ToXContentFragment {
         PARSER.declareString(constructorArg(), new ParseField("name"));
         PARSER.declareIntArray(optionalConstructorArg(), new ParseField("shard_ids"));
         PARSER.declareString(constructorArg(), new ParseField("error_type"));
-        PARSER.declareString(constructorArg(), new ParseField("message"));
+        PARSER.declareString(optionalConstructorArg(), new ParseField("message"));
+        PARSER.declareInt(optionalConstructorArg(), new ParseField("stall_time_seconds"));
     }
 
     @Override
@@ -110,6 +130,7 @@ public class IndexError implements Writeable, ToXContentFragment {
         }
         builder.field("error_type", errorType.toString());
         builder.field("message", message);
+        builder.field("stall_time_seconds", stallTimeSeconds);
         return builder;
     }
 
@@ -121,13 +142,32 @@ public class IndexError implements Writeable, ToXContentFragment {
         return indexName.equals(that.indexName)
             && Arrays.equals(shardIds, that.shardIds)
             && errorType == that.errorType
-            && message.equals(that.message);
+            && message.equals(that.message)
+            && stallTimeSeconds == stallTimeSeconds;
     }
 
     @Override
     public int hashCode() {
-        int result = Objects.hash(indexName, errorType, message);
+        int result = Objects.hash(indexName, errorType, message, stallTimeSeconds);
         result = 31 * result + Arrays.hashCode(shardIds);
         return result;
     }
+
+    @Override
+    public String toString() {
+        return "IndexError{"
+            + "indexName='"
+            + indexName
+            + '\''
+            + ", shardIds="
+            + Arrays.toString(shardIds)
+            + ", errorType="
+            + errorType
+            + ", message='"
+            + message
+            + '\''
+            + ", stallTimeSeconds="
+            + stallTimeSeconds
+            + '}';
+    }
 }

+ 2 - 3
test/external-modules/error-query/src/test/java/org/elasticsearch/test/errorquery/ErrorQueryBuilderTests.java

@@ -37,9 +37,8 @@ public class ErrorQueryBuilderTests extends AbstractQueryTestCase<ErrorQueryBuil
             for (int j = 0; j < numShards; j++) {
                 shardIds[j] = j;
             }
-            indices.add(
-                new IndexError(indexName, shardIds, randomFrom(IndexError.ERROR_TYPE.values()), randomAlphaOfLengthBetween(5, 100))
-            );
+            String message = randomBoolean() ? "" : randomAlphaOfLengthBetween(5, 100);
+            indices.add(new IndexError(indexName, shardIds, randomFrom(IndexError.ERROR_TYPE.values()), message, 0));
         }
 
         return new ErrorQueryBuilder(indices);

+ 43 - 0
test/external-modules/error-query/src/yamlRestTest/resources/rest-api-spec/test/error_query/20_stall_time.yml

@@ -0,0 +1,43 @@
+# Integration tests for error_query using stall_time_seconds field
+#
+
+---
+"Error query":
+  - do:
+      indices.create:
+        index: test_exception
+        body:
+          settings:
+            index.number_of_shards: 1
+
+  # to get timed_out=true, you must have indexed some data
+  - do:
+      index:
+        index:  test_exception
+        id:     "1"
+        body:   { id: 1, foo: bar, age: 18 }
+
+  - do:
+      index:
+        index:  test_exception
+        id:     "42"
+        body:   { id: 42, foo: bar, age: 18 }
+
+  - do:
+      indices.refresh:
+        index: test_exception
+
+  - do:
+      search:
+        index: test_exception
+        timeout: 1500ms
+        body:
+          query:
+            "error_query": { "indices": [ { name: "test_exception", stall_time_seconds: 3, error_type: "none" } ] }
+
+  - match:  { hits.total.value:                                                       0 }
+  - match:  { _shards.total:                                                          1 }
+  - match:  { _shards.successful:                                                     1 }
+  - match:  { _shards.failed:                                                         0 }
+  - gte: { took: 3000 }
+  - is_true: timed_out