Browse Source

Remove global block factory (#100108)

The global BlockFactory should work fine in production, where each 
Elasticsearch node runs in its own JVM process. However, this approach
can lead to issues during testing, especially in IT tests. The same JVM
process might get reused across multiple tests, resulting in situations
where multiple IT tests inadvertently use the same instance of the
global BlockFactory.

For instance, EsqlDisruptionIT fails because it accidentally uses the 
global BlockFactory initialized by EsqlActionBreakerIT, which has a 
limit set to 1KB. Another issue in IT tests is that multiple
Elasticsearch nodes can share the same (single) global instance of the
BlockFactory.

Closes #100105
Nhat Nguyen 2 years ago
parent
commit
dbb8b7d1b9
14 changed files with 16 additions and 166 deletions
  1. 0 1
      x-pack/plugin/esql/compute/src/main/java/module-info.java
  2. 0 25
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java
  3. 0 23
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactoryParameters.java
  4. 0 42
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/TestBlockFactoryParameters.java
  5. 0 10
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockFactoryTests.java
  6. 0 4
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlDisruptionIT.java
  7. 1 1
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/lookup/EnrichLookupIT.java
  8. 0 45
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlBlockFactoryParams.java
  9. 9 2
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java
  10. 0 8
      x-pack/plugin/esql/src/main/resources/META-INF/services/org.elasticsearch.compute.data.BlockFactoryParameters
  11. 1 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java
  12. 3 2
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/AbstractFunctionTestCase.java
  13. 1 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EvalMapperTests.java
  14. 1 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java

+ 0 - 1
x-pack/plugin/esql/compute/src/main/java/module-info.java

@@ -6,7 +6,6 @@
  */
 
 module org.elasticsearch.compute {
-    uses org.elasticsearch.compute.data.BlockFactoryParameters;
 
     requires org.apache.lucene.core;
     requires org.elasticsearch.base;

+ 0 - 25
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java

@@ -16,8 +16,6 @@ import org.elasticsearch.common.util.BytesRefArray;
 import org.elasticsearch.compute.data.Block.MvOrdering;
 
 import java.util.BitSet;
-import java.util.List;
-import java.util.ServiceLoader;
 
 public class BlockFactory {
 
@@ -26,22 +24,6 @@ public class BlockFactory {
         BigArrays.NON_RECYCLING_INSTANCE
     );
 
-    private static final BlockFactory GLOBAL = loadGlobalFactory();
-    // new BlockFactory(new NoopCircuitBreaker("esql_noop_breaker"), BigArrays.NON_RECYCLING_INSTANCE);
-
-    private static BlockFactory loadGlobalFactory() {
-        ServiceLoader<BlockFactoryParameters> loader = ServiceLoader.load(
-            BlockFactoryParameters.class,
-            BlockFactory.class.getClassLoader()
-        );
-        List<ServiceLoader.Provider<BlockFactoryParameters>> impls = loader.stream().toList();
-        if (impls.size() != 1) {
-            throw new AssertionError("expected exactly one impl, but got:" + impls);
-        }
-        BlockFactoryParameters params = impls.get(0).get();
-        return new BlockFactory(params.breaker(), params.bigArrays());
-    }
-
     private final CircuitBreaker breaker;
 
     private final BigArrays bigArrays;
@@ -51,13 +33,6 @@ public class BlockFactory {
         this.bigArrays = bigArrays;
     }
 
-    /**
-     * Returns the global ESQL block factory.
-     */
-    public static BlockFactory getGlobalInstance() {
-        return GLOBAL;
-    }
-
     /**
      * Returns the Non-Breaking block factory.
      */

+ 0 - 23
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactoryParameters.java

@@ -1,23 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-
-package org.elasticsearch.compute.data;
-
-import org.elasticsearch.common.breaker.CircuitBreaker;
-import org.elasticsearch.common.util.BigArrays;
-
-/**
- * Allows to inject instances of a breaker and bigArrays into the Global block factory.
- * The Global factory is somewhat temporary, therefore this interface and its ServiceLoader
- * machinery can be removed once the Global factory is removed.
- */
-public interface BlockFactoryParameters {
-
-    CircuitBreaker breaker();
-
-    BigArrays bigArrays();
-}

+ 0 - 42
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/TestBlockFactoryParameters.java

@@ -1,42 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-
-package org.elasticsearch.compute;
-
-import org.elasticsearch.common.breaker.CircuitBreaker;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.common.util.MockBigArrays;
-import org.elasticsearch.common.util.PageCacheRecycler;
-import org.elasticsearch.compute.data.BlockFactoryParameters;
-import org.elasticsearch.indices.breaker.CircuitBreakerService;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestBlockFactoryParameters implements BlockFactoryParameters {
-
-    final CircuitBreaker breaker;
-    final BigArrays bigArrays;
-
-    public TestBlockFactoryParameters() {
-        breaker = new MockBigArrays.LimitedBreaker("esql-test-breaker", ByteSizeValue.ofGb(1));
-        var breakerService = mock(CircuitBreakerService.class);
-        when(breakerService.getBreaker(CircuitBreaker.REQUEST)).thenReturn(breaker);
-        bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, breakerService);
-    }
-
-    @Override
-    public CircuitBreaker breaker() {
-        return breaker;
-    }
-
-    @Override
-    public BigArrays bigArrays() {
-        return bigArrays;
-    }
-}

+ 0 - 10
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockFactoryTests.java

@@ -61,16 +61,6 @@ public class BlockFactoryTests extends ESTestCase {
             public String toString() {
                 return "1gb";
             }
-        }, new Supplier<>() {
-            @Override
-            public BlockFactory get() {
-                return BlockFactory.getGlobalInstance();
-            }
-
-            @Override
-            public String toString() {
-                return "global";
-            }
         });
         return l.stream().map(s -> new Object[] { s }).toList();
     }

