Browse Source

Enable testing remote metadata for ES|QL CCS (#116767) (#116784)

* Enable testing remote metadata for CCS

(cherry picked from commit 270d9d2a6426dba651f2b81f7d3db46e6f5dbed2)
Stanislav Malyshev 11 months ago
parent
commit
ec89252e6c

+ 1 - 0
x-pack/plugin/esql/qa/server/multi-clusters/build.gradle

@@ -15,6 +15,7 @@ apply plugin: 'elasticsearch.bwc-test'
 dependencies {
   javaRestTestImplementation project(xpackModule('esql:qa:testFixtures'))
   javaRestTestImplementation project(xpackModule('esql:qa:server'))
+  javaRestTestImplementation project(xpackModule('esql'))
 }
 
 def supportedVersion = bwcVersion -> {

+ 25 - 4
x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

@@ -45,6 +45,10 @@ import static org.elasticsearch.xpack.esql.CsvSpecReader.specParser;
 import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled;
 import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.ENRICH_SOURCE_INDICES;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
+import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS;
+import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2;
+import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_PLANNING_V1;
+import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.METADATA_FIELDS_REMOTE_TEST;
 import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.Mode.SYNC;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
@@ -101,16 +105,25 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
 
     @Override
     protected void shouldSkipTest(String testName) throws IOException {
+        boolean remoteMetadata = testCase.requiredCapabilities.contains(METADATA_FIELDS_REMOTE_TEST.capabilityName());
+        if (remoteMetadata) {
+            // remove the capability from the test to enable it
+            testCase.requiredCapabilities = testCase.requiredCapabilities.stream()
+                .filter(c -> c.equals("metadata_fields_remote_test") == false)
+                .toList();
+        }
         super.shouldSkipTest(testName);
         checkCapabilities(remoteClusterClient(), remoteFeaturesService(), testName, testCase);
-        assumeFalse("can't test with _index metadata", hasIndexMetadata(testCase.query));
+        // Do not run tests including "METADATA _index" unless marked with metadata_fields_remote_test,
+        // because they may produce inconsistent results with multiple clusters.
+        assumeFalse("can't test with _index metadata", (remoteMetadata == false) && hasIndexMetadata(testCase.query));
         assumeTrue(
             "Test " + testName + " is skipped on " + Clusters.oldVersion(),
             isEnabled(testName, instructions, Clusters.oldVersion())
         );
-        assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains("inlinestats"));
-        assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains("inlinestats_v2"));
-        assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains("join_planning_v1"));
+        assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS.capabilityName()));
+        assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName()));
+        assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName()));
     }
 
     private TestFeatureService remoteFeaturesService() throws IOException {
@@ -151,6 +164,9 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
         return twoClients(localClient, remoteClient);
     }
 
