Browse Source

Introduce quota-aware filesystem ES plugin (#63620)

Closes #61145.

This PR adds a quota-aware filesystem plugin to Elasticsearch. This plugin
offers a way to provide user quota limits (specifically, total quota size
and available quota size) to Elasticsearch, in an implementation-agnostic
manner.

As part of this work, this PR also introduces the concept of "bootstrap
only" plugins, which are excluded from the normal plugin loading process.

Finally, note that this implementation supports `createLink(...)`, since ES
/ Lucene use hard links where possible.
Rory Hunter 5 years ago
parent
commit
2bf2d649f1
37 changed files with 2479 additions and 98 deletions
  1. 6 4
      buildSrc/src/main/groovy/org/elasticsearch/gradle/plugin/PluginBuildPlugin.groovy
  2. 20 0
      buildSrc/src/main/java/org/elasticsearch/gradle/plugin/PluginPropertiesExtension.java
  3. 37 0
      buildSrc/src/main/java/org/elasticsearch/gradle/plugin/PluginType.java
  4. 15 1
      buildSrc/src/main/java/org/elasticsearch/gradle/test/DistroTestPlugin.java
  5. 15 1
      buildSrc/src/main/resources/plugin-descriptor.properties
  6. 1 1
      distribution/src/bin/elasticsearch
  7. 1 1
      distribution/src/bin/elasticsearch-service.bat
  8. 1 1
      distribution/src/bin/elasticsearch.bat
  9. 122 0
      distribution/tools/launchers/src/main/java/org/elasticsearch/tools/launchers/BootstrapJvmOptions.java
  10. 15 5
      distribution/tools/launchers/src/main/java/org/elasticsearch/tools/launchers/JvmOptionsParser.java
  11. 77 0
      distribution/tools/launchers/src/test/java/org/elasticsearch/tools/launchers/BootstrapJvmOptionsTests.java
  12. 4 0
      distribution/tools/plugin-cli/src/test/java/org/elasticsearch/plugins/ListPluginsCommandTests.java
  13. 4 2
      docs/build.gradle
  14. 16 0
      docs/plugins/filesystem.asciidoc
  15. 2 0
      docs/plugins/index.asciidoc
  16. 42 0
      docs/plugins/quota-aware-fs.asciidoc
  17. 5 0
      plugins/quota-aware-fs/build.gradle
  18. 98 0
      plugins/quota-aware-fs/src/main/java/org/elasticsearch/fs/quotaaware/QuotaAwareFileStore.java
  19. 144 0
      plugins/quota-aware-fs/src/main/java/org/elasticsearch/fs/quotaaware/QuotaAwareFileSystem.java
  20. 435 0
      plugins/quota-aware-fs/src/main/java/org/elasticsearch/fs/quotaaware/QuotaAwareFileSystemProvider.java
  21. 230 0
      plugins/quota-aware-fs/src/main/java/org/elasticsearch/fs/quotaaware/QuotaAwarePath.java
  22. 147 0
      plugins/quota-aware-fs/src/test/java/org/elasticsearch/fs/quotaaware/DelegatingProvider.java
  23. 333 0
      plugins/quota-aware-fs/src/test/java/org/elasticsearch/fs/quotaaware/QuotaAwareFileSystemProviderTests.java
  24. 105 0
      plugins/quota-aware-fs/src/test/java/org/elasticsearch/fs/quotaaware/QuotaAwareFileSystemTests.java
  25. 116 0
      plugins/quota-aware-fs/src/test/java/org/elasticsearch/fs/quotaaware/SnapshotFilesystemProvider.java
  26. 1 1
      qa/os/src/test/java/org/elasticsearch/packaging/test/ArchiveTests.java
  27. 171 0
      qa/os/src/test/java/org/elasticsearch/packaging/test/QuotaAwareFsTests.java
  28. 4 0
      qa/smoke-test-plugins/build.gradle
  29. 115 30
      server/src/main/java/org/elasticsearch/plugins/PluginInfo.java
  30. 40 0
      server/src/main/java/org/elasticsearch/plugins/PluginType.java
  31. 22 13
      server/src/main/java/org/elasticsearch/plugins/PluginsService.java
  32. 5 2
      server/src/test/java/org/elasticsearch/nodesinfo/NodeInfoStreamingTests.java
  33. 82 7
      server/src/test/java/org/elasticsearch/plugins/PluginInfoTests.java
  34. 36 27
      server/src/test/java/org/elasticsearch/plugins/PluginsServiceTests.java
  35. 4 2
      x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java
  36. 4 0
      x-pack/qa/smoke-test-plugins-ssl/build.gradle
  37. 4 0
      x-pack/qa/smoke-test-plugins/build.gradle

+ 6 - 4
buildSrc/src/main/groovy/org/elasticsearch/gradle/plugin/PluginBuildPlugin.groovy

@@ -51,7 +51,7 @@ class PluginBuildPlugin implements Plugin<Project> {
     void apply(Project project) {
         project.pluginManager.apply(BuildPlugin)
         project.pluginManager.apply(RestTestBasePlugin)
-        project.pluginManager.apply(CompileOnlyResolvePlugin.class);
+        project.pluginManager.apply(CompileOnlyResolvePlugin.class)
 
         PluginPropertiesExtension extension = project.extensions.create(PLUGIN_EXTENSION_NAME, PluginPropertiesExtension, project)
         configureDependencies(project)
@@ -82,7 +82,7 @@ class PluginBuildPlugin implements Plugin<Project> {
             if (extension1.description == null) {
                 throw new InvalidUserDataException('description is a required setting for esplugin')
             }
-            if (extension1.classname == null) {
+            if (extension1.type != PluginType.BOOTSTRAP && extension1.classname == null) {
                 throw new InvalidUserDataException('classname is a required setting for esplugin')
             }
 
@@ -92,10 +92,12 @@ class PluginBuildPlugin implements Plugin<Project> {
                     'version'             : extension1.version,
                     'elasticsearchVersion': Version.fromString(VersionProperties.elasticsearch).toString(),
                     'javaVersion'         : project.targetCompatibility as String,
-                    'classname'           : extension1.classname,
+                    'classname'           : extension1.type == PluginType.BOOTSTRAP ? "" : extension1.classname,
                     'extendedPlugins'     : extension1.extendedPlugins.join(','),
                     'hasNativeController' : extension1.hasNativeController,
-                    'requiresKeystore'    : extension1.requiresKeystore
+                    'requiresKeystore'    : extension1.requiresKeystore,
+                    'type'                : extension1.type.toString(),
+                    'javaOpts'            : extension1.javaOpts,
             ]
             project.tasks.named('pluginProperties').configure {
                 expand(properties)

+ 20 - 0
buildSrc/src/main/java/org/elasticsearch/gradle/plugin/PluginPropertiesExtension.java

@@ -43,6 +43,10 @@ public class PluginPropertiesExtension {
 
     private boolean hasNativeController;
 
+    private PluginType type = PluginType.ISOLATED;
+
+    private String javaOpts = "";
+
     /** True if the plugin requires the elasticsearch keystore to exist, false otherwise. */
     private boolean requiresKeystore;
 
@@ -105,6 +109,22 @@ public class PluginPropertiesExtension {
         this.hasNativeController = hasNativeController;
     }
 
+    public PluginType getType() {
+        return type;
+    }
+
+    public void setType(PluginType type) {
+        this.type = type;
+    }
+
+    public String getJavaOpts() {
+        return javaOpts;
+    }
+
+    public void setJavaOpts(String javaOpts) {
+        this.javaOpts = javaOpts;
+    }
+
     public boolean isRequiresKeystore() {
         return requiresKeystore;
     }

+ 37 - 0
buildSrc/src/main/java/org/elasticsearch/gradle/plugin/PluginType.java

@@ -0,0 +1,37 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.gradle.plugin;
+
+import java.util.Locale;
+
+/**
+ * This mirrors org.elasticsearch.plugins.PluginType, which is not
+ * available to the Gradle plugin that actually builds plugins. See that
+ * class for further information.
+ */
+public enum PluginType {
+    ISOLATED,
+    BOOTSTRAP;
+
+    @Override
+    public String toString() {
+        return this.name().toLowerCase(Locale.ROOT);
+    }
+}

+ 15 - 1
buildSrc/src/main/java/org/elasticsearch/gradle/test/DistroTestPlugin.java

@@ -78,6 +78,9 @@ public class DistroTestPlugin implements Plugin<Project> {
     private static final String BWC_DISTRIBUTION_SYSPROP = "tests.bwc-distribution";
     private static final String EXAMPLE_PLUGIN_SYSPROP = "tests.example-plugin";
 
+    private static final String QUOTA_AWARE_FS_PLUGIN_CONFIGURATION = "quotaAwareFsPlugin";
+    private static final String QUOTA_AWARE_FS_PLUGIN_SYSPROP = "tests.quota-aware-fs-plugin";
+
     @Override
     public void apply(Project project) {
         project.getRootProject().getPluginManager().apply(DockerSupportPlugin.class);
@@ -100,20 +103,23 @@ public class DistroTestPlugin implements Plugin<Project> {
         TaskProvider<Task> destructiveDistroTest = project.getTasks().register("destructiveDistroTest");
 
         Configuration examplePlugin = configureExamplePlugin(project);
+        Configuration quotaAwareFsPlugin = configureQuotaAwareFsPlugin(project);
 
         List<TaskProvider<Test>> windowsTestTasks = new ArrayList<>();
         Map<Type, List<TaskProvider<Test>>> linuxTestTasks = new HashMap<>();
         Map<String, List<TaskProvider<Test>>> upgradeTestTasks = new HashMap<>();
         Map<String, TaskProvider<?>> depsTasks = new HashMap<>();
+
         for (ElasticsearchDistribution distribution : testDistributions) {
             String taskname = destructiveDistroTestTaskName(distribution);
             TaskProvider<?> depsTask = project.getTasks().register(taskname + "#deps");
-            depsTask.configure(t -> t.dependsOn(distribution, examplePlugin));
+            depsTask.configure(t -> t.dependsOn(distribution, examplePlugin, quotaAwareFsPlugin));
             depsTasks.put(taskname, depsTask);
             TaskProvider<Test> destructiveTask = configureTestTask(project, taskname, distribution, t -> {
                 t.onlyIf(t2 -> distribution.isDocker() == false || dockerSupport.get().getDockerAvailability().isAvailable);
                 addDistributionSysprop(t, DISTRIBUTION_SYSPROP, distribution::getFilepath);
                 addDistributionSysprop(t, EXAMPLE_PLUGIN_SYSPROP, () -> examplePlugin.getSingleFile().toString());
+                addDistributionSysprop(t, QUOTA_AWARE_FS_PLUGIN_SYSPROP, () -> quotaAwareFsPlugin.getSingleFile().toString());
                 t.exclude("**/PackageUpgradeTests.class");
             }, depsTask);
 
@@ -316,6 +322,14 @@ public class DistroTestPlugin implements Plugin<Project> {
         return examplePlugin;
     }
 
+    private static Configuration configureQuotaAwareFsPlugin(Project project) {
+        Configuration examplePlugin = project.getConfigurations().create(QUOTA_AWARE_FS_PLUGIN_CONFIGURATION);
+        DependencyHandler deps = project.getDependencies();
+        Map<String, String> quotaAwareFsPluginProject = Map.of("path", ":plugins:quota-aware-fs", "configuration", "zip");
+        deps.add(QUOTA_AWARE_FS_PLUGIN_CONFIGURATION, deps.project(quotaAwareFsPluginProject));
+        return examplePlugin;
+    }
+
     private static void configureVMWrapperTasks(
         Project project,
         List<TaskProvider<Test>> destructiveTasks,

+ 15 - 1
buildSrc/src/main/resources/plugin-descriptor.properties

@@ -16,6 +16,11 @@
 #
 ### mandatory elements for all plugins:
 #
+# 'type': the type of this plugin. 'isolated' indicated a typical sandboxed plugin,
+# whereas 'bootstrap' indicates a plugin whose jars are added to the JVM's boot
+# classpath.
+type=${type}
+#
 # 'description': simple summary of the plugin
 description=${description}
 #
@@ -24,9 +29,12 @@ version=${version}
 #
 # 'name': the plugin name
 name=${name}
+<% if (type != "bootstrap") { %>
 #
-# 'classname': the name of the class to load, fully-qualified.
+# 'classname': the name of the class to load, fully-qualified. Only applies to
+# "isolated" plugins
 classname=${classname}
+<% } %>
 #
 # 'java.version': version of java the code is built against
 # use the system property java.specification.version
@@ -43,3 +51,9 @@ extended.plugins=${extendedPlugins}
 #
 # 'has.native.controller': whether or not the plugin has a native controller
 has.native.controller=${hasNativeController}
+<% if (type == "bootstrap") { %>
+#
+# 'java.opts': any additional command line parameters to pass to the JVM when
+# Elasticsearch starts. Only applies to "bootstrap" plugins.
+java.opts=${javaOpts}
+<% } %>

+ 1 - 1
distribution/src/bin/elasticsearch

@@ -52,7 +52,7 @@ fi
 #   - second, JVM options are read from jvm.options and jvm.options.d/*.options
 #   - third, JVM options from ES_JAVA_OPTS are applied
 #   - fourth, ergonomic JVM options are applied
-ES_JAVA_OPTS=`export ES_TMPDIR; "$JAVA" "$XSHARE" -cp "$ES_CLASSPATH" org.elasticsearch.tools.launchers.JvmOptionsParser "$ES_PATH_CONF"`
+ES_JAVA_OPTS=`export ES_TMPDIR; "$JAVA" "$XSHARE" -cp "$ES_CLASSPATH" org.elasticsearch.tools.launchers.JvmOptionsParser "$ES_PATH_CONF" "$ES_HOME/plugins"`
 
 # manual parsing to find out, if process should be detached
 if [[ $DAEMONIZE = false ]]; then

+ 1 - 1
distribution/src/bin/elasticsearch-service.bat

@@ -124,7 +124,7 @@ if not "%ES_JAVA_OPTS%" == "" set ES_JAVA_OPTS=%ES_JAVA_OPTS: =;%
 if not "%ES_JAVA_OPTS%" == "" set ES_JAVA_OPTS=%ES_JAVA_OPTS:;;=;%
 
 @setlocal
-for /F "usebackq delims=" %%a in (`CALL %JAVA% -cp "!ES_CLASSPATH!" "org.elasticsearch.tools.launchers.JvmOptionsParser" "!ES_PATH_CONF!" ^|^| echo jvm_options_parser_failed`) do set ES_JAVA_OPTS=%%a
+for /F "usebackq delims=" %%a in (`CALL %JAVA% -cp "!ES_CLASSPATH!" "org.elasticsearch.tools.launchers.JvmOptionsParser" "!ES_PATH_CONF!" "!ES_HOME!"/plugins ^|^| echo jvm_options_parser_failed`) do set ES_JAVA_OPTS=%%a
 @endlocal & set "MAYBE_JVM_OPTIONS_PARSER_FAILED=%ES_JAVA_OPTS%" & set ES_JAVA_OPTS=%ES_JAVA_OPTS%
 
 if "%MAYBE_JVM_OPTIONS_PARSER_FAILED%" == "jvm_options_parser_failed" (

+ 1 - 1
distribution/src/bin/elasticsearch.bat

@@ -82,7 +82,7 @@ rem     jvm.options.d/*.options
 rem   - third, JVM options from ES_JAVA_OPTS are applied
 rem   - fourth, ergonomic JVM options are applied
 @setlocal
-for /F "usebackq delims=" %%a in (`CALL %JAVA% -cp "!ES_CLASSPATH!" "org.elasticsearch.tools.launchers.JvmOptionsParser" "!ES_PATH_CONF!" ^|^| echo jvm_options_parser_failed`) do set ES_JAVA_OPTS=%%a
+for /F "usebackq delims=" %%a in (`CALL %JAVA% -cp "!ES_CLASSPATH!" "org.elasticsearch.tools.launchers.JvmOptionsParser" "!ES_PATH_CONF!" "!ES_HOME!"/plugins ^|^| echo jvm_options_parser_failed`) do set ES_JAVA_OPTS=%%a
 @endlocal & set "MAYBE_JVM_OPTIONS_PARSER_FAILED=%ES_JAVA_OPTS%" & set ES_JAVA_OPTS=%ES_JAVA_OPTS%
 
 if "%MAYBE_JVM_OPTIONS_PARSER_FAILED%" == "jvm_options_parser_failed" (

+ 122 - 0
distribution/tools/launchers/src/main/java/org/elasticsearch/tools/launchers/BootstrapJvmOptions.java

@@ -0,0 +1,122 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.tools.launchers;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/**
+ * This class looks for plugins whose "type" is "bootstrap". Such plugins
+ * will be added to the JVM's boot classpath. The plugins may also define
+ * additional JVM options, in order to configure the bootstrap plugins.
+ */
+public class BootstrapJvmOptions {
+    public static List<String> bootstrapJvmOptions(Path plugins) throws IOException {
+        if (Files.isDirectory(plugins) == false) {
+            throw new IllegalArgumentException("Plugins path " + plugins + " must be a directory");
+        }
+
+        final List<PluginInfo> pluginInfo = getPluginInfo(plugins);
+
+        return generateOptions(pluginInfo);
+    }
+
+    // Find all plugins and return their jars and descriptors.
+    private static List<PluginInfo> getPluginInfo(Path plugins) throws IOException {
+        final List<PluginInfo> pluginInfo = new ArrayList<>();
+
+        final List<Path> pluginDirs = Files.list(plugins).collect(Collectors.toList());
+
+        for (Path pluginDir : pluginDirs) {
+            final List<String> jarFiles = new ArrayList<>();
+            final Properties props = new Properties();
+
+            final List<Path> pluginFiles = Files.list(pluginDir).collect(Collectors.toList());
+            for (Path pluginFile : pluginFiles) {
+                final String lowerCaseName = pluginFile.getFileName().toString().toLowerCase(Locale.ROOT);
+
+                if (lowerCaseName.endsWith(".jar")) {
+                    jarFiles.add(pluginFile.toString());
+                } else if (lowerCaseName.equals("plugin-descriptor.properties")) {
+                    try (InputStream stream = Files.newInputStream(pluginFile)) {
+                        props.load(stream);
+                    } catch (IOException e) {
+                        throw new UncheckedIOException(e);
+                    }
+                }
+            }
+
+            if (props.isEmpty() == false) {
+                pluginInfo.add(new PluginInfo(jarFiles, props));
+            }
+        }
+
+        return pluginInfo;
+    }
+
+    // package-private for testing
+    static List<String> generateOptions(List<PluginInfo> pluginInfo) {
+        final List<String> bootstrapJars = new ArrayList<>();
+        final List<String> bootstrapOptions = new ArrayList<>();
+
+        for (PluginInfo info : pluginInfo) {
+            final String type = info.properties.getProperty("type", "isolated").toLowerCase(Locale.ROOT);
+
+            if (type.equals("bootstrap")) {
+                bootstrapJars.addAll(info.jarFiles);
+
+                // Add any additional Java CLI options. This could contain any number of options,
+                // but we don't attempt to split them up as all JVM options are concatenated together
+                // anyway
+                final String javaOpts = info.properties.getProperty("java.opts", "");
+                if (javaOpts.isBlank() == false) {
+                    bootstrapOptions.add(javaOpts);
+                }
+            }
+        }
+
+        if (bootstrapJars.isEmpty()) {
+            return List.of();
+        }
+
+        bootstrapOptions.add("-Xbootclasspath/a:" + String.join(":", bootstrapJars));
+
+        return bootstrapOptions;
+    }
+
+    // package-private for testing
+    static class PluginInfo {
+        public final List<String> jarFiles;
+        public final Properties properties;
+
+        PluginInfo(List<String> jarFiles, Properties properties) {
+            this.jarFiles = jarFiles;
+            this.properties = properties;
+        }
+    }
+}

+ 15 - 5
distribution/tools/launchers/src/main/java/org/elasticsearch/tools/launchers/JvmOptionsParser.java

@@ -79,8 +79,10 @@ final class JvmOptionsParser {
      * @param args the args to the program which should consist of a single option, the path to ES_PATH_CONF
      */
     public static void main(final String[] args) throws InterruptedException, IOException {
-        if (args.length != 1) {
-            throw new IllegalArgumentException("expected one argument specifying path to ES_PATH_CONF but was " + Arrays.toString(args));
+        if (args.length != 2) {
+            throw new IllegalArgumentException(
+                "Expected two arguments specifying path to ES_PATH_CONF and plugins directory, but was " + Arrays.toString(args)
+            );
         }
 
         final JvmOptionsParser parser = new JvmOptionsParser();
@@ -93,7 +95,12 @@ final class JvmOptionsParser {
         }
 
         try {
-            final List<String> jvmOptions = parser.jvmOptions(Paths.get(args[0]), System.getenv("ES_JAVA_OPTS"), substitutions);
+            final List<String> jvmOptions = parser.jvmOptions(
+                Paths.get(args[0]),
+                Paths.get(args[1]),
+                System.getenv("ES_JAVA_OPTS"),
+                substitutions
+            );
             Launchers.outPrintln(String.join(" ", jvmOptions));
         } catch (final JvmOptionsFileParserException e) {
             final String errorMessage = String.format(
@@ -123,7 +130,7 @@ final class JvmOptionsParser {
         Launchers.exit(0);
     }
 
-    private List<String> jvmOptions(final Path config, final String esJavaOpts, final Map<String, String> substitutions)
+    private List<String> jvmOptions(final Path config, Path plugins, final String esJavaOpts, final Map<String, String> substitutions)
         throws InterruptedException, IOException, JvmOptionsFileParserException {
 
         final List<String> jvmOptions = readJvmOptionsFiles(config);
@@ -137,12 +144,15 @@ final class JvmOptionsParser {
         final List<String> substitutedJvmOptions = substitutePlaceholders(jvmOptions, Collections.unmodifiableMap(substitutions));
         final List<String> ergonomicJvmOptions = JvmErgonomics.choose(substitutedJvmOptions);
         final List<String> systemJvmOptions = SystemJvmOptions.systemJvmOptions();
+        final List<String> bootstrapOptions = BootstrapJvmOptions.bootstrapJvmOptions(plugins);
+
         final List<String> finalJvmOptions = new ArrayList<>(
-            systemJvmOptions.size() + substitutedJvmOptions.size() + ergonomicJvmOptions.size()
+            systemJvmOptions.size() + substitutedJvmOptions.size() + ergonomicJvmOptions.size() + bootstrapOptions.size()
         );
         finalJvmOptions.addAll(systemJvmOptions); // add the system JVM options first so that they can be overridden
         finalJvmOptions.addAll(substitutedJvmOptions);
         finalJvmOptions.addAll(ergonomicJvmOptions);
+        finalJvmOptions.addAll(bootstrapOptions);
 
         return finalJvmOptions;
     }

+ 77 - 0
distribution/tools/launchers/src/test/java/org/elasticsearch/tools/launchers/BootstrapJvmOptionsTests.java

@@ -0,0 +1,77 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.tools.launchers;
+
+import org.elasticsearch.tools.launchers.BootstrapJvmOptions.PluginInfo;
+
+import java.util.List;
+import java.util.Properties;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+public class BootstrapJvmOptionsTests extends LaunchersTestCase {
+
+    public void testGenerateOptionsHandlesNoPlugins() {
+        final List<String> options = BootstrapJvmOptions.generateOptions(List.of());
+        assertThat(options, is(empty()));
+    }
+
+    public void testGenerateOptionsIgnoresNonBootstrapPlugins() {
+        Properties props = new Properties();
+        props.put("type", "isolated");
+        List<PluginInfo> info = List.of(new PluginInfo(List.of(), props));
+
+        final List<String> options = BootstrapJvmOptions.generateOptions(info);
+        assertThat(options, is(empty()));
+    }
+
+    public void testGenerateOptionsHandlesBootstrapPlugins() {
+        Properties propsWithoutJavaOpts = new Properties();
+        propsWithoutJavaOpts.put("type", "bootstrap");
+        PluginInfo info1 = new PluginInfo(List.of("/path/first.jar"), propsWithoutJavaOpts);
+
+        Properties propsWithEmptyJavaOpts = new Properties();
+        propsWithEmptyJavaOpts.put("type", "bootstrap");
+        propsWithEmptyJavaOpts.put("java.opts", "");
+        PluginInfo info2 = new PluginInfo(List.of("/path/second.jar"), propsWithEmptyJavaOpts);
+
+        Properties propsWithBlankJavaOpts = new Properties();
+        propsWithBlankJavaOpts.put("type", "bootstrap");
+        propsWithBlankJavaOpts.put("java.opts", "   \t\n  ");
+        PluginInfo info3 = new PluginInfo(List.of("/path/third.jar"), propsWithBlankJavaOpts);
+
+        Properties propsWithJavaOpts = new Properties();
+        propsWithJavaOpts.put("type", "bootstrap");
+        propsWithJavaOpts.put("java.opts", "-Dkey=value -DotherKey=otherValue");
+        PluginInfo info4 = new PluginInfo(List.of("/path/fourth.jar"), propsWithJavaOpts);
+
+        final List<String> options = BootstrapJvmOptions.generateOptions(List.of(info1, info2, info3, info4));
+        assertThat(
+            options,
+            contains(
+                "-Dkey=value -DotherKey=otherValue",
+                "-Xbootclasspath/a:/path/first.jar:/path/second.jar:/path/third.jar:/path/fourth.jar"
+            )
+        );
+    }
+}

+ 4 - 0
distribution/tools/plugin-cli/src/test/java/org/elasticsearch/plugins/ListPluginsCommandTests.java

@@ -154,6 +154,7 @@ public class ListPluginsCommandTests extends ESTestCase {
                 "Elasticsearch Version: " + Version.CURRENT.toString(),
                 "Java Version: 1.8",
                 "Native Controller: false",
+                "Type: isolated",
                 "Extended Plugins: []",
                 " * Classname: org.fake"
             ),
@@ -176,6 +177,7 @@ public class ListPluginsCommandTests extends ESTestCase {
                 "Elasticsearch Version: " + Version.CURRENT.toString(),
                 "Java Version: 1.8",
                 "Native Controller: true",
+                "Type: isolated",
                 "Extended Plugins: []",
                 " * Classname: org.fake"
             ),
@@ -199,6 +201,7 @@ public class ListPluginsCommandTests extends ESTestCase {
                 "Elasticsearch Version: " + Version.CURRENT.toString(),
                 "Java Version: 1.8",
                 "Native Controller: false",
+                "Type: isolated",
                 "Extended Plugins: []",
                 " * Classname: org.fake",
                 "fake_plugin2",
@@ -209,6 +212,7 @@ public class ListPluginsCommandTests extends ESTestCase {
                 "Elasticsearch Version: " + Version.CURRENT.toString(),
                 "Java Version: 1.8",
                 "Native Controller: false",
+                "Type: isolated",
                 "Extended Plugins: []",
                 " * Classname: org.fake2"
             ),

+ 4 - 2
docs/build.gradle

@@ -84,8 +84,10 @@ testClusters.integTest {
 // build the cluster with all plugins
 project.rootProject.subprojects.findAll { it.parent.path == ':plugins' }.each { subproj ->
   /* Skip repositories. We just aren't going to be able to test them so it
-   * doesn't make sense to waste time installing them. */
-  if (subproj.path.startsWith(':plugins:repository-')) {
+   * doesn't make sense to waste time installing them.
+   * Also skip quota-aware-fs since it has to be configured in order to use
+   * it, otherwise ES will not start. */
+  if (subproj.path.startsWith(':plugins:repository-') || subproj.path.startsWith(':plugins:quota-aware-fs')) {
     return
   }
   // Do not install ingest-attachment in a FIPS 140 JVM as this is not supported

+ 16 - 0
docs/plugins/filesystem.asciidoc

@@ -0,0 +1,16 @@
+[[filesystem]]
+== Filesystem Plugins
+
+Filesystem plugins modify how {es} interacts with the host filesystem.
+
+[discrete]
+=== Core filesystem plugins
+
+The core filesystem plugins are:
+
+<<quota-aware-fs,Quota-aware Filesystem>>::
+
+The Quota-aware Filesystem plugin adds an interface for telling Elasticsearch the disk-quota limits under which it is operating.
+
+include::quota-aware-fs.asciidoc[]
+

+ 2 - 0
docs/plugins/index.asciidoc

@@ -46,6 +46,8 @@ include::analysis.asciidoc[]
 
 include::discovery.asciidoc[]
 
+include::filesystem.asciidoc[]
+
 include::ingest.asciidoc[]
 
 include::management.asciidoc[]

+ 42 - 0
docs/plugins/quota-aware-fs.asciidoc

@@ -0,0 +1,42 @@
+[[quota-aware-fs]]
+=== Quota-aware Filesystem Plugin
+
+The Quota-aware Filesystem plugin adds an interface for telling
+Elasticsearch the disk-quota limits under which it is operating.
+
+:plugin_name: quota-aware-fs
+include::install_remove.asciidoc[]
+
+[[quota-aware-fs-usage]]
+==== Passing disk quota information to Elasticsearch
+
+{es} considers the available disk space on a node before deciding whether
+to allocate new shards to that node or to actively relocate shards away from that node.
+However, while the JVM has support for reporting a filesystem's total space and available
+space, it has no knowledge of any quota limits imposed on the user under which {es} is
+running. Consequently, the {es} mechanisms for handling low disk space situations cannot
+function. To work around this situation, this plugin provides a mechanism for supplying quota-ware
+total and available amounts of disk space.
+
+To use the plugin, install it on all nodes and restart them. You must configure the plugin
+by supplying the `es.fs.quota.file` {ref}/jvm-options.html[JVM system property] on startup. This
+property specifies a URI to a properties file, which contains the total and available
+amounts.
+
+NOTE: {es} will not start successfully if you install the `quota-aware-fs` plugin,
+but you do not supply the `es.fs.quota.file` system property at startup.
+
+[source,text]
+----
+-Des.fs.quota.file=file:///path/to/some.properties
+----
+
+The properties file must contain the keys `total` and `remaining`, both of which contain the respective
+number in bytes. You are responsible for writing this file with the correct values, and keeping the
+values up-to-date. {es} will poll this file regularly to pick up any changes.
+
+[source,properties]
+----
+total=976490576
+remaining=376785728
+----

+ 5 - 0
plugins/quota-aware-fs/build.gradle

@@ -0,0 +1,5 @@
+esplugin {
+  description 'A bootstrap plugin that adds support for interfacing with filesystem that enforce user quotas.'
+  type = 'bootstrap'
+  javaOpts = '-Djava.nio.file.spi.DefaultFileSystemProvider=org.elasticsearch.fs.quotaaware.QuotaAwareFileSystemProvider'
+}

+ 98 - 0
plugins/quota-aware-fs/src/main/java/org/elasticsearch/fs/quotaaware/QuotaAwareFileStore.java

@@ -0,0 +1,98 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.fs.quotaaware;
+
+import java.io.IOException;
+import java.nio.file.FileStore;
+import java.nio.file.attribute.FileAttributeView;
+import java.nio.file.attribute.FileStoreAttributeView;
+
+/**
+ * An implementation of {@link FileStore} that relies on
+ * {@link QuotaAwareFileSystemProvider} for usage reporting. Other methods are
+ * delegated to the backing instance of {@link FileStore}
+ */
+public final class QuotaAwareFileStore extends FileStore {
+
+    private final FileStore backingFS;
+    private final QuotaAwareFileSystemProvider provider;
+
+    QuotaAwareFileStore(QuotaAwareFileSystemProvider provider, FileStore backingFS) {
+        this.provider = provider;
+        this.backingFS = backingFS;
+    }
+
+    @Override
+    public String name() {
+        return backingFS.name();
+    }
+
+    @Override
+    public String type() {
+        return backingFS.type();
+    }
+
+    @Override
+    public boolean isReadOnly() {
+        return backingFS.isReadOnly();
+    }
+
+    @Override
+    public long getTotalSpace() throws IOException {
+        return Math.min(provider.getTotal(), backingFS.getTotalSpace());
+    }
+
+    @Override
+    public long getUsableSpace() throws IOException {
+        return Math.min(provider.getRemaining(), backingFS.getUsableSpace());
+    }
+
+    @Override
+    public long getUnallocatedSpace() throws IOException {
+        // There is no point in telling users that the underlying
+        // host has more capacity than what they're allowed to use so we limit
+        // this one with remaining as well.
+        return Math.min(provider.getRemaining(), backingFS.getUnallocatedSpace());
+    }
+
+    @Override
+    public boolean supportsFileAttributeView(Class<? extends FileAttributeView> type) {
+        return backingFS.supportsFileAttributeView(type);
+    }
+
+    @Override
+    public boolean supportsFileAttributeView(String name) {
+        return backingFS.supportsFileAttributeView(name);
+    }
+
+    @Override
+    public <V extends FileStoreAttributeView> V getFileStoreAttributeView(Class<V> type) {
+        return backingFS.getFileStoreAttributeView(type);
+    }
+
+    @Override
+    public Object getAttribute(String attribute) throws IOException {
+        return backingFS.getAttribute(attribute);
+    }
+
+    @Override
+    public String toString() {
+        return "QuotaAwareFileStore(" + backingFS.toString() + ")";
+    }
+}

+ 144 - 0
plugins/quota-aware-fs/src/main/java/org/elasticsearch/fs/quotaaware/QuotaAwareFileSystem.java

@@ -0,0 +1,144 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.fs.quotaaware;
+
+import org.elasticsearch.common.SuppressForbidden;
+
+import java.io.IOException;
+import java.nio.file.FileStore;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.PathMatcher;
+import java.nio.file.WatchService;
+import java.nio.file.attribute.UserPrincipalLookupService;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.StreamSupport;
+
+/**
+ * An implementation of {@link FileSystem} that returns the given
+ * {@link QuotaAwareFileSystemProvider} provider {@link #provider()}.
+ *
+ * Other methods are delegated to given instance of {@link FileSystem} and
+ * wrapped where result types are either @link {@link QuotaAwareFileSystem}
+ * or @link {@link QuotaAwareFileStore}.
+ *
+ */
+public final class QuotaAwareFileSystem extends FileSystem {
+    private final FileSystem delegate;
+    private final QuotaAwareFileSystemProvider provider;
+
+    QuotaAwareFileSystem(QuotaAwareFileSystemProvider provider, FileSystem delegate) {
+        this.provider = Objects.requireNonNull(provider, "Provider is required");
+        this.delegate = Objects.requireNonNull(delegate, "FileSystem is required");
+    }
+
+    @Override
+    public QuotaAwareFileSystemProvider provider() {
+        return provider;
+    }
+
+    @Override
+    @SuppressForbidden(reason = "accesses the default filesystem by design")
+    public void close() throws IOException {
+        if (this == FileSystems.getDefault()) {
+            throw new UnsupportedOperationException("The default file system cannot be closed");
+        } else if (delegate != FileSystems.getDefault()) {
+            delegate.close();
+        }
+        provider.purge(delegate);
+    }
+
+    @Override
+    public boolean isOpen() {
+        return delegate.isOpen();
+    }
+
+    @Override
+    public boolean isReadOnly() {
+        return delegate.isReadOnly();
+    }
+
+    @Override
+    public String getSeparator() {
+        return delegate.getSeparator();
+    }
+
+    @Override
+    public Iterable<Path> getRootDirectories() {
+        return StreamSupport.stream(delegate.getRootDirectories().spliterator(), false).map((Function<Path, Path>) this::wrap)::iterator;
+    }
+
+    @Override
+    public Iterable<FileStore> getFileStores() {
+        return StreamSupport.stream(delegate.getFileStores().spliterator(), false)
+            .map((Function<FileStore, FileStore>) provider::getFileStore)::iterator;
+    }
+
+    @Override
+    public Set<String> supportedFileAttributeViews() {
+        return delegate.supportedFileAttributeViews();
+    }
+
+    @Override
+    public Path getPath(String first, String... more) {
+        return wrap(delegate.getPath(first, more));
+    }
+
+    private QuotaAwarePath wrap(Path delegatePath) {
+        if (delegatePath == null) return null;
+        else return new QuotaAwarePath(this, delegatePath);
+    }
+
+    @Override
+    public PathMatcher getPathMatcher(String syntaxAndPattern) {
+        PathMatcher matcher = delegate.getPathMatcher(syntaxAndPattern);
+        return (path) -> matcher.matches(QuotaAwarePath.unwrap(path));
+    }
+
+    @Override
+    public UserPrincipalLookupService getUserPrincipalLookupService() {
+        return delegate.getUserPrincipalLookupService();
+    }
+
+    @Override
+    public WatchService newWatchService() throws IOException {
+        return delegate.newWatchService();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (obj == null) return false;
+        if (getClass() != obj.getClass()) return false;
+        QuotaAwareFileSystem other = (QuotaAwareFileSystem) obj;
+        if (!delegate.equals(other.delegate)) return false;
+        if (!provider.equals(other.provider)) return false;
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        return delegate.hashCode();
+    }
+
+}

+ 435 - 0
plugins/quota-aware-fs/src/main/java/org/elasticsearch/fs/quotaaware/QuotaAwareFileSystemProvider.java

@@ -0,0 +1,435 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.fs.quotaaware;
+
+import org.elasticsearch.core.internal.io.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.AccessMode;
+import java.nio.file.CopyOption;
+import java.nio.file.DirectoryStream;
+import java.nio.file.DirectoryStream.Filter;
+import java.nio.file.FileStore;
+import java.nio.file.FileSystem;
+import java.nio.file.LinkOption;
+import java.nio.file.OpenOption;
+import java.nio.file.Path;
+import java.nio.file.ProviderMismatchException;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.nio.file.attribute.FileAttribute;
+import java.nio.file.attribute.FileAttributeView;
+import java.nio.file.spi.FileSystemProvider;
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.StreamSupport;
+
+/**
+ * QuotaAwareFileSystemProvider is intended to be used as a wrapper around the
+ * default file system provider of the JVM.
+ *
+ * It augments the default file system by allowing to report total size and
+ * remaining capacity according to some externally defined quota that is not
+ * readily available to the JVM. Essentially it is a workaround for containers
+ * that see the size of the host volume and not the effective quota available to
+ * them. It will however never report larger capacity than what the underlying
+ * file system sees.
+ *
+ * In application usage:
+ * <ol>
+ * <li>Include this project in the class path</li>
+ * <li>Specify this argument at JVM boot:
+ * <code>-Djava.nio.file.spi.DefaultFileSystemProvider=co.elastic.cloud.quotaawarefs.QuotaAwareFileSystemProvider</code>
+ * </li>
+ * <li>Have some external system check the quota usage and
+ * write it to the path specified by the system property {@value #QUOTA_PATH_KEY}.</li>
+ * </ol>
+ *
+ * In any case the quota file must be a {@link Properties} file with the
+ * properties <code>total</code> and <code>remaining</code>. Both properties are
+ * {@link Long} values, parsed according to {@link Long#parseLong(String)} and
+ * assumed to be in bytes.
+ *
+ * Sample format:
+ *
+ * <pre>
+ * {@code
+ * total=5000
+ * remaining=200
+ * }
+ * </pre>
+ */
+public class QuotaAwareFileSystemProvider extends FileSystemProvider implements AutoCloseable {
+
+    private static final int CHECK_PERIOD = 1000;
+
+    private final class RefreshLimitsTask extends TimerTask {
+        @Override
+        public void run() {
+            try {
+                assert timerThread == Thread.currentThread() : "QuotaAwareFileSystemProvider doesn't support multithreaded timer.";
+                refreshLimits();
+            } catch (Exception e) {
+                // Canceling from the timer Thread guarantees last execution,
+                // so no need to check for duplicate error
+                error.set(e);
+                timer.cancel();
+            }
+        }
+
+    }
+
+    static final String QUOTA_PATH_KEY = "es.fs.quota.file";
+
+    private final FileSystemProvider delegate;
+    private final Path configPath;
+
+    private volatile long total = Long.MAX_VALUE;
+    private volatile long remaining = Long.MAX_VALUE;
+
+    private final Timer timer;
+
+    private final ConcurrentHashMap<FileSystem, QuotaAwareFileSystem> systemsCache = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<FileStore, QuotaAwareFileStore> storesCache = new ConcurrentHashMap<>();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final AtomicReference<Throwable> error = new AtomicReference<>();
+
+    private static final AtomicInteger NEXT_SERIAL = new AtomicInteger(0);
+    private final String timerThreadName = "QuotaAwareFSTimer-" + NEXT_SERIAL.getAndIncrement();
+    private final Thread timerThread;
+
+    public QuotaAwareFileSystemProvider(FileSystemProvider delegate) throws Exception {
+        this(delegate, URI.create(getUri()));
+    }
+
+    private static String getUri() {
+        final String property = System.getProperty(QUOTA_PATH_KEY);
+        if (property == null) {
+            throw new IllegalArgumentException(
+                "Property "
+                    + QUOTA_PATH_KEY
+                    + " must be set to a URI in order to use the quota filesystem provider, e.g. using -D"
+                    + QUOTA_PATH_KEY
+                    + "=file://path/to/fsquota.properties"
+            );
+        }
+
+        return property;
+    }
+
+    public QuotaAwareFileSystemProvider(FileSystemProvider delegate, URI config) throws Exception {
+        if (delegate instanceof QuotaAwareFileSystemProvider) {
+            throw new IllegalArgumentException("Delegate provider cannot be an instance of QuotaAwareFileSystemProvider");
+        }
+        this.delegate = delegate;
+        configPath = delegate.getPath(config);
+        refreshLimits(); // Ensures that a parseable file exists before
+                         // timer is created
+        timer = new Timer(timerThreadName, true);
+        try {
+            timerThread = getThreadFromTimer(timer).get();
+            timer.schedule(new RefreshLimitsTask(), CHECK_PERIOD, CHECK_PERIOD);
+        } catch (Exception e1) {
+            if (e1 instanceof InterruptedException) {
+                Thread.currentThread().interrupt(); // Restore interrupted flag
+            }
+            try {
+                // Avoid thread leak if this failed to start
+                timer.cancel();
+            } catch (Exception e2) {
+                e1.addSuppressed(e2);
+            }
+            throw e1;
+        }
+    }
+
+    /**
+     * Extracts the {@link Thread} used by a {@link Timer}.
+     *
+     * Ideally {@link Timer} would provide necessary health checks or error
+     * handling support that this would not be required.
+     *
+     * @param timer
+     *            The {@link Timer} instance to extract thread from
+     * @return the Thread used by the given timer
+     * @throws IllegalStateException
+     *             if Timer is cancelled.
+     */
+    private static Future<Thread> getThreadFromTimer(Timer timer) {
+        FutureTask<Thread> timerThreadFuture = new FutureTask<>(() -> Thread.currentThread());
+        timer.schedule(new TimerTask() {
+            @Override
+            public void run() {
+                timerThreadFuture.run();
+            }
+        }, 0);
+        return timerThreadFuture;
+    }
+
+    /**
+     * Performs a single attempt at reading the required files.
+     *
+     * @throws PrivilegedActionException if something goes wrong
+     * @throws IOException if something goes wrong
+     *
+     * @throws IllegalStateException
+     *             if security manager denies reading the property file
+     * @throws IllegalStateException
+     *             if the property file cannot be parsed
+     */
+    private void refreshLimits() throws IOException, PrivilegedActionException {
+        try (InputStream newInputStream = AccessController.doPrivileged(new PrivilegedExceptionAction<InputStream>() {
+            @Override
+            public InputStream run() throws Exception {
+                return delegate.newInputStream(configPath, StandardOpenOption.READ);
+            }
+        })) {
+            Properties properties = new Properties();
+            properties.load(newInputStream);
+            total = Long.parseLong(properties.getProperty("total"));
+            remaining = Long.parseLong(properties.getProperty("remaining"));
+        }
+    }
+
+    @Override
+    public String getScheme() {
+        return "file";
+    }
+
+    @Override
+    public FileSystem newFileSystem(URI uri, Map<String, ?> env) throws IOException {
+        // Delegate handles throwing exception if filesystem already exists
+        return getFS(delegate.newFileSystem(uri, env));
+
+    }
+
+    private QuotaAwareFileSystem getFS(FileSystem fileSystem) {
+        return systemsCache.computeIfAbsent(fileSystem, (delegate) -> new QuotaAwareFileSystem(this, delegate));
+    }
+
+    @Override
+    public FileSystem getFileSystem(URI uri) {
+        // Delegate handles throwing exception if filesystem doesn't exist, but
+        // delegate may also have precreated filesystems, like the default file
+        // system.
+        return getFS(delegate.getFileSystem(uri));
+    }
+
+    @Override
+    public Path getPath(URI uri) {
+        return ensureWrapped(delegate.getPath(uri));
+    }
+
+    private Path ensureWrapped(Path path) {
+        if (path instanceof QuotaAwarePath) {
+            assert path.getFileSystem().provider() == this;
+            // Delegate may use this instance to create the path when this
+            // instance is installed as the default provider.
+            // This is safe because nested QuotaAwareProviders are prohibited,
+            // otherwise this would require unwrapping.
+            return path;
+        } else {
+            return new QuotaAwarePath(getFS(path.getFileSystem()), path);
+        }
+    }
+
+    @Override
+    public SeekableByteChannel newByteChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
+        return delegate.newByteChannel(QuotaAwarePath.unwrap(path), options, attrs);
+    }
+
+    @Override
+    public DirectoryStream<Path> newDirectoryStream(Path dir, Filter<? super Path> filter) throws IOException {
+        return new DirectoryStream<Path>() {
+            DirectoryStream<Path> stream = delegate.newDirectoryStream(QuotaAwarePath.unwrap(dir), filter);
+
+            @Override
+            public void close() throws IOException {
+                stream.close();
+            }
+
+            @Override
+            public Iterator<Path> iterator() {
+                return StreamSupport.stream(stream.spliterator(), false).map(QuotaAwareFileSystemProvider.this::ensureWrapped).iterator();
+            }
+        };
+    }
+
+    @Override
+    public void createDirectory(Path dir, FileAttribute<?>... attrs) throws IOException {
+        delegate.createDirectory(QuotaAwarePath.unwrap(dir), attrs);
+    }
+
+    @Override
+    public void delete(Path path) throws IOException {
+        delegate.delete(QuotaAwarePath.unwrap(path));
+    }
+
+    @Override
+    public void copy(Path source, Path target, CopyOption... options) throws IOException {
+        delegate.copy(QuotaAwarePath.unwrap(source), QuotaAwarePath.unwrap(target), options);
+    }
+
+    @Override
+    public void move(Path source, Path target, CopyOption... options) throws IOException {
+        delegate.move(QuotaAwarePath.unwrap(source), QuotaAwarePath.unwrap(target), options);
+    }
+
+    @Override
+    public boolean isSameFile(Path path, Path path2) throws IOException {
+        return delegate.isSameFile(QuotaAwarePath.unwrap(path), QuotaAwarePath.unwrap(path2));
+    }
+
+    @Override
+    public boolean isHidden(Path path) throws IOException {
+        return delegate.isHidden(QuotaAwarePath.unwrap(path));
+    }
+
+    @Override
+    public QuotaAwareFileStore getFileStore(Path path) throws IOException {
+        return getFileStore(delegate.getFileStore(QuotaAwarePath.unwrap(path)));
+    }
+
+    QuotaAwareFileStore getFileStore(FileStore store) {
+        return storesCache.computeIfAbsent(store, (fs) -> new QuotaAwareFileStore(QuotaAwareFileSystemProvider.this, fs));
+    }
+
+    @Override
+    public void checkAccess(Path path, AccessMode... modes) throws IOException {
+        delegate.checkAccess(QuotaAwarePath.unwrap(path), modes);
+    }
+
+    @Override
+    public <V extends FileAttributeView> V getFileAttributeView(Path path, Class<V> type, LinkOption... options) {
+        return delegate.getFileAttributeView(QuotaAwarePath.unwrap(path), type, options);
+    }
+
+    @Override
+    public <A extends BasicFileAttributes> A readAttributes(Path path, Class<A> type, LinkOption... options) throws IOException {
+        try {
+            return delegate.readAttributes(QuotaAwarePath.unwrap(path), type, options);
+        } catch (ProviderMismatchException e) {
+            throw new IllegalArgumentException("Failed to read attributes for path: [" + path + "]", e);
+        }
+    }
+
+    @Override
+    public Map<String, Object> readAttributes(Path path, String attributes, LinkOption... options) throws IOException {
+        return delegate.readAttributes(QuotaAwarePath.unwrap(path), attributes, options);
+    }
+
+    @Override
+    public void setAttribute(Path path, String attribute, Object value, LinkOption... options) throws IOException {
+        delegate.setAttribute(QuotaAwarePath.unwrap(path), attribute, value, options);
+    }
+
+    long getTotal() {
+        ensureHealth();
+        return total;
+    }
+
+    void ensureHealth() throws AssertionError {
+        boolean timerIsAlive = timerThread.isAlive();
+        Throwable cause = error.get();
+        if (cause != null || !timerIsAlive) {
+            throw new AssertionError("The quota aware filesystem has failed", cause);
+        }
+    }
+
+    long getRemaining() {
+        ensureHealth();
+        return remaining;
+    }
+
+    @Override
+    public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
+        return delegate.newFileChannel(QuotaAwarePath.unwrap(path), options, attrs);
+    }
+
+    @Override
+    public AsynchronousFileChannel newAsynchronousFileChannel(
+        Path path,
+        Set<? extends OpenOption> options,
+        ExecutorService executor,
+        FileAttribute<?>... attrs
+    ) throws IOException {
+        return delegate.newAsynchronousFileChannel(QuotaAwarePath.unwrap(path), options, executor, attrs);
+    }
+
+    /**
+     * Normally only used in testing. Avoids thread leak when life cycle of this
+     * object doesn't follow that of the JVM.
+     *
+     * @throws IOException if something goes wrong
+     */
+    @Override
+    public void close() throws IOException {
+        if (closed.compareAndSet(false, true)) {
+            timer.cancel();
+
+            // If there was a currently executing task wait for it, to
+            // avoid false positives on file handle leak.
+            try {
+                timerThread.join();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // Restore interrupted flag
+            }
+
+            try {
+                IOUtils.close(systemsCache.values());
+            } finally {
+                storesCache.clear();
+            }
+        }
+    }
+
+    @Override
+    public void createLink(Path link, Path existing) throws IOException {
+        delegate.createLink(link, existing);
+    }
+
+    @Override
+    public void createSymbolicLink(Path link, Path target, FileAttribute<?>... attrs) throws IOException {
+        delegate.createSymbolicLink(link, target, attrs);
+    }
+
+    void purge(FileSystem delegateFileSystem) {
+        systemsCache.remove(delegateFileSystem);
+    }
+}

+ 230 - 0
plugins/quota-aware-fs/src/main/java/org/elasticsearch/fs/quotaaware/QuotaAwarePath.java

@@ -0,0 +1,230 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.fs.quotaaware;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.FileSystem;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.ProviderMismatchException;
+import java.nio.file.WatchEvent.Kind;
+import java.nio.file.WatchEvent.Modifier;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.stream.StreamSupport;
+
+/**
+ * A wrapper implementation of {@link Path} that ensures the given
+ * {@link QuotaAwareFileSystem} is returned in {@link #getFileSystem()} from
+ * this instance and any other {@link Path} returned from this.
+ *
+ */
+public final class QuotaAwarePath implements Path {
+
+    private final QuotaAwareFileSystem fileSystem;
+    private final Path delegate;
+
+    QuotaAwarePath(QuotaAwareFileSystem fileSystem, Path delegate) {
+        if (delegate instanceof QuotaAwarePath) {
+            throw new IllegalArgumentException("Nested quota wrappers are not supported.");
+        }
+        this.fileSystem = Objects.requireNonNull(fileSystem, "A filesystem is required");
+        this.delegate = Objects.requireNonNull(delegate, "A delegate path is required");
+    }
+
+    @Override
+    public FileSystem getFileSystem() {
+        return fileSystem;
+    }
+
+    @Override
+    public boolean isAbsolute() {
+        return delegate.isAbsolute();
+    }
+
+    @Override
+    public Path getRoot() {
+        return wrap(delegate.getRoot());
+    }
+
+    @Override
+    public Path getFileName() {
+        return wrap(delegate.getFileName());
+    }
+
+    private Path wrap(Path delegate) {
+        if (delegate == null) {
+            return null;
+        } else {
+            return new QuotaAwarePath(fileSystem, delegate);
+        }
+    }
+
+    static Path unwrap(Path path) {
+        if (path instanceof QuotaAwarePath) {
+            return ((QuotaAwarePath) path).delegate;
+        } else {
+            return path;
+        }
+    }
+
+    @Override
+    public Path getParent() {
+        return wrap(delegate.getParent());
+    }
+
+    @Override
+    public int getNameCount() {
+        return delegate.getNameCount();
+    }
+
+    @Override
+    public Path getName(int index) {
+        return wrap(delegate.getName(index));
+    }
+
+    @Override
+    public Path subpath(int beginIndex, int endIndex) {
+        return wrap(delegate.subpath(beginIndex, endIndex));
+    }
+
+    @Override
+    public boolean startsWith(Path other) {
+        return delegate.startsWith(unwrap(other));
+    }
+
+    @Override
+    public boolean startsWith(String other) {
+        return delegate.startsWith(other);
+    }
+
+    @Override
+    public boolean endsWith(Path other) {
+        return delegate.endsWith(unwrap(other));
+    }
+
+    @Override
+    public boolean endsWith(String other) {
+        return delegate.endsWith(other);
+    }
+
+    @Override
+    public Path normalize() {
+        return wrap(delegate.normalize());
+    }
+
+    @Override
+    public Path resolve(Path other) {
+        return wrap(delegate.resolve(unwrap(other)));
+    }
+
+    @Override
+    public Path resolve(String other) {
+        return wrap(delegate.resolve(other));
+    }
+
+    @Override
+    public Path resolveSibling(Path other) {
+        return wrap(delegate.resolveSibling(unwrap(other)));
+    }
+
+    @Override
+    public Path resolveSibling(String other) {
+        return wrap(delegate.resolveSibling(other));
+    }
+
+    @Override
+    public Path relativize(Path other) {
+        return wrap(delegate.relativize(unwrap(other)));
+    }
+
+    @Override
+    public URI toUri() {
+        return delegate.toUri();
+    }
+
+    @Override
+    public Path toAbsolutePath() {
+        return wrap(delegate.toAbsolutePath());
+    }
+
+    @Override
+    public Path toRealPath(LinkOption... options) throws IOException {
+        return wrap(delegate.toRealPath(options));
+    }
+
+    @Override
+    public WatchKey register(WatchService watcher, Kind<?>[] events, Modifier... modifiers) throws IOException {
+        return delegate.register(watcher, events, modifiers);
+    }
+
+    @Override
+    public WatchKey register(WatchService watcher, Kind<?>... events) throws IOException {
+        return delegate.register(watcher, events);
+    }
+
+    @Override
+    public Iterator<Path> iterator() {
+        return StreamSupport.stream(delegate.spliterator(), false).map(this::wrap).iterator();
+    }
+
+    @Override
+    public int compareTo(Path other) {
+        return delegate.compareTo(toDelegate(other));
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (obj == null) return false;
+        if (getClass() != obj.getClass()) return false;
+        QuotaAwarePath other = (QuotaAwarePath) obj;
+        if (!delegate.equals(other.delegate)) return false;
+        if (!fileSystem.equals(other.fileSystem)) return false;
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        return delegate.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return delegate.toString();
+    }
+
+    private Path toDelegate(Path path) {
+        if (path instanceof QuotaAwarePath) {
+            QuotaAwarePath qaPath = (QuotaAwarePath) path;
+            if (qaPath.fileSystem != fileSystem) {
+                throw new ProviderMismatchException(
+                    "mismatch, expected: " + fileSystem.provider().getClass() + ", got: " + qaPath.fileSystem.provider().getClass()
+                );
+            }
+            return qaPath.delegate;
+        } else {
+            throw new ProviderMismatchException("mismatch, expected: QuotaAwarePath, got: " + path.getClass());
+        }
+    }
+}

+ 147 - 0
plugins/quota-aware-fs/src/test/java/org/elasticsearch/fs/quotaaware/DelegatingProvider.java

@@ -0,0 +1,147 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.fs.quotaaware;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.AccessMode;
+import java.nio.file.CopyOption;
+import java.nio.file.DirectoryStream;
+import java.nio.file.FileStore;
+import java.nio.file.FileSystem;
+import java.nio.file.LinkOption;
+import java.nio.file.OpenOption;
+import java.nio.file.Path;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.nio.file.attribute.FileAttribute;
+import java.nio.file.attribute.FileAttributeView;
+import java.nio.file.spi.FileSystemProvider;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A simple purely delegating provider, allows tests to only override the
+ * methods they need custom behaviour on.
+ */
+class DelegatingProvider extends FileSystemProvider {
+    private final FileSystemProvider provider;
+
+    /**
+     * An optional field for subclasses that need to test cyclic references
+     */
+    protected FileSystemProvider cyclicReference;
+
+    DelegatingProvider(FileSystemProvider provider) {
+        this.provider = provider;
+        this.cyclicReference = provider;
+    }
+
+    @Override
+    public String getScheme() {
+        return provider.getScheme();
+    }
+
+    @Override
+    public FileSystem newFileSystem(URI uri, Map<String, ?> env) throws IOException {
+        return provider.newFileSystem(uri, env);
+    }
+
+    @Override
+    public FileSystem getFileSystem(URI uri) {
+        return provider.getFileSystem(uri);
+    }
+
+    @Override
+    public Path getPath(URI uri) {
+        return provider.getPath(uri);
+    }
+
+    @Override
+    public SeekableByteChannel newByteChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
+        return provider.newByteChannel(path, options, attrs);
+    }
+
+    @Override
+    public DirectoryStream<Path> newDirectoryStream(Path dir, DirectoryStream.Filter<? super Path> filter) throws IOException {
+        return provider.newDirectoryStream(dir, filter);
+    }
+
+    @Override
+    public void createDirectory(Path dir, FileAttribute<?>... attrs) throws IOException {
+        provider.createDirectory(dir, attrs);
+    }
+
+    @Override
+    public void delete(Path path) throws IOException {
+        provider.delete(path);
+    }
+
+    @Override
+    public void copy(Path source, Path target, CopyOption... options) throws IOException {
+        provider.copy(source, target, options);
+    }
+
+    @Override
+    public void move(Path source, Path target, CopyOption... options) throws IOException {
+        provider.move(source, target, options);
+    }
+
+    @Override
+    public boolean isSameFile(Path path, Path path2) throws IOException {
+        return provider.isSameFile(path, path2);
+    }
+
+    @Override
+    public boolean isHidden(Path path) throws IOException {
+        return provider.isHidden(path);
+    }
+
+    @Override
+    public FileStore getFileStore(Path path) throws IOException {
+        return provider.getFileStore(path);
+    }
+
+    @Override
+    public void checkAccess(Path path, AccessMode... modes) throws IOException {
+        provider.checkAccess(path, modes);
+    }
+
+    @Override
+    public <V extends FileAttributeView> V getFileAttributeView(Path path, Class<V> type, LinkOption... options) {
+        return provider.getFileAttributeView(path, type, options);
+    }
+
+    @Override
+    public <A extends BasicFileAttributes> A readAttributes(Path path, Class<A> type, LinkOption... options) throws IOException {
+        return provider.readAttributes(path, type, options);
+    }
+
+    @Override
+    public Map<String, Object> readAttributes(Path path, String attributes, LinkOption... options) throws IOException {
+        return provider.readAttributes(path, attributes, options);
+    }
+
+    @Override
+    public void setAttribute(Path path, String attribute, Object value, LinkOption... options) throws IOException {
+        provider.setAttribute(path, attribute, value, options);
+    }
+
+}

+ 333 - 0
plugins/quota-aware-fs/src/test/java/org/elasticsearch/fs/quotaaware/QuotaAwareFileSystemProviderTests.java

@@ -0,0 +1,333 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.fs.quotaaware;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.TestRuleLimitSysouts.Limit;
+import org.elasticsearch.common.SuppressForbidden;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileStore;
+import java.nio.file.FileSystems;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.nio.file.spi.FileSystemProvider;
+import java.security.PrivilegedActionException;
+import java.util.Properties;
+import java.util.Random;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.elasticsearch.fs.quotaaware.QuotaAwareFileSystemProvider.QUOTA_PATH_KEY;
+import static org.hamcrest.Matchers.startsWith;
+
+@Limit(bytes = 10000)
+@SuppressForbidden(reason = "accesses the default filesystem by design")
+public class QuotaAwareFileSystemProviderTests extends LuceneTestCase {
+
+    public void testSystemPropertyShouldBeSet() {
+        FileSystemProvider systemProvider = FileSystems.getDefault().provider();
+        System.clearProperty(QUOTA_PATH_KEY);
+
+        final IllegalArgumentException exception = expectThrows(
+            IllegalArgumentException.class,
+            () -> new QuotaAwareFileSystemProvider(systemProvider)
+        );
+
+        assertThat(exception.getMessage(), startsWith("Property " + QUOTA_PATH_KEY + " must be set to a URI"));
+    }
+
+    public void testInitiallyNoQuotaFile() throws Exception {
+        Path quotaFile = createTempDir().resolve("quota.properties");
+        FileSystemProvider systemProvider = FileSystems.getDefault().provider();
+        Throwable cause = null;
+
+        try (QuotaAwareFileSystemProvider ignored = new QuotaAwareFileSystemProvider(systemProvider, quotaFile.toUri())) {
+            fail(); //
+        } catch (PrivilegedActionException e) {
+            cause = e.getCause();
+        }
+        assertTrue("Should be FileNotFoundException", cause instanceof NoSuchFileException);
+    }
+
+    public void testBasicQuotaFile() throws Exception {
+        doValidFileTest(500, 200);
+    }
+
+    public void testUpdateQuotaFile() throws Exception {
+        Path quotaFile = createTempDir().resolve("quota.properties");
+        FileSystemProvider systemProvider = quotaFile.getFileSystem().provider();
+        writeQuota(500L, 200L, systemProvider, quotaFile);
+
+        try (QuotaAwareFileSystemProvider provider = new QuotaAwareFileSystemProvider(systemProvider, quotaFile.toUri())) {
+            assertEquals(500, provider.getTotal());
+            assertEquals(200, provider.getRemaining());
+            writeQuota(450, 150, systemProvider, quotaFile);
+            withRetry(2000, 500, () -> {
+                assertEquals(450, provider.getTotal());
+                assertEquals(150, provider.getRemaining());
+            });
+        }
+    }
+
+    public void testRepeatedUpdate() throws Exception {
+        Path quotaFile = createTempDir().resolve("quota.properties");
+        FileSystemProvider systemProvider = quotaFile.getFileSystem().provider();
+        writeQuota(500L, 200L, systemProvider, quotaFile);
+        Random random = new Random();
+        try (QuotaAwareFileSystemProvider provider = new QuotaAwareFileSystemProvider(systemProvider, quotaFile.toUri())) {
+            for (int i = 0; i < 10; i++) {
+                long expectedTotal = Math.abs(random.nextLong());
+                long expectedRemaining = Math.abs(random.nextLong());
+                writeQuota(expectedTotal, expectedRemaining, systemProvider, quotaFile);
+                withRetry(2000, 100, () -> {
+                    assertEquals(expectedTotal, provider.getTotal());
+                    assertEquals(expectedRemaining, provider.getRemaining());
+                });
+            }
+        }
+    }
+
+    public void testEventuallyMissingQuotaFile() throws Exception {
+
+        Path quotaFile = createTempDir().resolve("quota.properties");
+        FileSystemProvider systemProvider = quotaFile.getFileSystem().provider();
+        writeQuota(500L, 200L, systemProvider, quotaFile);
+
+        try (QuotaAwareFileSystemProvider provider = new QuotaAwareFileSystemProvider(systemProvider, quotaFile.toUri())) {
+            assertEquals(500, provider.getTotal());
+            assertEquals(200, provider.getRemaining());
+
+            systemProvider.delete(quotaFile);
+
+            withRetry(2000, 500, () -> {
+                boolean gotError = false;
+                try {
+                    provider.getTotal();
+                } catch (AssertionError e) {
+                    gotError = true;
+                }
+                assertTrue(gotError);
+            });
+        }
+    }
+
+    public void testEventuallyMalformedQuotaFile() throws Exception {
+
+        Path quotaFile = createTempDir().resolve("quota.properties");
+        FileSystemProvider systemProvider = quotaFile.getFileSystem().provider();
+        writeQuota(500L, 200L, systemProvider, quotaFile);
+
+        try (QuotaAwareFileSystemProvider provider = new QuotaAwareFileSystemProvider(systemProvider, quotaFile.toUri())) {
+            assertEquals(500, provider.getTotal());
+            assertEquals(200, provider.getRemaining());
+
+            try (
+                OutputStream stream = systemProvider.newOutputStream(quotaFile, WRITE, TRUNCATE_EXISTING);
+                OutputStreamWriter streamWriter = new OutputStreamWriter(stream, StandardCharsets.UTF_8);
+                PrintWriter printWriter = new PrintWriter(streamWriter)
+            ) {
+                printWriter.write("This is not valid properties file syntax");
+            }
+
+            withRetry(2000, 500, () -> {
+                boolean gotError = false;
+                try {
+                    provider.getTotal();
+                } catch (AssertionError e) {
+                    gotError = true;
+                }
+                assertTrue(gotError);
+            });
+        }
+    }
+
+    public void testHighQuotaFile() throws Exception {
+        doValidFileTest(Long.MAX_VALUE - 1L, Long.MAX_VALUE - 2L);
+    }
+
+    public void testMalformedNumberInQuotaFile() throws Exception {
+        Path quotaFile = createTempDir().resolve("quota.properties");
+        FileSystemProvider systemProvider = quotaFile.getFileSystem().provider();
+        Properties quota = new Properties();
+        quota.setProperty("total", "ThisNotANumber");
+        quota.setProperty("remaining", "1");
+        try (OutputStream stream = systemProvider.newOutputStream(quotaFile, WRITE, CREATE_NEW)) {
+            quota.store(stream, "QuotaFile for: QuotaAwareFileSystemProviderTest#malformedNumberInQuotaFile");
+        }
+
+        expectThrows(NumberFormatException.class, () -> new QuotaAwareFileSystemProvider(systemProvider, quotaFile.toUri()));
+    }
+
+    public void testMalformedQuotaFile() throws Exception {
+        Path quotaFile = createTempDir().resolve("quota.properties");
+        FileSystemProvider systemProvider = quotaFile.getFileSystem().provider();
+
+        try (
+            OutputStream stream = systemProvider.newOutputStream(quotaFile, WRITE, CREATE_NEW);
+            OutputStreamWriter streamWriter = new OutputStreamWriter(stream, StandardCharsets.UTF_8);
+            PrintWriter printWriter = new PrintWriter(streamWriter)
+        ) {
+            printWriter.write("This is not valid properties file syntax");
+        }
+
+        expectThrows(Exception.class, () -> new QuotaAwareFileSystemProvider(systemProvider, quotaFile.toUri()));
+    }
+
+    public void testFileStoreLimited() throws Exception {
+        Path quotaFile = createTempDir().resolve("quota.properties");
+        FileSystemProvider systemProvider = quotaFile.getFileSystem().provider();
+        Properties quota = new Properties();
+        long expectedTotal = 500;
+        long expectedRemaining = 200;
+        quota.setProperty("total", Long.toString(expectedTotal));
+        quota.setProperty("remaining", Long.toString(expectedRemaining));
+        try (OutputStream stream = systemProvider.newOutputStream(quotaFile, WRITE, CREATE_NEW)) {
+            quota.store(stream, "QuotaFile for: QuotaAwareFileSystemProviderTest#fileStoreLimited");
+        }
+        try (QuotaAwareFileSystemProvider provider = new QuotaAwareFileSystemProvider(systemProvider, quotaFile.toUri())) {
+            Path path = createTempFile();
+            FileStore fileStore = provider.getFileStore(path);
+            assertEquals(expectedTotal, fileStore.getTotalSpace());
+            assertEquals(expectedRemaining, fileStore.getUsableSpace());
+            assertEquals(expectedRemaining, fileStore.getUnallocatedSpace());
+        }
+    }
+
+    public void testFileStoreNotLimited() throws Exception {
+        Path quotaFile = createTempDir().resolve("quota.properties");
+        DelegatingProvider snapshotProvider = new SnapshotFilesystemProvider(quotaFile.getFileSystem().provider());
+
+        Properties quota = new Properties();
+        quota.setProperty("total", Long.toString(Long.MAX_VALUE));
+        quota.setProperty("remaining", Long.toString(Long.MAX_VALUE));
+
+        try (OutputStream stream = snapshotProvider.newOutputStream(quotaFile, WRITE, CREATE_NEW)) {
+            quota.store(stream, "QuotaFile for: QuotaAwareFileSystemProviderTest#fileStoreNotLimited");
+        }
+        try (QuotaAwareFileSystemProvider provider = new QuotaAwareFileSystemProvider(snapshotProvider, quotaFile.toUri())) {
+            Path path = createTempFile();
+            FileStore fileStore = provider.getFileStore(path);
+            FileStore unLimitedStore = snapshotProvider.getFileStore(path);
+            assertEquals(unLimitedStore.getTotalSpace(), fileStore.getTotalSpace());
+            assertEquals(unLimitedStore.getUsableSpace(), fileStore.getUsableSpace());
+            assertEquals(unLimitedStore.getUnallocatedSpace(), fileStore.getUnallocatedSpace());
+        }
+    }
+
+    public void testDefaultFilesystemIsPreinitialized() throws Exception {
+        Path quotaFile = createTempDir().resolve("quota.properties");
+        FileSystemProvider systemProvider = quotaFile.getFileSystem().provider();
+        Properties quota = new Properties();
+        quota.setProperty("total", Long.toString(Long.MAX_VALUE));
+        quota.setProperty("remaining", Long.toString(Long.MAX_VALUE));
+        try (OutputStream stream = systemProvider.newOutputStream(quotaFile, WRITE, CREATE_NEW)) {
+            quota.store(stream, "QuotaFile for: QuotaAwareFileSystemProviderTest#defaultFilesystemIsPreinitialized");
+        }
+        try (QuotaAwareFileSystemProvider provider = new QuotaAwareFileSystemProvider(systemProvider, quotaFile.toUri())) {
+            assertNotNull(provider.getFileSystem(new URI("file:///")));
+        }
+    }
+
+    /**
+     * Mimics a cyclic reference that may happen when
+     * {@link QuotaAwareFileSystemProvider} is installed as the default provider
+     * in the JVM and the delegate provider references
+     * {@link FileSystems#getDefault()} by for instance relying on
+     * {@link File#toPath()}
+     */
+    public void testHandleReflexiveDelegate() throws Exception {
+        Path quotaFile = createTempDir().resolve("quota.properties");
+        FileSystemProvider systemProvider = quotaFile.getFileSystem().provider();
+        DelegatingProvider cyclicProvider = new DelegatingProvider(systemProvider) {
+            @Override
+            public Path getPath(URI uri) {
+                try {
+                    return cyclicReference.getFileSystem(new URI("file:///")).getPath(uri.getPath());
+                } catch (URISyntaxException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+
+        Properties quota = new Properties();
+        quota.setProperty("total", Long.toString(Long.MAX_VALUE));
+        quota.setProperty("remaining", Long.toString(Long.MAX_VALUE));
+        try (OutputStream stream = systemProvider.newOutputStream(quotaFile, WRITE, CREATE_NEW)) {
+            quota.store(stream, "QuotaFile for: QuotaAwareFileSystemProviderTest#testHandleReflexiveDelegate");
+        }
+        try (QuotaAwareFileSystemProvider provider = new QuotaAwareFileSystemProvider(cyclicProvider, quotaFile.toUri())) {
+            cyclicProvider.cyclicReference = provider;
+            assertNotNull(provider.getPath(new URI("file:///")));
+        }
+    }
+
+    private void doValidFileTest(long expectedTotal, long expectedRemaining) throws Exception {
+        Path quotaFile = createTempDir().resolve("quota.properties");
+        FileSystemProvider systemProvider = quotaFile.getFileSystem().provider();
+        writeQuota(expectedTotal, expectedRemaining, systemProvider, quotaFile);
+
+        try (QuotaAwareFileSystemProvider provider = new QuotaAwareFileSystemProvider(systemProvider, quotaFile.toUri())) {
+            assertEquals(expectedTotal, provider.getTotal());
+            assertEquals(expectedRemaining, provider.getRemaining());
+        }
+    }
+
+    private void writeQuota(long expectedTotal, long expectedRemaining, FileSystemProvider systemProvider, Path quotaFile)
+        throws IOException {
+        Properties quota = new Properties();
+        quota.setProperty("total", Long.toString(expectedTotal));
+        quota.setProperty("remaining", Long.toString(expectedRemaining));
+        try (OutputStream outputStream = systemProvider.newOutputStream(quotaFile, WRITE, CREATE, TRUNCATE_EXISTING)) {
+            // Ideally this would use atomic write and the scala allocator does,
+            // but the parsing logic should be able to deal with it in either
+            // case.
+            quota.store(outputStream, "QuotaFile for: QuotaAwareFileSystemProviderTest#doValidFileTest");
+        }
+    }
+
+    public static void withRetry(int maxMillis, int interval, Runnable func) throws Exception {
+        long endBy = System.currentTimeMillis() + maxMillis;
+
+        while (true) {
+            try {
+                func.run();
+                break;
+            } catch (AssertionError | Exception e) {
+                if (System.currentTimeMillis() + interval < endBy) {
+                    Thread.sleep(interval);
+                    continue;
+                }
+                throw new IllegalStateException("Retry timed out after [" + maxMillis + "]ms", e);
+            }
+        }
+
+    }
+
+}

+ 105 - 0
plugins/quota-aware-fs/src/test/java/org/elasticsearch/fs/quotaaware/QuotaAwareFileSystemTests.java

@@ -0,0 +1,105 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.fs.quotaaware;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.DirectoryStream;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Properties;
+
+import org.apache.lucene.mockfile.MockFileSystemTestCase;
+import org.elasticsearch.common.SuppressForbidden;
+
+public class QuotaAwareFileSystemTests extends MockFileSystemTestCase {
+
+    private QuotaAwareFileSystemProvider quotaAwareFileSystemProvider;
+
+    @Override
+    @SuppressForbidden(reason = "accesses the default filesystem by design")
+    public void setUp() throws Exception {
+        super.setUp();
+        Path quotaPath = createTempDir().resolve("fsquota.properties");
+        Properties properties = new Properties();
+        properties.put("total", Long.toString(Long.MAX_VALUE));
+        properties.put("remaining", Long.toString(Long.MAX_VALUE));
+        try (OutputStream outputStream = Files.newOutputStream(quotaPath)) {
+            properties.store(outputStream, "");
+        }
+        quotaAwareFileSystemProvider = new QuotaAwareFileSystemProvider(FileSystems.getDefault().provider(), quotaPath.toUri());
+    }
+
+    @Override
+    protected Path wrap(Path path) {
+        return new QuotaAwarePath(new QuotaAwareFileSystem(quotaAwareFileSystemProvider, path.getFileSystem()), path);
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        quotaAwareFileSystemProvider.close();
+        super.tearDown();
+    }
+
+    /** Tests that newDirectoryStream with a filter works correctly */
+    @Override
+    public void testDirectoryStreamFiltered() throws IOException {
+        Path dir = wrap(createTempDir());
+
+        OutputStream file = Files.newOutputStream(dir.resolve("file1"));
+        file.write(5);
+        file.close();
+        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
+            int count = 0;
+            for (Path path : stream) {
+                assertTrue(path instanceof QuotaAwarePath);
+                if (!path.getFileName().toString().startsWith("extra")) {
+                    count++;
+                }
+            }
+            assertEquals(1, count);
+        }
+        dir.getFileSystem().close();
+    }
+
+    /** Tests that newDirectoryStream with globbing works correctly */
+    @Override
+    public void testDirectoryStreamGlobFiltered() throws IOException {
+        Path dir = wrap(createTempDir());
+
+        OutputStream file = Files.newOutputStream(dir.resolve("foo"));
+        file.write(5);
+        file.close();
+        file = Files.newOutputStream(dir.resolve("bar"));
+        file.write(5);
+        file.close();
+        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "f*")) {
+            int count = 0;
+            for (Path path : stream) {
+                assertTrue(path instanceof QuotaAwarePath);
+                ++count;
+            }
+            assertEquals(1, count);
+        }
+        dir.getFileSystem().close();
+    }
+
+}

+ 116 - 0
plugins/quota-aware-fs/src/test/java/org/elasticsearch/fs/quotaaware/SnapshotFilesystemProvider.java

@@ -0,0 +1,116 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.fs.quotaaware;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.FileStore;
+import java.nio.file.Path;
+import java.nio.file.attribute.FileAttributeView;
+import java.nio.file.attribute.FileStoreAttributeView;
+import java.nio.file.spi.FileSystemProvider;
+
+/**
+ * A {@link FileSystemProvider} implementation that delegates to another provider, but
+ * takes a snapshot of the total, usable and unallocated space values upon creation.
+ * This prevents tests failing due to activity on the underlying filesystem.
+ */
+class SnapshotFilesystemProvider extends DelegatingProvider {
+    private final long totalSpace;
+    private final long usableSpace;
+    private final long unallocatedSpace;
+
+    SnapshotFilesystemProvider(FileSystemProvider provider) throws Exception {
+        super(provider);
+
+        final FileStore fileStore = provider.getFileStore(provider.getPath(new URI("file:///")));
+        this.totalSpace = fileStore.getTotalSpace();
+        this.usableSpace = fileStore.getUsableSpace();
+        this.unallocatedSpace = fileStore.getUnallocatedSpace();
+    }
+
+    @Override
+    public FileStore getFileStore(Path path) throws IOException {
+        return new SnapshotFileStore(super.getFileStore(path));
+    }
+
+    private class SnapshotFileStore extends FileStore {
+        private final FileStore delegate;
+
+        SnapshotFileStore(FileStore fileStore) {
+            this.delegate = fileStore;
+        }
+
+        @Override
+        public long getBlockSize() throws IOException {
+            return delegate.getBlockSize();
+        }
+
+        @Override
+        public String name() {
+            return delegate.name();
+        }
+
+        @Override
+        public String type() {
+            return delegate.type();
+        }
+
+        @Override
+        public boolean isReadOnly() {
+            return delegate.isReadOnly();
+        }
+
+        @Override
+        public long getTotalSpace() {
+            return totalSpace;
+        }
+
+        @Override
+        public long getUsableSpace() {
+            return usableSpace;
+        }
+
+        @Override
+        public long getUnallocatedSpace() {
+            return unallocatedSpace;
+        }
+
+        @Override
+        public boolean supportsFileAttributeView(Class<? extends FileAttributeView> type) {
+            return delegate.supportsFileAttributeView(type);
+        }
+
+        @Override
+        public boolean supportsFileAttributeView(String name) {
+            return delegate.supportsFileAttributeView(name);
+        }
+
+        @Override
+        public <V extends FileStoreAttributeView> V getFileStoreAttributeView(Class<V> type) {
+            return delegate.getFileStoreAttributeView(type);
+        }
+
+        @Override
+        public Object getAttribute(String attribute) throws IOException {
+            return delegate.getAttribute(attribute);
+        }
+    }
+}

+ 1 - 1
qa/os/src/test/java/org/elasticsearch/packaging/test/ArchiveTests.java

@@ -128,7 +128,7 @@ public class ArchiveTests extends PackagingTestCase {
         } catch (Exception e) {
             if (Files.exists(installation.home.resolve("elasticsearch.pid"))) {
                 String pid = FileUtils.slurp(installation.home.resolve("elasticsearch.pid")).trim();
-                logger.info("Dumping jstack of elasticsearch processb ({}) that failed to start", pid);
+                logger.info("Dumping jstack of elasticsearch process ({}) that failed to start", pid);
                 sh.runIgnoreExitCode("jstack " + pid);
             }
             throw e;

+ 171 - 0
qa/os/src/test/java/org/elasticsearch/packaging/test/QuotaAwareFsTests.java

@@ -0,0 +1,171 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.packaging.test;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.client.fluent.Request;
+import org.elasticsearch.packaging.util.ServerUtils;
+import org.elasticsearch.packaging.util.Shell;
+import org.junit.After;
+import org.junit.BeforeClass;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Locale;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Check that the quota-aware filesystem plugin can be installed, and that it operates as expected.
+ */
+public class QuotaAwareFsTests extends PackagingTestCase {
+
+    // private static final String QUOTA_AWARE_FS_PLUGIN_NAME = "quota-aware-fs";
+    private static final Path QUOTA_AWARE_FS_PLUGIN;
+    static {
+        // Re-read before each test so the plugin path can be manipulated within tests.
+        // Corresponds to DistroTestPlugin#QUOTA_AWARE_FS_PLUGIN_SYSPROP
+        QUOTA_AWARE_FS_PLUGIN = Paths.get(System.getProperty("tests.quota-aware-fs-plugin"));
+    }
+
+    @BeforeClass
+    public static void filterDistros() {
+        assumeTrue("only archives", distribution.isArchive());
+    }
+
+    @After
+    public void teardown() throws Exception {
+        super.teardown();
+        cleanup();
+    }
+
+    /**
+     * Check that when the plugin is installed but the system property for passing the location of the related
+     * properties file is omitted, then Elasticsearch exits with the expected error message.
+     */
+    public void test10ElasticsearchRequiresSystemPropertyToBeSet() throws Exception {
+        install();
+
+        installation.executables().pluginTool.run("install --batch \"" + QUOTA_AWARE_FS_PLUGIN.toUri() + "\"");
+
+        // Without setting the `es.fs.quota.file` property, ES should exit with a failure code.
+        final Shell.Result result = runElasticsearchStartCommand(null, false, false);
+
+        assertThat("Elasticsearch should have terminated unsuccessfully", result.isSuccess(), equalTo(false));
+        assertThat(
+            result.stderr,
+            containsString("Property es.fs.quota.file must be set to a URI in order to use the quota filesystem provider")
+        );
+    }
+
+    /**
+     * Check that when the plugin is installed but the system property for passing the location of the related
+     * properties file contains a non-existent URI, then Elasticsearch exits with the expected error message.
+     */
+    public void test20ElasticsearchRejectsNonExistentPropertiesLocation() throws Exception {
+        install();
+
+        installation.executables().pluginTool.run("install --batch \"" + QUOTA_AWARE_FS_PLUGIN.toUri() + "\"");
+
+        sh.getEnv().put("ES_JAVA_OPTS", "-Des.fs.quota.file=file:///this/does/not/exist.properties");
+
+        final Shell.Result result = runElasticsearchStartCommand(null, false, false);
+
+        // Generate a Path for this location so that the platform-specific line-endings will be used.
+        final String platformPath = Path.of("/this/does/not/exist.properties").toString();
+
+        assertThat("Elasticsearch should have terminated unsuccessfully", result.isSuccess(), equalTo(false));
+        assertThat(result.stderr, containsString("NoSuchFileException: " + platformPath));
+    }
+
+    /**
+     * Check that Elasticsearch can load the plugin and apply the quota limits in the properties file. Also check that
+     * Elasticsearch polls the file for changes.
+     */
+    public void test30ElasticsearchStartsWhenSystemPropertySet() throws Exception {
+        install();
+
+        int total = 20 * 1024 * 1024;
+        int available = 10 * 1024 * 1024;
+
+        installation.executables().pluginTool.run("install --batch \"" + QUOTA_AWARE_FS_PLUGIN.toUri() + "\"");
+
+        final Path quotaPath = getRootTempDir().resolve("quota.properties");
+        Files.writeString(quotaPath, String.format(Locale.ROOT, "total=%d\nremaining=%d\n", total, available));
+
+        sh.getEnv().put("ES_JAVA_OPTS", "-Des.fs.quota.file=" + quotaPath.toUri());
+
+        try {
+            startElasticsearch();
+
+            final Totals actualTotals = fetchFilesystemTotals();
+
+            assertThat(actualTotals.totalInBytes, equalTo(total));
+            assertThat(actualTotals.availableInBytes, equalTo(available));
+
+            int updatedTotal = total * 3;
+            int updatedAvailable = available * 3;
+
+            // Check that ES is polling the properties file for changes by modifying the properties file
+            // and waiting for ES to pick up the changes.
+            Files.writeString(quotaPath, String.format(Locale.ROOT, "total=%d\nremaining=%d\n", updatedTotal, updatedAvailable));
+
+            // The check interval is 1000ms, but give ourselves some leeway.
+            Thread.sleep(2000);
+
+            final Totals updatedActualTotals = fetchFilesystemTotals();
+
+            assertThat(updatedActualTotals.totalInBytes, equalTo(updatedTotal));
+            assertThat(updatedActualTotals.availableInBytes, equalTo(updatedAvailable));
+        } finally {
+            stopElasticsearch();
+            Files.deleteIfExists(quotaPath);
+        }
+    }
+
+    private static class Totals {
+        int totalInBytes;
+        int availableInBytes;
+
+        Totals(int totalInBytes, int availableInBytes) {
+            this.totalInBytes = totalInBytes;
+            this.availableInBytes = availableInBytes;
+        }
+    }
+
+    private Totals fetchFilesystemTotals() throws Exception {
+        final String response = ServerUtils.makeRequest(Request.Get("http://localhost:9200/_nodes/stats"));
+
+        final ObjectMapper mapper = new ObjectMapper();
+        final JsonNode rootNode = mapper.readTree(response);
+
+        assertThat("Some nodes failed", rootNode.at("/_nodes/failed").intValue(), equalTo(0));
+
+        final String nodeId = rootNode.get("nodes").fieldNames().next();
+
+        final JsonNode fsNode = rootNode.at("/nodes/" + nodeId + "/fs/total");
+
+        return new Totals(fsNode.get("total_in_bytes").intValue(), fsNode.get("available_in_bytes").intValue());
+    }
+}

+ 4 - 0
qa/smoke-test-plugins/build.gradle

@@ -33,6 +33,10 @@ testClusters.integTest {
       // Do not attempt to install ingest-attachment in FIPS 140 as it is not supported (it depends on non-FIPS BouncyCastle)
       return
     }
+    if (pluginName == 'quota-aware-fs') {
+      // This plugin has to be configured to work via system properties
+      return
+    }
     plugin pluginProject.path
     pluginsCount += 1
   }

+ 115 - 30
server/src/main/java/org/elasticsearch/plugins/PluginInfo.java

@@ -21,6 +21,7 @@ package org.elasticsearch.plugins;
 
 import org.elasticsearch.Version;
 import org.elasticsearch.bootstrap.JarHell;
+import org.elasticsearch.common.Booleans;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -49,6 +50,8 @@ public class PluginInfo implements Writeable, ToXContentObject {
     public static final String ES_PLUGIN_PROPERTIES = "plugin-descriptor.properties";
     public static final String ES_PLUGIN_POLICY = "plugin-security.policy";
 
+    private static final Version QUOTA_FS_PLUGIN_SUPPORT = Version.CURRENT;
+
     private final String name;
     private final String description;
     private final String version;
@@ -57,6 +60,8 @@ public class PluginInfo implements Writeable, ToXContentObject {
     private final String classname;
     private final List<String> extendedPlugins;
     private final boolean hasNativeController;
+    private final PluginType type;
+    private final String javaOpts;
 
     /**
      * Construct plugin info.
@@ -69,9 +74,12 @@ public class PluginInfo implements Writeable, ToXContentObject {
      * @param classname             the entry point to the plugin
      * @param extendedPlugins       other plugins this plugin extends through SPI
      * @param hasNativeController   whether or not the plugin has a native controller
+     * @param type                  the type of the plugin. Expects "bootstrap" or "isolated".
+     * @param javaOpts              any additional JVM CLI parameters added by this plugin
      */
     public PluginInfo(String name, String description, String version, Version elasticsearchVersion, String javaVersion,
-                      String classname, List<String> extendedPlugins, boolean hasNativeController) {
+                      String classname, List<String> extendedPlugins, boolean hasNativeController,
+                      PluginType type, String javaOpts) {
         this.name = name;
         this.description = description;
         this.version = version;
@@ -80,6 +88,8 @@ public class PluginInfo implements Writeable, ToXContentObject {
         this.classname = classname;
         this.extendedPlugins = Collections.unmodifiableList(extendedPlugins);
         this.hasNativeController = hasNativeController;
+        this.type = type;
+        this.javaOpts = javaOpts;
     }
 
     /**
@@ -97,6 +107,14 @@ public class PluginInfo implements Writeable, ToXContentObject {
         this.classname = in.readString();
         extendedPlugins = in.readStringList();
         hasNativeController = in.readBoolean();
+
+        if (in.getVersion().onOrAfter(QUOTA_FS_PLUGIN_SUPPORT)) {
+            type = PluginType.valueOf(in.readString());
+            javaOpts = in.readOptionalString();
+        } else {
+            type = PluginType.ISOLATED;
+            javaOpts = null;
+        }
     }
 
     @Override
@@ -109,6 +127,11 @@ public class PluginInfo implements Writeable, ToXContentObject {
         out.writeString(classname);
         out.writeStringCollection(extendedPlugins);
         out.writeBoolean(hasNativeController);
+
+        if (out.getVersion().onOrAfter(QUOTA_FS_PLUGIN_SUPPORT)) {
+            out.writeString(type.name());
+            out.writeOptionalString(javaOpts);
+        }
     }
 
     /**
@@ -158,11 +181,6 @@ public class PluginInfo implements Writeable, ToXContentObject {
                     "property [java.version] is missing for plugin [" + name + "]");
         }
         JarHell.checkVersionFormat(javaVersionString);
-        final String classname = propsMap.remove("classname");
-        if (classname == null) {
-            throw new IllegalArgumentException(
-                    "property [classname] is missing for plugin [" + name + "]");
-        }
 
         final String extendedString = propsMap.remove("extended.plugins");
         final List<String> extendedPlugins;
@@ -172,36 +190,72 @@ public class PluginInfo implements Writeable, ToXContentObject {
             extendedPlugins = Arrays.asList(Strings.delimitedListToStringArray(extendedString, ","));
         }
 
-        final String hasNativeControllerValue = propsMap.remove("has.native.controller");
-        final boolean hasNativeController;
-        if (hasNativeControllerValue == null) {
-            hasNativeController = false;
-        } else {
-            switch (hasNativeControllerValue) {
-                case "true":
-                    hasNativeController = true;
-                    break;
-                case "false":
-                    hasNativeController = false;
-                    break;
-                default:
-                    final String message = String.format(
-                            Locale.ROOT,
-                            "property [%s] must be [%s], [%s], or unspecified but was [%s]",
-                            "has_native_controller",
-                            "true",
-                            "false",
-                            hasNativeControllerValue);
-                    throw new IllegalArgumentException(message);
-            }
+        final boolean hasNativeController = parseBooleanValue(name, "has.native.controller", propsMap.remove("has.native.controller"));
+
+        final PluginType type = getPluginType(name, propsMap.remove("type"));
+
+        final String classname = getClassname(name, type, propsMap.remove("classname"));
+
+        final String javaOpts = propsMap.remove("java.opts");
+
+        if (type != PluginType.BOOTSTRAP && Strings.isNullOrEmpty(javaOpts) == false) {
+            throw new IllegalArgumentException(
+                "[java.opts] can only have a value when [type] is set to [bootstrap] for plugin [" + name + "]"
+            );
         }
 
         if (propsMap.isEmpty() == false) {
-            throw new IllegalArgumentException("Unknown properties in plugin descriptor: " + propsMap.keySet());
+            throw new IllegalArgumentException("Unknown properties for plugin [" + name + "] in plugin descriptor: " + propsMap.keySet());
         }
 
         return new PluginInfo(name, description, version, esVersion, javaVersionString,
-                              classname, extendedPlugins, hasNativeController);
+                              classname, extendedPlugins, hasNativeController, type, javaOpts);
+    }
+
+    private static PluginType getPluginType(String name, String rawType) {
+        if (Strings.isNullOrEmpty(rawType)) {
+            return PluginType.ISOLATED;
+        }
+
+        try {
+            return PluginType.valueOf(rawType.toUpperCase(Locale.ROOT));
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException(
+                "[type] must be unspecified or one of [isolated, bootstrap] but found [" + rawType + "] for plugin [" + name + "]"
+            );
+        }
+    }
+
+    private static String getClassname(String name, PluginType type, String classname) {
+        if (type == PluginType.BOOTSTRAP) {
+            if (!Strings.isNullOrEmpty(classname)) {
+                throw new IllegalArgumentException(
+                    "property [classname] can only have a value when [type] is set to [bootstrap] for plugin [" + name + "]"
+                );
+            }
+            return "";
+        }
+
+        if (classname == null) {
+            throw new IllegalArgumentException("property [classname] is missing for plugin [" + name + "]");
+        }
+
+        return classname;
+    }
+
+    private static boolean parseBooleanValue(String pluginName, String name, String rawValue) {
+        try {
+            return Booleans.parseBoolean(rawValue, false);
+        } catch (IllegalArgumentException e) {
+            final String message = String.format(
+                Locale.ROOT,
+                "property [%s] must be [true], [false], or unspecified but was [%s] for plugin [%s]",
+                name,
+                rawValue,
+                pluginName
+            );
+            throw new IllegalArgumentException(message);
+        }
     }
 
     /**
@@ -276,6 +330,26 @@ public class PluginInfo implements Writeable, ToXContentObject {
         return hasNativeController;
     }
 
+    /**
+     * Returns the type of this plugin. Can be "isolated" for regular sandboxed plugins, or "bootstrap"
+     * for plugins that affect how Elasticsearch's JVM runs.
+     *
+     * @return the type of the plugin
+     */
+    public PluginType getType() {
+        return type;
+    }
+
+    /**
+     * Returns any additional JVM command-line options that this plugin adds. Only applies to
+     * plugins whose <code>type</code> is "bootstrap".
+     *
+     * @return any additional JVM options.
+     */
+    public String getJavaOpts() {
+        return javaOpts;
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();
@@ -288,6 +362,10 @@ public class PluginInfo implements Writeable, ToXContentObject {
             builder.field("classname", classname);
             builder.field("extended_plugins", extendedPlugins);
             builder.field("has_native_controller", hasNativeController);
+            builder.field("type", type);
+            if (type == PluginType.BOOTSTRAP) {
+                builder.field("java_opts", javaOpts);
+            }
         }
         builder.endObject();
 
@@ -327,6 +405,13 @@ public class PluginInfo implements Writeable, ToXContentObject {
             .append(prefix).append("Elasticsearch Version: ").append(elasticsearchVersion).append("\n")
             .append(prefix).append("Java Version: ").append(javaVersion).append("\n")
             .append(prefix).append("Native Controller: ").append(hasNativeController).append("\n")
+            .append(prefix).append("Type: ").append(type).append("\n");
+
+        if (type == PluginType.BOOTSTRAP) {
+            information.append(prefix).append("Java Opts: ").append(javaOpts).append("\n");
+        }
+
+        information
             .append(prefix).append("Extended Plugins: ").append(extendedPlugins).append("\n")
             .append(prefix).append(" * Classname: ").append(classname);
         return information.toString();

+ 40 - 0
server/src/main/java/org/elasticsearch/plugins/PluginType.java

@@ -0,0 +1,40 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.plugins;
+
+import java.util.Locale;
+
+/**
+ * Indicates the type of an Elasticsearch plugin.
+ * <p>
+ * Elasticsearch plugins come in two flavours: "isolated", which are kept
+ * separate from the rest of the Elasticsearch code; and "bootstrap", which
+ * take effect when Elasticsearch executes and can modify e.g. JVM
+ * behaviour, but do not otherwise hook into the Elasticsearch lifecycle.
+ */
+public enum PluginType {
+    ISOLATED,
+    BOOTSTRAP;
+
+    @Override
+    public String toString() {
+        return this.name().toLowerCase(Locale.ROOT);
+    }
+}

+ 22 - 13
server/src/main/java/org/elasticsearch/plugins/PluginsService.java

@@ -114,11 +114,12 @@ public class PluginsService implements ReportingService<PluginsAndModules> {
         List<PluginInfo> pluginsList = new ArrayList<>();
         // we need to build a List of plugins for checking mandatory plugins
         final List<String> pluginsNames = new ArrayList<>();
+
         // first we load plugins that are on the classpath. this is for tests
         for (Class<? extends Plugin> pluginClass : classpathPlugins) {
             Plugin plugin = loadPlugin(pluginClass, settings, configPath);
             PluginInfo pluginInfo = new PluginInfo(pluginClass.getName(), "classpath plugin", "NA", Version.CURRENT, "1.8",
-                                                   pluginClass.getName(), Collections.emptyList(), false);
+                                                   pluginClass.getName(), Collections.emptyList(), false, PluginType.ISOLATED, "");
             if (logger.isTraceEnabled()) {
                 logger.trace("plugin loaded from classpath [{}]", pluginInfo);
             }
@@ -349,16 +350,31 @@ public class PluginsService implements ReportingService<PluginsAndModules> {
     private static Set<Bundle> findBundles(final Path directory, String type) throws IOException {
         final Set<Bundle> bundles = new HashSet<>();
         for (final Path plugin : findPluginDirs(directory)) {
-            final Bundle bundle = readPluginBundle(bundles, plugin, type);
-            bundles.add(bundle);
+            final Bundle bundle = readPluginBundle(plugin, type);
+            if (bundle.plugin.getType() == PluginType.BOOTSTRAP) {
+                logger.trace("--- skipping bootstrap plugin [{}] [{}]", type, plugin.toAbsolutePath());
+            } else {
+                if (bundles.add(bundle) == false) {
+                    throw new IllegalStateException("duplicate " + type + ": " + bundle.plugin);
+                }
+                if (type.equals("module") && bundle.plugin.getName().startsWith("test-") && Build.CURRENT.isSnapshot() == false) {
+                    throw new IllegalStateException("external test module [" + plugin.getFileName() + "] found in non-snapshot build");
+                }
+            }
         }
 
+        logger.trace(
+            () -> "findBundles("
+                + type
+                + ") returning: "
+                + bundles.stream().map(b -> b.plugin.getName()).sorted().collect(Collectors.toList())
+        );
+
         return bundles;
     }
 
     // get a bundle for a single plugin dir
-    private static Bundle readPluginBundle(final Set<Bundle> bundles, final Path plugin, String type) throws IOException {
-        LogManager.getLogger(PluginsService.class).trace("--- adding [{}] [{}]", type, plugin.toAbsolutePath());
+    private static Bundle readPluginBundle(final Path plugin, String type) throws IOException {
         final PluginInfo info;
         try {
             info = PluginInfo.readFromProperties(plugin);
@@ -366,14 +382,7 @@ public class PluginsService implements ReportingService<PluginsAndModules> {
             throw new IllegalStateException("Could not load plugin descriptor for " + type +
                                             " directory [" + plugin.getFileName() + "]", e);
         }
-        final Bundle bundle = new Bundle(info, plugin);
-        if (bundles.add(bundle) == false) {
-            throw new IllegalStateException("duplicate " + type + ": " + info);
-        }
-        if (type.equals("module") && info.getName().startsWith("test-") && Build.CURRENT.isSnapshot() == false) {
-            throw new IllegalStateException("external test module [" + plugin.getFileName() + "] found in non-snapshot build");
-        }
-        return bundle;
+        return new Bundle(info, plugin);
     }
 
     /**

+ 5 - 2
server/src/test/java/org/elasticsearch/nodesinfo/NodeInfoStreamingTests.java

@@ -39,6 +39,7 @@ import org.elasticsearch.monitor.jvm.JvmInfo;
 import org.elasticsearch.monitor.os.OsInfo;
 import org.elasticsearch.monitor.process.ProcessInfo;
 import org.elasticsearch.plugins.PluginInfo;
+import org.elasticsearch.plugins.PluginType;
 import org.elasticsearch.search.aggregations.support.AggregationInfo;
 import org.elasticsearch.search.aggregations.support.AggregationUsageService;
 import org.elasticsearch.test.ESTestCase;
@@ -148,14 +149,16 @@ public class NodeInfoStreamingTests extends ESTestCase {
             for (int i = 0; i < numPlugins; i++) {
                 plugins.add(new PluginInfo(randomAlphaOfLengthBetween(3, 10), randomAlphaOfLengthBetween(3, 10),
                     randomAlphaOfLengthBetween(3, 10), VersionUtils.randomVersion(random()), "1.8",
-                    randomAlphaOfLengthBetween(3, 10), Collections.emptyList(), randomBoolean()));
+                    randomAlphaOfLengthBetween(3, 10), Collections.emptyList(), randomBoolean(), randomFrom(PluginType.values()),
+                    randomAlphaOfLengthBetween(3, 10)));
             }
             int numModules = randomIntBetween(0, 5);
             List<PluginInfo> modules = new ArrayList<>();
             for (int i = 0; i < numModules; i++) {
                 modules.add(new PluginInfo(randomAlphaOfLengthBetween(3, 10), randomAlphaOfLengthBetween(3, 10),
                     randomAlphaOfLengthBetween(3, 10), VersionUtils.randomVersion(random()), "1.8",
-                    randomAlphaOfLengthBetween(3, 10), Collections.emptyList(), randomBoolean()));
+                    randomAlphaOfLengthBetween(3, 10), Collections.emptyList(), randomBoolean(), randomFrom(PluginType.values()),
+                    randomAlphaOfLengthBetween(3, 10)));
             }
             pluginsAndModules = new PluginsAndModules(plugins, modules);
         }

+ 82 - 7
server/src/test/java/org/elasticsearch/plugins/PluginInfoTests.java

@@ -185,7 +185,7 @@ public class PluginInfoTests extends ESTestCase {
 
     public void testSerialize() throws Exception {
         PluginInfo info = new PluginInfo("c", "foo", "dummy", Version.CURRENT, "1.8", "dummyclass",
-                                         Collections.singletonList("foo"), randomBoolean());
+                                         Collections.singletonList("foo"), randomBoolean(), PluginType.ISOLATED, "-Dfoo=bar");
         BytesStreamOutput output = new BytesStreamOutput();
         info.writeTo(output);
         ByteBuffer buffer = ByteBuffer.wrap(output.bytes().toBytesRef().bytes);
@@ -198,15 +198,15 @@ public class PluginInfoTests extends ESTestCase {
     public void testPluginListSorted() {
         List<PluginInfo> plugins = new ArrayList<>();
         plugins.add(new PluginInfo("c", "foo", "dummy", Version.CURRENT, "1.8", "dummyclass",
-            Collections.emptyList(), randomBoolean()));
+            Collections.emptyList(), randomBoolean(), PluginType.ISOLATED, "-Da"));
         plugins.add(new PluginInfo("b", "foo", "dummy", Version.CURRENT, "1.8", "dummyclass",
-            Collections.emptyList(), randomBoolean()));
+            Collections.emptyList(), randomBoolean(), PluginType.BOOTSTRAP, "-Db"));
         plugins.add(new PluginInfo( "e", "foo", "dummy", Version.CURRENT, "1.8", "dummyclass",
-            Collections.emptyList(), randomBoolean()));
+            Collections.emptyList(), randomBoolean(), PluginType.ISOLATED, "-Dc"));
         plugins.add(new PluginInfo("a", "foo", "dummy", Version.CURRENT, "1.8", "dummyclass",
-            Collections.emptyList(), randomBoolean()));
+            Collections.emptyList(), randomBoolean(), PluginType.BOOTSTRAP, "-Dd"));
         plugins.add(new PluginInfo("d", "foo", "dummy", Version.CURRENT, "1.8", "dummyclass",
-            Collections.emptyList(), randomBoolean()));
+            Collections.emptyList(), randomBoolean(), PluginType.ISOLATED, "-De"));
         PluginsAndModules pluginsInfo = new PluginsAndModules(plugins, Collections.emptyList());
 
         final List<PluginInfo> infos = pluginsInfo.getPluginInfos();
@@ -226,7 +226,82 @@ public class PluginInfoTests extends ESTestCase {
             "elasticsearch.version", Version.CURRENT.toString(),
             "java.version", System.getProperty("java.specification.version"));
         IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> PluginInfo.readFromProperties(pluginDir));
-        assertThat(e.getMessage(), containsString("Unknown properties in plugin descriptor"));
+        assertThat(e.getMessage(), containsString("Unknown properties for plugin [my_plugin] in plugin descriptor"));
     }
 
+    public void testMissingType() throws Exception {
+        Path pluginDir = createTempDir().resolve("fake-plugin");
+        PluginTestUtil.writePluginProperties(pluginDir,
+            "description", "fake desc",
+            "classname", "Foo",
+            "name", "my_plugin",
+            "version", "1.0",
+            "elasticsearch.version", Version.CURRENT.toString(),
+            "java.version", System.getProperty("java.specification.version"));
+
+        final PluginInfo pluginInfo = PluginInfo.readFromProperties(pluginDir);
+        assertThat(pluginInfo.getType(), equalTo(PluginType.ISOLATED));
+    }
+
+    public void testInvalidType() throws Exception {
+        Path pluginDir = createTempDir().resolve("fake-plugin");
+        PluginTestUtil.writePluginProperties(pluginDir,
+            "description", "fake desc",
+            "classname", "Foo",
+            "name", "my_plugin",
+            "version", "1.0",
+            "elasticsearch.version", Version.CURRENT.toString(),
+            "java.version", System.getProperty("java.specification.version"),
+            "type", "invalid");
+
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> PluginInfo.readFromProperties(pluginDir));
+        assertThat(e.getMessage(), containsString("[type] must be unspecified or one of [isolated, bootstrap] but found [invalid]"));
+    }
+
+    public void testJavaOptsAreAcceptedWithBootstrapPlugin() throws Exception {
+        Path pluginDir = createTempDir().resolve("fake-plugin");
+        PluginTestUtil.writePluginProperties(pluginDir,
+            "description", "fake desc",
+            "name", "my_plugin",
+            "version", "1.0",
+            "elasticsearch.version", Version.CURRENT.toString(),
+            "java.version", System.getProperty("java.specification.version"),
+            "type", "bootstrap",
+            "java.opts", "-Dfoo=bar");
+
+        final PluginInfo pluginInfo = PluginInfo.readFromProperties(pluginDir);
+        assertThat(pluginInfo.getType(), equalTo(PluginType.BOOTSTRAP));
+        assertThat(pluginInfo.getJavaOpts(), equalTo("-Dfoo=bar"));
+    }
+
+    public void testJavaOptsAreRejectedWithNonBootstrapPlugin() throws Exception {
+        Path pluginDir = createTempDir().resolve("fake-plugin");
+        PluginTestUtil.writePluginProperties(pluginDir,
+            "description", "fake desc",
+            "classname", "Foo",
+            "name", "my_plugin",
+            "version", "1.0",
+            "elasticsearch.version", Version.CURRENT.toString(),
+            "java.version", System.getProperty("java.specification.version"),
+            "type", "isolated",
+            "java.opts", "-Dfoo=bar");
+
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> PluginInfo.readFromProperties(pluginDir));
+        assertThat(e.getMessage(), containsString("[java.opts] can only have a value when [type] is set to [bootstrap]"));
+    }
+
+    public void testClassnameIsRejectedWithBootstrapPlugin() throws Exception {
+        Path pluginDir = createTempDir().resolve("fake-plugin");
+        PluginTestUtil.writePluginProperties(pluginDir,
+            "description", "fake desc",
+            "classname", "Foo",
+            "name", "my_plugin",
+            "version", "1.0",
+            "elasticsearch.version", Version.CURRENT.toString(),
+            "java.version", System.getProperty("java.specification.version"),
+            "type", "bootstrap");
+
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> PluginInfo.readFromProperties(pluginDir));
+        assertThat(e.getMessage(), containsString("[classname] can only have a value when [type] is set to [bootstrap]"));
+    }
 }

+ 36 - 27
server/src/test/java/org/elasticsearch/plugins/PluginsServiceTests.java

@@ -309,7 +309,7 @@ public class PluginsServiceTests extends ESTestCase {
     public void testSortBundlesCycleSelfReference() throws Exception {
         Path pluginDir = createTempDir();
         PluginInfo info = new PluginInfo("foo", "desc", "1.0", Version.CURRENT, "1.8",
-            "MyPlugin", Collections.singletonList("foo"), false);
+            "MyPlugin", Collections.singletonList("foo"), false, PluginType.ISOLATED, "");
         PluginsService.Bundle bundle = new PluginsService.Bundle(info, pluginDir);
         IllegalStateException e = expectThrows(IllegalStateException.class, () ->
             PluginsService.sortBundles(Collections.singleton(bundle))
@@ -321,16 +321,16 @@ public class PluginsServiceTests extends ESTestCase {
         Path pluginDir = createTempDir();
         Set<PluginsService.Bundle> bundles = new LinkedHashSet<>(); // control iteration order, so we get know the beginning of the cycle
         PluginInfo info = new PluginInfo("foo", "desc", "1.0", Version.CURRENT, "1.8",
-            "MyPlugin", Arrays.asList("bar", "other"), false);
+            "MyPlugin", Arrays.asList("bar", "other"), false, PluginType.ISOLATED, "");
         bundles.add(new PluginsService.Bundle(info, pluginDir));
         PluginInfo info2 = new PluginInfo("bar", "desc", "1.0", Version.CURRENT, "1.8",
-            "MyPlugin", Collections.singletonList("baz"), false);
+            "MyPlugin", Collections.singletonList("baz"), false, PluginType.ISOLATED, "");
         bundles.add(new PluginsService.Bundle(info2, pluginDir));
         PluginInfo info3 = new PluginInfo("baz", "desc", "1.0", Version.CURRENT, "1.8",
-            "MyPlugin", Collections.singletonList("foo"), false);
+            "MyPlugin", Collections.singletonList("foo"), false, PluginType.ISOLATED, "");
         bundles.add(new PluginsService.Bundle(info3, pluginDir));
         PluginInfo info4 = new PluginInfo("other", "desc", "1.0", Version.CURRENT, "1.8",
-            "MyPlugin", Collections.emptyList(), false);
+            "MyPlugin", Collections.emptyList(), false, PluginType.ISOLATED, "");
         bundles.add(new PluginsService.Bundle(info4, pluginDir));
 
         IllegalStateException e = expectThrows(IllegalStateException.class, () -> PluginsService.sortBundles(bundles));
@@ -340,7 +340,7 @@ public class PluginsServiceTests extends ESTestCase {
     public void testSortBundlesSingle() throws Exception {
         Path pluginDir = createTempDir();
         PluginInfo info = new PluginInfo("foo", "desc", "1.0", Version.CURRENT, "1.8",
-            "MyPlugin", Collections.emptyList(), false);
+            "MyPlugin", Collections.emptyList(), false, PluginType.ISOLATED, "");
         PluginsService.Bundle bundle = new PluginsService.Bundle(info, pluginDir);
         List<PluginsService.Bundle> sortedBundles = PluginsService.sortBundles(Collections.singleton(bundle));
         assertThat(sortedBundles, Matchers.contains(bundle));
@@ -350,15 +350,15 @@ public class PluginsServiceTests extends ESTestCase {
         Path pluginDir = createTempDir();
         Set<PluginsService.Bundle> bundles = new LinkedHashSet<>(); // control iteration order
         PluginInfo info1 = new PluginInfo("foo", "desc", "1.0", Version.CURRENT, "1.8",
-            "MyPlugin", Collections.emptyList(), false);
+            "MyPlugin", Collections.emptyList(), false, PluginType.ISOLATED, "");
         PluginsService.Bundle bundle1 = new PluginsService.Bundle(info1, pluginDir);
         bundles.add(bundle1);
         PluginInfo info2 = new PluginInfo("bar", "desc", "1.0", Version.CURRENT, "1.8",
-            "MyPlugin", Collections.emptyList(), false);
+            "MyPlugin", Collections.emptyList(), false, PluginType.ISOLATED, "");
         PluginsService.Bundle bundle2 = new PluginsService.Bundle(info2, pluginDir);
         bundles.add(bundle2);
         PluginInfo info3 = new PluginInfo("baz", "desc", "1.0", Version.CURRENT, "1.8",
-            "MyPlugin", Collections.emptyList(), false);
+            "MyPlugin", Collections.emptyList(), false, PluginType.ISOLATED, "");
         PluginsService.Bundle bundle3 = new PluginsService.Bundle(info3, pluginDir);
         bundles.add(bundle3);
         List<PluginsService.Bundle> sortedBundles = PluginsService.sortBundles(bundles);
@@ -368,7 +368,7 @@ public class PluginsServiceTests extends ESTestCase {
     public void testSortBundlesMissingDep() throws Exception {
         Path pluginDir = createTempDir();
         PluginInfo info = new PluginInfo("foo", "desc", "1.0", Version.CURRENT, "1.8",
-            "MyPlugin", Collections.singletonList("dne"), false);
+            "MyPlugin", Collections.singletonList("dne"), false, PluginType.ISOLATED, "");
         PluginsService.Bundle bundle = new PluginsService.Bundle(info, pluginDir);
         IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
             PluginsService.sortBundles(Collections.singleton(bundle))
@@ -380,19 +380,19 @@ public class PluginsServiceTests extends ESTestCase {
         Path pluginDir = createTempDir();
         Set<PluginsService.Bundle> bundles = new LinkedHashSet<>(); // control iteration order
         PluginInfo info1 = new PluginInfo("grandparent", "desc", "1.0",Version.CURRENT, "1.8",
-            "MyPlugin", Collections.emptyList(), false);
+            "MyPlugin", Collections.emptyList(), false, PluginType.ISOLATED, "");
         PluginsService.Bundle bundle1 = new PluginsService.Bundle(info1, pluginDir);
         bundles.add(bundle1);
         PluginInfo info2 = new PluginInfo("foo", "desc", "1.0", Version.CURRENT, "1.8",
-            "MyPlugin", Collections.singletonList("common"), false);
+            "MyPlugin", Collections.singletonList("common"), false, PluginType.ISOLATED, "");
         PluginsService.Bundle bundle2 = new PluginsService.Bundle(info2, pluginDir);
         bundles.add(bundle2);
         PluginInfo info3 = new PluginInfo("bar", "desc", "1.0", Version.CURRENT, "1.8",
-            "MyPlugin", Collections.singletonList("common"), false);
+            "MyPlugin", Collections.singletonList("common"), false, PluginType.ISOLATED, "");
         PluginsService.Bundle bundle3 = new PluginsService.Bundle(info3, pluginDir);
         bundles.add(bundle3);
         PluginInfo info4 = new PluginInfo("common", "desc", "1.0", Version.CURRENT, "1.8",
-            "MyPlugin", Collections.singletonList("grandparent"), false);
+            "MyPlugin", Collections.singletonList("grandparent"), false, PluginType.ISOLATED, "");
         PluginsService.Bundle bundle4 = new PluginsService.Bundle(info4, pluginDir);
         bundles.add(bundle4);
         List<PluginsService.Bundle> sortedBundles = PluginsService.sortBundles(bundles);
@@ -403,11 +403,11 @@ public class PluginsServiceTests extends ESTestCase {
         Path pluginDir = createTempDir();
         Set<PluginsService.Bundle> bundles = new LinkedHashSet<>(); // control iteration order
         PluginInfo info1 = new PluginInfo("dep", "desc", "1.0", Version.CURRENT, "1.8",
-            "MyPlugin", Collections.emptyList(), false);
+            "MyPlugin", Collections.emptyList(), false, PluginType.ISOLATED, "");
         PluginsService.Bundle bundle1 = new PluginsService.Bundle(info1, pluginDir);
         bundles.add(bundle1);
         PluginInfo info2 = new PluginInfo("myplugin", "desc", "1.0", Version.CURRENT, "1.8",
-            "MyPlugin", Collections.singletonList("dep"), false);
+            "MyPlugin", Collections.singletonList("dep"), false, PluginType.ISOLATED, "");
         PluginsService.Bundle bundle2 = new PluginsService.Bundle(info2, pluginDir);
         bundles.add(bundle2);
         List<PluginsService.Bundle> sortedBundles = PluginsService.sortBundles(bundles);
@@ -466,7 +466,7 @@ public class PluginsServiceTests extends ESTestCase {
         Map<String, Set<URL>> transitiveDeps = new HashMap<>();
         transitiveDeps.put("dep", Collections.singleton(dupJar.toUri().toURL()));
         PluginInfo info1 = new PluginInfo("myplugin", "desc", "1.0", Version.CURRENT, "1.8",
-            "MyPlugin", Collections.singletonList("dep"), false);
+            "MyPlugin", Collections.singletonList("dep"), false, PluginType.ISOLATED, "");
         PluginsService.Bundle bundle = new PluginsService.Bundle(info1, pluginDir);
         IllegalStateException e = expectThrows(IllegalStateException.class, () ->
             PluginsService.checkBundleJarHell(JarHell.parseClassPath(), bundle, transitiveDeps));
@@ -485,7 +485,7 @@ public class PluginsServiceTests extends ESTestCase {
         transitiveDeps.put("dep1", Collections.singleton(dupJar.toUri().toURL()));
         transitiveDeps.put("dep2", Collections.singleton(dupJar.toUri().toURL()));
         PluginInfo info1 = new PluginInfo("myplugin", "desc", "1.0", Version.CURRENT, "1.8",
-            "MyPlugin", Arrays.asList("dep1", "dep2"), false);
+            "MyPlugin", Arrays.asList("dep1", "dep2"), false, PluginType.ISOLATED, "");
         PluginsService.Bundle bundle = new PluginsService.Bundle(info1, pluginDir);
         IllegalStateException e = expectThrows(IllegalStateException.class, () ->
             PluginsService.checkBundleJarHell(JarHell.parseClassPath(), bundle, transitiveDeps));
@@ -502,7 +502,7 @@ public class PluginsServiceTests extends ESTestCase {
         Path pluginJar = pluginDir.resolve("plugin.jar");
         makeJar(pluginJar, Level.class);
         PluginInfo info1 = new PluginInfo("myplugin", "desc", "1.0", Version.CURRENT, "1.8",
-            "MyPlugin", Collections.emptyList(), false);
+            "MyPlugin", Collections.emptyList(), false, PluginType.ISOLATED, "");
         PluginsService.Bundle bundle = new PluginsService.Bundle(info1, pluginDir);
         IllegalStateException e = expectThrows(IllegalStateException.class, () ->
             PluginsService.checkBundleJarHell(JarHell.parseClassPath(), bundle, new HashMap<>()));
@@ -521,7 +521,7 @@ public class PluginsServiceTests extends ESTestCase {
         Map<String, Set<URL>> transitiveDeps = new HashMap<>();
         transitiveDeps.put("dep", Collections.singleton(depJar.toUri().toURL()));
         PluginInfo info1 = new PluginInfo("myplugin", "desc", "1.0", Version.CURRENT, "1.8",
-            "MyPlugin", Collections.singletonList("dep"), false);
+            "MyPlugin", Collections.singletonList("dep"), false, PluginType.ISOLATED, "");
         PluginsService.Bundle bundle = new PluginsService.Bundle(info1, pluginDir);
         IllegalStateException e = expectThrows(IllegalStateException.class, () ->
             PluginsService.checkBundleJarHell(JarHell.parseClassPath(), bundle, transitiveDeps));
@@ -544,7 +544,7 @@ public class PluginsServiceTests extends ESTestCase {
         transitiveDeps.put("dep1", Collections.singleton(dep1Jar.toUri().toURL()));
         transitiveDeps.put("dep2", Collections.singleton(dep2Jar.toUri().toURL()));
         PluginInfo info1 = new PluginInfo("myplugin", "desc", "1.0", Version.CURRENT, "1.8",
-            "MyPlugin", Arrays.asList("dep1", "dep2"), false);
+            "MyPlugin", Arrays.asList("dep1", "dep2"), false, PluginType.ISOLATED, "");
         PluginsService.Bundle bundle = new PluginsService.Bundle(info1, pluginDir);
         IllegalStateException e = expectThrows(IllegalStateException.class, () ->
             PluginsService.checkBundleJarHell(JarHell.parseClassPath(), bundle, transitiveDeps));
@@ -567,7 +567,7 @@ public class PluginsServiceTests extends ESTestCase {
         transitiveDeps.put("dep1", Collections.singleton(dep1Jar.toUri().toURL()));
         transitiveDeps.put("dep2", Collections.singleton(dep2Jar.toUri().toURL()));
         PluginInfo info1 = new PluginInfo("myplugin", "desc", "1.0", Version.CURRENT, "1.8",
-            "MyPlugin", Arrays.asList("dep1", "dep2"), false);
+            "MyPlugin", Arrays.asList("dep1", "dep2"), false, PluginType.ISOLATED, "");
         PluginsService.Bundle bundle = new PluginsService.Bundle(info1, pluginDir);
         PluginsService.checkBundleJarHell(JarHell.parseClassPath(), bundle, transitiveDeps);
         Set<URL> deps = transitiveDeps.get("myplugin");
@@ -616,14 +616,14 @@ public class PluginsServiceTests extends ESTestCase {
 
     public void testIncompatibleElasticsearchVersion() throws Exception {
         PluginInfo info = new PluginInfo("my_plugin", "desc", "1.0", Version.fromId(6000099),
-            "1.8", "FakePlugin", Collections.emptyList(), false);
+            "1.8", "FakePlugin", Collections.emptyList(), false, PluginType.ISOLATED, "");
         IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> PluginsService.verifyCompatibility(info));
         assertThat(e.getMessage(), containsString("was built for Elasticsearch version 6.0.0"));
     }
 
     public void testIncompatibleJavaVersion() throws Exception {
         PluginInfo info = new PluginInfo("my_plugin", "desc", "1.0", Version.CURRENT,
-            "1000000.0", "FakePlugin", Collections.emptyList(), false);
+            "1000000.0", "FakePlugin", Collections.emptyList(), false, PluginType.ISOLATED, "");
         IllegalStateException e = expectThrows(IllegalStateException.class, () -> PluginsService.verifyCompatibility(info));
         assertThat(e.getMessage(), containsString("my_plugin requires Java"));
     }
@@ -723,7 +723,10 @@ public class PluginsServiceTests extends ESTestCase {
     public void testExtensiblePlugin() {
         TestExtensiblePlugin extensiblePlugin = new TestExtensiblePlugin();
         PluginsService.loadExtensions(List.of(
-            Tuple.tuple(new PluginInfo("extensible", null, null, null, null, null, List.of(), false), extensiblePlugin)
+            Tuple.tuple(
+                new PluginInfo("extensible", null, null, null, null, null, List.of(), false, PluginType.ISOLATED, ""),
+                extensiblePlugin
+            )
         ));
 
         assertThat(extensiblePlugin.extensions, notNullValue());
@@ -732,8 +735,14 @@ public class PluginsServiceTests extends ESTestCase {
         extensiblePlugin = new TestExtensiblePlugin();
         TestPlugin testPlugin = new TestPlugin();
         PluginsService.loadExtensions(List.of(
-            Tuple.tuple(new PluginInfo("extensible", null, null, null, null, null, List.of(), false), extensiblePlugin),
-            Tuple.tuple(new PluginInfo("test", null, null, null, null, null, List.of("extensible"), false), testPlugin)
+            Tuple.tuple(
+                new PluginInfo("extensible", null, null, null, null, null, List.of(), false, PluginType.ISOLATED, ""),
+                extensiblePlugin
+            ),
+            Tuple.tuple(
+                new PluginInfo("test", null, null, null, null, null, List.of("extensible"), false, PluginType.ISOLATED, ""),
+                testPlugin
+            )
         ));
 
         assertThat(extensiblePlugin.extensions, notNullValue());

+ 4 - 2
x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java

@@ -47,6 +47,7 @@ import org.elasticsearch.monitor.os.OsInfo;
 import org.elasticsearch.monitor.os.OsStats;
 import org.elasticsearch.monitor.process.ProcessStats;
 import org.elasticsearch.plugins.PluginInfo;
+import org.elasticsearch.plugins.PluginType;
 import org.elasticsearch.test.VersionUtils;
 import org.elasticsearch.transport.TransportInfo;
 import org.elasticsearch.xpack.core.XPackFeatureSet;
@@ -258,7 +259,7 @@ public class ClusterStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Cl
         final PluginsAndModules mockPluginsAndModules = mock(PluginsAndModules.class);
         when(mockNodeInfo.getInfo(PluginsAndModules.class)).thenReturn(mockPluginsAndModules);
         final PluginInfo pluginInfo = new PluginInfo("_plugin", "_plugin_desc", "_plugin_version", Version.CURRENT,
-            "1.8", "_plugin_class", Collections.emptyList(), false);
+            "1.8", "_plugin_class", Collections.emptyList(), false, PluginType.ISOLATED, "");
         when(mockPluginsAndModules.getPluginInfos()).thenReturn(singletonList(pluginInfo));
 
         final OsInfo mockOsInfo = mock(OsInfo.class);
@@ -542,7 +543,8 @@ public class ClusterStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Cl
                 + "          \"description\": \"_plugin_desc\","
                 + "          \"classname\": \"_plugin_class\","
                 + "          \"extended_plugins\": [],"
-                + "          \"has_native_controller\": false"
+                + "          \"has_native_controller\": false,"
+                + "          \"type\": \"isolated\""
                 + "        }"
                 + "      ],"
                 + "      \"network_types\": {"

+ 4 - 0
x-pack/qa/smoke-test-plugins-ssl/build.gradle

@@ -73,6 +73,10 @@ testClusters.integTest {
       // Do not attempt to install ingest-attachment in FIPS 140 as it is not supported (it depends on non-FIPS BouncyCastle)
       return
     }
+    if (pluginName == 'quota-aware-fs') {
+      // This plugin has to be configured to work via system properties
+      return
+    }
     plugin pluginProject.path
     pluginsCount += 1
   }

+ 4 - 0
x-pack/qa/smoke-test-plugins/build.gradle

@@ -21,6 +21,10 @@ testClusters.integTest {
       // Do not attempt to install ingest-attachment in FIPS 140 as it is not supported (it depends on non-FIPS BouncyCastle)
       return
     }
+    if (pluginName == 'quota-aware-fs') {
+      // This plugin has to be configured to work via system properties
+      return
+    }
     plugin pluginProject.path
     pluginsCount += 1
   }