+ 0 - 4
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlDisruptionIT.java

@@ -7,7 +7,6 @@
 
 package org.elasticsearch.xpack.esql.action;
 
-import org.apache.lucene.tests.util.LuceneTestCase;
 import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.cluster.coordination.Coordinator;
 import org.elasticsearch.cluster.coordination.FollowersChecker;
@@ -19,7 +18,6 @@ import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.disruption.NetworkDisruption;
 import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
-import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.transport.TransportSettings;
 
@@ -31,9 +29,7 @@ import java.util.concurrent.TimeUnit;
 import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 
-@TestLogging(value = "org.elasticsearch.indices.breaker:TRACE", reason = "failing")
 @ESIntegTestCase.ClusterScope(scope = TEST, minNumDataNodes = 2, maxNumDataNodes = 4)
-@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/99173")
 public class EsqlDisruptionIT extends EsqlActionIT {
 
     // copied from AbstractDisruptionTestCase

+ 1 - 1
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/lookup/EnrichLookupIT.java

@@ -232,7 +232,7 @@ public class EnrichLookupIT extends AbstractEsqlIntegTestCase {
     static DriverContext driverContext() {
         return new DriverContext(
             new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService()).withCircuitBreaking(),
-            BlockFactory.getGlobalInstance()
+            BlockFactory.getNonBreakingInstance()
         );
     }
 }

+ 0 - 45
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlBlockFactoryParams.java

@@ -1,45 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-
-package org.elasticsearch.xpack.esql.plugin;
-
-import org.elasticsearch.common.breaker.CircuitBreaker;
-import org.elasticsearch.common.breaker.NoopCircuitBreaker;
-import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.compute.data.BlockFactoryParameters;
-
-/** A provider for sharing the given parameters with the compute engine's block factory. */
-public class EsqlBlockFactoryParams implements BlockFactoryParameters {
-
-    static final CircuitBreaker NOOP_BREAKER = new NoopCircuitBreaker("ESQL-noop-breaker");
-
-    static CircuitBreaker ESQL_BREAKER;
-    static BigArrays ESQL_BIGARRAYS;
-
-    static void init(BigArrays bigArrays) {
-        ESQL_BREAKER = bigArrays.breakerService().getBreaker("request");
-        ESQL_BIGARRAYS = bigArrays;
-    }
-
-    final CircuitBreaker breaker;
-    final BigArrays bigArrays;
-
-    public EsqlBlockFactoryParams() {
-        this.breaker = ESQL_BREAKER;
-        this.bigArrays = ESQL_BIGARRAYS;
-    }
-
-    @Override
-    public CircuitBreaker breaker() {
-        return breaker != null ? breaker : NOOP_BREAKER;
-    }
-
-    @Override
-    public BigArrays bigArrays() {
-        return bigArrays != null ? bigArrays : BigArrays.NON_RECYCLING_INSTANCE;
-    }
-}