+    // These indices are used in metadata tests so we want them on remote only for consistency
+    public static final List<String> METADATA_INDICES = List.of("employees", "apps", "ul_logs");
+
     /**
      * Creates a new mock client that dispatches every request to both the local and remote clusters, excluding _bulk and _query requests.
      * - '_bulk' requests are randomly sent to either the local or remote cluster to populate data. Some spec tests, such as AVG,
@@ -166,6 +182,8 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
             String endpoint = request.getEndpoint();
             if (endpoint.startsWith("/_query")) {
                 return localClient.performRequest(request);
+            } else if (endpoint.endsWith("/_bulk") && METADATA_INDICES.stream().anyMatch(i -> endpoint.equals("/" + i + "/_bulk"))) {
+                return remoteClient.performRequest(request);
             } else if (endpoint.endsWith("/_bulk") && ENRICH_SOURCE_INDICES.stream().noneMatch(i -> endpoint.equals("/" + i + "/_bulk"))) {
                 return bulkClient.performRequest(request);
             } else {
@@ -203,6 +221,9 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
         return clones;
     }
 
+    /**
+     * Convert FROM employees ... => FROM *:employees,employees
+     */
     static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCase testCase) {
         String query = testCase.query;
         String[] commands = query.split("\\|");

+ 151 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/metadata-remote.csv-spec

@@ -0,0 +1,151 @@
+simpleKeep
+required_capability: metadata_fields
+required_capability: metadata_fields_remote_test
+from employees metadata _index, _version | sort _index desc, emp_no | limit 2 | keep emp_no, _index, _version;
+
+emp_no:integer |_index:keyword                |_version:long
+10001          |remote_cluster:employees      |1
+10002          |remote_cluster:employees      |1
+;
+
+aliasWithSameName
+required_capability: metadata_fields
+required_capability: metadata_fields_remote_test
+from employees metadata _index, _version | sort _index desc, emp_no | limit 2 | eval _index = _index, _version = _version | keep emp_no, _index, _version;
+
+emp_no:integer |_index:keyword                |_version:long
+10001          |remote_cluster:employees      |1
+10002          |remote_cluster:employees      |1
+;
+
+inComparison
+required_capability: metadata_fields
+required_capability: metadata_fields_remote_test
+from employees metadata _index, _version | sort emp_no | where _index == "remote_cluster:employees" | where _version == 1 | keep emp_no | limit 2;
+
+emp_no:integer
+10001
+10002
+;
+
+metaIndexInAggs
+required_capability: metadata_fields
+required_capability: metadata_fields_remote_test
+FROM employees METADATA _index, _id
+| STATS max = MAX(emp_no) BY _index | SORT _index;
+
+max:integer |_index:keyword
+10100       |remote_cluster:employees
+;
+
+metaIndexAliasedInAggs
+required_capability: metadata_fields
+required_capability: metadata_fields_remote_test
+from employees metadata _index | eval _i = _index | stats max = max(emp_no) by _i | SORT _i;
+
+max:integer |_i:keyword
+10100       |remote_cluster:employees
+;
+
+metaVersionInAggs
+required_capability: metadata_fields
+required_capability: metadata_fields_remote_test
+from employees metadata _version | stats min = min(emp_no) by _version;
+
+min:integer |_version:long
+10001       |1
+;
+
+metaVersionAliasedInAggs
+required_capability: metadata_fields
+required_capability: metadata_fields_remote_test
+from employees metadata _version | eval _v = _version | stats min = min(emp_no) by _v;
+
+min:integer |_v:long
+10001       |1
+;
+
+inAggsAndAsGroups
+required_capability: metadata_fields
+required_capability: metadata_fields_remote_test
+from employees metadata _index, _version | stats max = max(_version) by _index | SORT _index;
+
+max:long |_index:keyword
+1        |remote_cluster:employees
+;
+
+inAggsAndAsGroupsAliased
+required_capability: metadata_fields
+required_capability: metadata_fields_remote_test
+from employees metadata _index, _version | eval _i = _index, _v = _version | stats max = max(_v) by _i | SORT _i;
+
+max:long |_i:keyword
+1        |remote_cluster:employees
+;
+
+inFunction
+required_capability: metadata_fields
+required_capability: metadata_fields_remote_test
+from employees metadata _index, _version | sort emp_no | where length(_index) == length("remote_cluster:employees") | where abs(_version) == 1 | keep emp_no | limit 2;
+
+emp_no:integer
+10001
+10002
+;
+
+inArithmetics
+required_capability: metadata_fields
+required_capability: metadata_fields_remote_test
+from employees metadata _index, _version | eval i = _version + 2 | stats min = min(emp_no) by i;
+
+min:integer |i:long
+10001       |3
+;
+
+inSort
+required_capability: metadata_fields
+required_capability: metadata_fields_remote_test
+from employees metadata _index, _version | sort _version, _index desc, emp_no | keep emp_no, _version, _index | limit 2;
+
+emp_no:integer |_version:long |_index:keyword
+10001          |1             |remote_cluster:employees
+10002          |1             |remote_cluster:employees
+;
+
+withMvFunction
+required_capability: metadata_fields
+required_capability: metadata_fields_remote_test
+from employees metadata _version | eval i = mv_avg(_version) + 2 | stats min = min(emp_no) by i;
+
+min:integer |i:double
+10001       |3.0
+;
+
+overwritten
+required_capability: metadata_fields
+required_capability: metadata_fields_remote_test
+from employees metadata _index, _version | sort emp_no | eval _index = 3, _version = "version" | keep emp_no, _index, _version | limit 3;
+
+emp_no:integer |_index:integer |_version:keyword
+10001          |3              |version
+10002          |3              |version
+10003          |3              |version
+;
+
+multipleIndices
+required_capability: metadata_fields
+required_capability: metadata_fields_remote_test
+FROM ul_logs, apps METADATA _index, _version
+| WHERE id IN (13, 14) AND _version == 1
+| EVAL key = CONCAT(_index, "_", TO_STR(id))
+| SORT id, _index
+| KEEP id, _index, _version, key
+;
+
+ id:long |_index:keyword          |_version:long  |key:keyword
+13       |remote_cluster:apps     |1              |remote_cluster:apps_13        
+13       |remote_cluster:ul_logs  |1              |remote_cluster:ul_logs_13     
+14       |remote_cluster:apps     |1              |remote_cluster:apps_14        
+14       |remote_cluster:ul_logs  |1              |remote_cluster:ul_logs_14     
+    
+;

+ 3 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

@@ -470,6 +470,9 @@ public class EsqlCapabilities {
         ADD_LIMIT_INSIDE_MV_EXPAND,
 
         DELAY_DEBUG_FN(Build.current().isSnapshot()),
+
+        /** Capability for remote metadata test */
+        METADATA_FIELDS_REMOTE_TEST(false),
         /**
          * WIP on Join planning
          * - Introduce BinaryPlan and co