Parcourir la source

Template upgrades should happen in a system context (#30621)

The TemplateUpgradeService is a system service that allows for plugins
to register templates that need to be upgraded. These template upgrades
should always happen in a system context as they are not a user
initiated action. For security integrations, the lack of running this
in a system context could lead to unexpected failures. The changes in
this commit set an empty system context for the execution of the
template upgrades performed by this service.

Relates #30603
Jay Modi il y a 7 ans
Parent
commit
5baadb1ff8

+ 13 - 3
server/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java

@@ -41,6 +41,7 @@ import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentType;
@@ -128,12 +129,21 @@ public class TemplateUpgradeService extends AbstractComponent implements Cluster
                     Version.CURRENT,
                     changes.get().v1().size(),
                     changes.get().v2().size());
-                threadPool.generic().execute(() -> updateTemplates(changes.get().v1(), changes.get().v2()));
+
+                final ThreadContext threadContext = threadPool.getThreadContext();
+                try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
+                    threadContext.markAsSystemContext();
+                    threadPool.generic().execute(() -> updateTemplates(changes.get().v1(), changes.get().v2()));
+                }
             }
         }
     }
 
     void updateTemplates(Map<String, BytesReference> changes, Set<String> deletions) {
+        if (threadPool.getThreadContext().isSystemContext() == false) {
+            throw new IllegalStateException("template updates from the template upgrade service should always happen in a system context");
+        }
+
         for (Map.Entry<String, BytesReference> change : changes.entrySet()) {
             PutIndexTemplateRequest request =
                 new PutIndexTemplateRequest(change.getKey()).source(change.getValue(), XContentType.JSON);
@@ -141,7 +151,7 @@ public class TemplateUpgradeService extends AbstractComponent implements Cluster
             client.admin().indices().putTemplate(request, new ActionListener<PutIndexTemplateResponse>() {
                 @Override
                 public void onResponse(PutIndexTemplateResponse response) {
-                    if(updatesInProgress.decrementAndGet() == 0) {
+                    if (updatesInProgress.decrementAndGet() == 0) {
                         logger.info("Finished upgrading templates to version {}", Version.CURRENT);
                     }
                     if (response.isAcknowledged() == false) {
@@ -151,7 +161,7 @@ public class TemplateUpgradeService extends AbstractComponent implements Cluster
 
                 @Override
                 public void onFailure(Exception e) {
-                    if(updatesInProgress.decrementAndGet() == 0) {
+                    if (updatesInProgress.decrementAndGet() == 0) {
                         logger.info("Templates were upgraded to version {}", Version.CURRENT);
                     }
                     logger.warn(new ParameterizedMessage("Error updating template [{}]", change.getKey()), e);

+ 13 - 1
server/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java

@@ -38,6 +38,7 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 
@@ -61,6 +62,7 @@ import static java.util.Collections.emptyMap;
 import static org.elasticsearch.test.VersionUtils.randomVersion;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.CoreMatchers.startsWith;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
@@ -188,9 +190,16 @@ public class TemplateUpgradeServiceTests extends ESTestCase {
             additions.put("add_template_" + i, new BytesArray("{\"index_patterns\" : \"*\", \"order\" : " + i + "}"));
         }
 
-        TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, mockClient, clusterService, null,
+        ThreadPool threadPool = mock(ThreadPool.class);
+        ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
+        when(threadPool.getThreadContext()).thenReturn(threadContext);
+        TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, mockClient, clusterService, threadPool,
             Collections.emptyList());
 
+        IllegalStateException ise = expectThrows(IllegalStateException.class, () -> service.updateTemplates(additions, deletions));
+        assertThat(ise.getMessage(), containsString("template upgrade service should always happen in a system context"));
+
+        threadContext.markAsSystemContext();
         service.updateTemplates(additions, deletions);
         int updatesInProgress = service.getUpdatesInProgress();
 
@@ -241,11 +250,14 @@ public class TemplateUpgradeServiceTests extends ESTestCase {
         );
 
         ThreadPool threadPool = mock(ThreadPool.class);
+        ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
+        when(threadPool.getThreadContext()).thenReturn(threadContext);
         ExecutorService executorService = mock(ExecutorService.class);
         when(threadPool.generic()).thenReturn(executorService);
         doAnswer(invocation -> {
             Object[] args = invocation.getArguments();
             assert args.length == 1;
+            assertTrue(threadContext.isSystemContext());
             Runnable runnable = (Runnable) args[0];
             runnable.run();
             updateInvocation.incrementAndGet();