+ 9 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

@@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
@@ -37,6 +38,7 @@ import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
 import java.time.ZoneOffset;
 import java.util.List;
 import java.util.Locale;
+import java.util.Objects;
 import java.util.concurrent.Executor;
 
 public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRequest, EsqlQueryResponse> {
@@ -69,8 +71,7 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
         this.requestExecutor = threadPool.executor(EsqlPlugin.ESQL_THREAD_POOL_NAME);
         exchangeService.registerTransportHandler(transportService);
         this.exchangeService = exchangeService;
-        EsqlBlockFactoryParams.init(bigArrays);
-        var blockFactory = BlockFactory.getGlobalInstance();
+        var blockFactory = createBlockFactory(bigArrays);
         this.enrichPolicyResolver = new EnrichPolicyResolver(clusterService, transportService, planExecutor.indexResolver());
         this.enrichLookupService = new EnrichLookupService(clusterService, searchService, transportService, bigArrays, blockFactory);
         this.computeService = new ComputeService(
@@ -85,6 +86,12 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
         this.settings = settings;
     }
 
+    static BlockFactory createBlockFactory(BigArrays bigArrays) {
+        CircuitBreaker circuitBreaker = bigArrays.breakerService().getBreaker("request");
+        Objects.requireNonNull(circuitBreaker, "request circuit breaker wasn't set");
+        return new BlockFactory(circuitBreaker, bigArrays);
+    }
+
     @Override
     protected void doExecute(Task task, EsqlQueryRequest request, ActionListener<EsqlQueryResponse> listener) {
         // workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can

+ 0 - 8
x-pack/plugin/esql/src/main/resources/META-INF/services/org.elasticsearch.compute.data.BlockFactoryParameters

@@ -1,8 +0,0 @@
-#
-# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
-# or more contributor license agreements. Licensed under the Elastic License
-# 2.0; you may not use this file except in compliance with the Elastic License
-# 2.0.
-#
-
-org.elasticsearch.xpack.esql.plugin.EsqlBlockFactoryParams

+ 1 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

@@ -327,7 +327,7 @@ public class CsvTests extends ESTestCase {
             sessionId,
             new CancellableTask(1, "transport", "esql", null, TaskId.EMPTY_TASK_ID, Map.of()),
             bigArrays,
-            BlockFactory.getGlobalInstance(),
+            BlockFactory.getNonBreakingInstance(),
             configuration,
             exchangeSource,
             exchangeSink,

+ 3 - 2
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/AbstractFunctionTestCase.java

@@ -637,8 +637,9 @@ public abstract class AbstractFunctionTestCase extends ESTestCase {
      */
     protected DriverContext driverContext() {
         MockBigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1));
-        breakers.add(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST));
-        return new DriverContext(bigArrays.withCircuitBreaking(), BlockFactory.getGlobalInstance());
+        CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST);
+        breakers.add(breaker);
+        return new DriverContext(bigArrays.withCircuitBreaking(), new BlockFactory(breaker, bigArrays));
     }
 
     @After

+ 1 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EvalMapperTests.java

@@ -159,7 +159,7 @@ public class EvalMapperTests extends ESTestCase {
     static DriverContext driverContext() {
         return new DriverContext(
             new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService()).withCircuitBreaking(),
-            BlockFactory.getGlobalInstance()
+            BlockFactory.getNonBreakingInstance()
         );
     }
 }

+ 1 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java

@@ -119,7 +119,7 @@ public class LocalExecutionPlannerTests extends MapperServiceTestCase {
             "test",
             null,
             BigArrays.NON_RECYCLING_INSTANCE,
-            BlockFactory.getGlobalInstance(),
+            BlockFactory.getNonBreakingInstance(),
             config(),
             null,
             null,