소스 검색

Make ES|QL SAMPLE not a pipeline breaker (#132014)

Jan Kuipers 2 달 전
부모
커밋
41079181c6

+ 0 - 8
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java

@@ -17,7 +17,6 @@ import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
 import org.elasticsearch.xpack.esql.plan.logical.LeafPlan;
 import org.elasticsearch.xpack.esql.plan.logical.Limit;
 import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
-import org.elasticsearch.xpack.esql.plan.logical.Sample;
 import org.elasticsearch.xpack.esql.plan.logical.TopN;
 import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
 import org.elasticsearch.xpack.esql.plan.logical.join.Join;
@@ -29,7 +28,6 @@ import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
 import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
 import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
-import org.elasticsearch.xpack.esql.plan.physical.SampleExec;
 import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
 
 import java.util.List;
@@ -71,7 +69,6 @@ public class LocalMapper {
         //
         // Pipeline breakers
         //
-
         if (unary instanceof Aggregate aggregate) {
             List<Attribute> intermediate = MapperUtils.intermediateAttributes(aggregate);
             return MapperUtils.aggExec(aggregate, mappedChild, AggregatorMode.INITIAL, intermediate);
@@ -85,14 +82,9 @@ public class LocalMapper {
             return new TopNExec(topN.source(), mappedChild, topN.order(), topN.limit(), null);
         }
 
-        if (unary instanceof Sample sample) {
-            return new SampleExec(sample.source(), mappedChild, sample.probability());
-        }
-
         //
         // Pipeline operators
         //
-
         return MapperUtils.mapUnary(unary, mappedChild);
     }
 

+ 0 - 8
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

@@ -21,7 +21,6 @@ import org.elasticsearch.xpack.esql.plan.logical.LeafPlan;
 import org.elasticsearch.xpack.esql.plan.logical.Limit;
 import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
 import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker;
-import org.elasticsearch.xpack.esql.plan.logical.Sample;
 import org.elasticsearch.xpack.esql.plan.logical.TopN;
 import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
 import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank;
@@ -37,7 +36,6 @@ import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
 import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
 import org.elasticsearch.xpack.esql.plan.physical.MergeExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
-import org.elasticsearch.xpack.esql.plan.physical.SampleExec;
 import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
 import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;
 import org.elasticsearch.xpack.esql.plan.physical.inference.RerankExec;
@@ -187,12 +185,6 @@ public class Mapper {
             );
         }
 
-        // TODO: share code with local LocalMapper?
-        if (unary instanceof Sample sample) {
-            mappedChild = addExchangeForFragment(sample, mappedChild);
-            return new SampleExec(sample.source(), mappedChild, sample.probability());
-        }
-
         //
         // Pipeline operators
         //

+ 6 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java

@@ -25,6 +25,7 @@ import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
 import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
 import org.elasticsearch.xpack.esql.plan.logical.Project;
 import org.elasticsearch.xpack.esql.plan.logical.RrfScoreEval;
+import org.elasticsearch.xpack.esql.plan.logical.Sample;
 import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate;
 import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
 import org.elasticsearch.xpack.esql.plan.logical.inference.Completion;
@@ -43,6 +44,7 @@ import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
 import org.elasticsearch.xpack.esql.plan.physical.RrfScoreEvalExec;
+import org.elasticsearch.xpack.esql.plan.physical.SampleExec;
 import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
 import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
 import org.elasticsearch.xpack.esql.plan.physical.inference.CompletionExec;
@@ -139,6 +141,10 @@ public class MapperUtils {
             return new RrfScoreEvalExec(rrf.source(), child, rrf.scoreAttribute(), rrf.forkAttribute());
         }
 
+        if (p instanceof Sample sample) {
+            return new SampleExec(sample.source(), child, sample.probability());
+        }
+
         return unsupported(p);
     }