Преглед на файлове

Notify systemd when Elasticsearch is ready (#44673)

Today our systemd service defaults to a service type of simple. This
means that systemd assumes Elasticsearch is ready as soon as the
ExecStart (bin/elasticsearch) process is forked off. This means that the
service appears ready long before it actually is, so before it is ready
to receive requests. It also means that services that want to depend on
Elasticsearch being ready to start can not as there is not a reliable
mechanism to determine this. This commit changes the service type to
notify. This requires that Elasticsearch sends a notification message
via libsystemd sd_notify method. This commit does that by using JNA to
invoke this native method. Additionally, we use this integration to also
notify systemd when we are stopping.
Jason Tedor преди 6 години
родител
ревизия
d558d95434

+ 18 - 0
distribution/build.gradle

@@ -68,6 +68,7 @@ task buildOssNoJdkNotice(type: NoticeTask) {
  *****************************************************************************/
 String ossOutputs = 'build/outputs/oss'
 String defaultOutputs = 'build/outputs/default'
+String systemdOutputs = 'build/outputs/systemd'
 String transportOutputs = 'build/outputs/transport-only'
 
 task processOssOutputs(type: Sync) {
@@ -79,6 +80,10 @@ task processDefaultOutputs(type: Sync) {
   from processOssOutputs
 }
 
+task processSystemdOutputs(type: Sync) {
+  into systemdOutputs
+}
+
 // Integ tests work over the rest http layer, so we need a transport included with the integ test zip.
 // All transport modules are included so that they may be randomized for testing
 task processTransportOutputs(type: Sync) {
@@ -110,6 +115,10 @@ task buildDefaultConfig {
   dependsOn processDefaultOutputs
   outputs.dir "${defaultOutputs}/config"
 }
+task buildSystemdModule {
+  dependsOn processSystemdOutputs
+  outputs.dir "${systemdOutputs}/modules"
+}
 task buildTransportModules {
   dependsOn processTransportOutputs
   outputs.dir "${transportOutputs}/modules"
@@ -186,6 +195,10 @@ ext.restTestExpansions = [
 // we create the buildOssModules task above but fill it here so we can do a single
 // loop over modules to also setup cross task dependencies and increment our modules counter
 project.rootProject.subprojects.findAll { it.parent.path == ':modules' }.each { Project module ->
+  if (module.name == 'systemd') {
+    // the systemd module is only included in the package distributions
+    return
+  }
   File licenses = new File(module.projectDir, 'licenses')
   if (licenses.exists()) {
     buildDefaultNotice.licensesDir licenses
@@ -218,6 +231,8 @@ xpack.subprojects.findAll { it.parent == xpack }.each { Project xpackModule ->
   copyLog4jProperties(buildDefaultLog4jConfig, xpackModule)
 }
 
+copyModule(processSystemdOutputs, project(':modules:systemd'))
+
 // make sure we have a clean task since we aren't a java project, but we have tasks that
 // put stuff in the build dir
 task clean(type: Delete) {
@@ -288,6 +303,9 @@ configure(subprojects.findAll { ['archives', 'packages'].contains(it.name) }) {
             exclude "**/platform/${excludePlatform}-x86_64/**"
           }
         }
+        if (project.path.startsWith(':distribution:packages')) {
+          from(project(':distribution').buildSystemdModule)
+        }
       }
     }
 

+ 2 - 0
distribution/packages/src/common/systemd/elasticsearch.service

@@ -5,11 +5,13 @@ Wants=network-online.target
 After=network-online.target
 
 [Service]
+Type=notify
 RuntimeDirectory=elasticsearch
 PrivateTmp=true
 Environment=ES_HOME=/usr/share/elasticsearch
 Environment=ES_PATH_CONF=${path.conf}
 Environment=PID_DIR=/var/run/elasticsearch
+Environment=ES_SD_NOTIFY=true
 EnvironmentFile=-${path.env}
 
 WorkingDirectory=/usr/share/elasticsearch

+ 25 - 0
modules/systemd/build.gradle

@@ -0,0 +1,25 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+esplugin {
+    description 'Integrates Elasticsearch with systemd'
+    classname 'org.elasticsearch.systemd.SystemdPlugin'
+}
+
+integTest.enabled = false

+ 49 - 0
modules/systemd/src/main/java/org/elasticsearch/systemd/Libsystemd.java

@@ -0,0 +1,49 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.systemd;
+
+import com.sun.jna.Native;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+/**
+ * Provides access to the native method sd_notify from libsystemd.
+ */
+class Libsystemd {
+
+    static {
+        AccessController.doPrivileged((PrivilegedAction<Object>) () -> {
+            Native.register(Libsystemd.class, "libsystemd.so.0");
+            return null;
+        });
+    }
+
+    /**
+     * Notify systemd of state changes.
+     *
+     * @param unset_environment if non-zero, the NOTIFY_SOCKET environment variable will be unset before returning and further calls to
+     *                          sd_notify will fail
+     * @param state             a new-line separated list of variable assignments; some assignments are understood directly by systemd
+     * @return a negative error code on failure, and positive if status was successfully sent
+     */
+    static native int sd_notify(int unset_environment, String state);
+
+}

+ 90 - 0
modules/systemd/src/main/java/org/elasticsearch/systemd/SystemdPlugin.java

@@ -0,0 +1,90 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.systemd;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.util.Constants;
+import org.elasticsearch.Assertions;
+import org.elasticsearch.Build;
+import org.elasticsearch.plugins.ClusterPlugin;
+import org.elasticsearch.plugins.Plugin;
+
+public class SystemdPlugin extends Plugin implements ClusterPlugin {
+
+    private static final Logger logger = LogManager.getLogger(SystemdPlugin.class);
+
+    private final boolean enabled;
+
+    final boolean isEnabled() {
+        return enabled;
+    }
+
+    @SuppressWarnings("unused")
+    public SystemdPlugin() {
+        this(true, Constants.LINUX, System.getenv("ES_SD_NOTIFY"));
+    }
+
+    SystemdPlugin(final boolean assertIsPackageDistribution, final boolean isLinux, final String esSDNotify) {
+        if (Assertions.ENABLED && assertIsPackageDistribution) {
+            // our build is configured to only include this module in the package distributions
+            assert Build.CURRENT.type() == Build.Type.DEB || Build.CURRENT.type() == Build.Type.RPM : Build.CURRENT.type();
+        }
+        if (isLinux == false || esSDNotify == null) {
+            enabled = false;
+            return;
+        }
+        if (Boolean.TRUE.toString().equals(esSDNotify) == false && Boolean.FALSE.toString().equals(esSDNotify) == false) {
+            throw new RuntimeException("ES_SD_NOTIFY set to unexpected value [" + esSDNotify + "]");
+        }
+        enabled = Boolean.TRUE.toString().equals(esSDNotify);
+    }
+
+    int sd_notify(@SuppressWarnings("SameParameterValue") final int unset_environment, final String state) {
+        return Libsystemd.sd_notify(unset_environment, state);
+    }
+
+    @Override
+    public void onNodeStarted() {
+        if (enabled == false) {
+            return;
+        }
+        final int rc = sd_notify(0, "READY=1");
+        logger.trace("sd_notify returned [{}]", rc);
+        if (rc < 0) {
+            // treat failure to notify systemd of readiness as a startup failure
+            throw new RuntimeException("sd_notify returned error [" + rc + "]");
+        }
+    }
+
+    @Override
+    public void close() {
+        if (enabled == false) {
+            return;
+        }
+        final int rc = sd_notify(0, "STOPPING=1");
+        logger.trace("sd_notify returned [{}]", rc);
+        if (rc < 0) {
+            // do not treat failure to notify systemd of stopping as a failure
+            logger.warn("sd_notify returned error [{}]", rc);
+        }
+    }
+
+}

+ 23 - 0
modules/systemd/src/main/plugin-metadata/plugin-security.policy

@@ -0,0 +1,23 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+grant codeBase "${codebase.systemd}" {
+  // for registering native methods
+  permission java.lang.RuntimePermission "accessDeclaredMembers";
+};

+ 171 - 0
modules/systemd/src/test/java/org/elasticsearch/systemd/SystemdPluginTests.java

@@ -0,0 +1,171 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.systemd;
+
+import org.elasticsearch.common.CheckedConsumer;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.hamcrest.OptionalMatchers;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasToString;
+import static org.hamcrest.Matchers.instanceOf;
+
+public class SystemdPluginTests extends ESTestCase {
+
+    public void testIsEnabled() {
+        final SystemdPlugin plugin = new SystemdPlugin(false, true, Boolean.TRUE.toString());
+        assertTrue(plugin.isEnabled());
+    }
+
+    public void testIsNotLinux() {
+        final SystemdPlugin plugin = new SystemdPlugin(false, false, Boolean.TRUE.toString());
+        assertFalse(plugin.isEnabled());
+    }
+
+    public void testIsImplicitlyNotEnabled() {
+        final SystemdPlugin plugin = new SystemdPlugin(false, true, null);
+        assertFalse(plugin.isEnabled());
+    }
+
+    public void testIsExplicitlyNotEnabled() {
+        final SystemdPlugin plugin = new SystemdPlugin(false, true, Boolean.FALSE.toString());
+        assertFalse(plugin.isEnabled());
+    }
+
+    public void testInvalid() {
+        final String esSDNotify = randomValueOtherThanMany(
+            s -> Boolean.TRUE.toString().equals(s) || Boolean.FALSE.toString().equals(s),
+            () -> randomAlphaOfLength(4));
+        final RuntimeException e = expectThrows(RuntimeException.class,
+            () -> new SystemdPlugin(false, true, esSDNotify));
+        assertThat(e, hasToString(containsString("ES_SD_NOTIFY set to unexpected value [" + esSDNotify + "]")));
+    }
+
+    public void testOnNodeStartedSuccess() {
+        runTestOnNodeStarted(
+            Boolean.TRUE.toString(),
+            randomIntBetween(0, Integer.MAX_VALUE),
+            maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
+    }
+
+    public void testOnNodeStartedFailure() {
+        final int rc = randomIntBetween(Integer.MIN_VALUE, -1);
+        runTestOnNodeStarted(
+            Boolean.TRUE.toString(),
+            rc,
+            maybe -> {
+                assertThat(maybe, OptionalMatchers.isPresent());
+                // noinspection OptionalGetWithoutIsPresent
+                assertThat(maybe.get(), instanceOf(RuntimeException.class));
+                assertThat(maybe.get(), hasToString(containsString("sd_notify returned error [" + rc + "]")));
+            });
+    }
+
+    public void testOnNodeStartedNotEnabled() {
+        runTestOnNodeStarted(
+            Boolean.FALSE.toString(),
+            randomInt(),
+            maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
+    }
+
+    private void runTestOnNodeStarted(
+        final String esSDNotify,
+        final int rc,
+        final Consumer<Optional<Exception>> assertions) {
+        runTest(esSDNotify, rc, assertions, SystemdPlugin::onNodeStarted, "READY=1");
+    }
+
+    public void testCloseSuccess() {
+        runTestClose(
+            Boolean.TRUE.toString(),
+            randomIntBetween(1, Integer.MAX_VALUE),
+            maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
+    }
+
+    public void testCloseFailure() {
+        runTestClose(
+            Boolean.TRUE.toString(),
+            randomIntBetween(Integer.MIN_VALUE, -1),
+            maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
+    }
+
+    public void testCloseNotEnabled() {
+        runTestClose(
+            Boolean.FALSE.toString(),
+            randomInt(),
+            maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
+    }
+
+    private void runTestClose(
+        final String esSDNotify,
+        final int rc,
+        final Consumer<Optional<Exception>> assertions) {
+        runTest(esSDNotify, rc, assertions, SystemdPlugin::close, "STOPPING=1");
+    }
+
+    private void runTest(
+        final String esSDNotify,
+        final int rc,
+        final Consumer<Optional<Exception>> assertions,
+        final CheckedConsumer<SystemdPlugin, IOException> invocation,
+        final String expectedState) {
+        final AtomicBoolean invoked = new AtomicBoolean();
+        final AtomicInteger invokedUnsetEnvironment = new AtomicInteger();
+        final AtomicReference<String> invokedState = new AtomicReference<>();
+        final SystemdPlugin plugin = new SystemdPlugin(false, true, esSDNotify) {
+
+            @Override
+            int sd_notify(final int unset_environment, final String state) {
+                invoked.set(true);
+                invokedUnsetEnvironment.set(unset_environment);
+                invokedState.set(state);
+                return rc;
+            }
+
+        };
+
+        boolean success = false;
+        try {
+            invocation.accept(plugin);
+            success = true;
+        } catch (final Exception e) {
+            assertions.accept(Optional.of(e));
+        }
+        if (success) {
+            assertions.accept(Optional.empty());
+        }
+        if (Boolean.TRUE.toString().equals(esSDNotify)) {
+            assertTrue(invoked.get());
+            assertThat(invokedUnsetEnvironment.get(), equalTo(0));
+            assertThat(invokedState.get(), equalTo(expectedState));
+        } else {
+            assertFalse(invoked.get());
+        }
+    }
+
+}

+ 6 - 0
server/src/main/resources/org/elasticsearch/bootstrap/security.policy

@@ -52,6 +52,11 @@ grant codeBase "${codebase.elasticsearch-plugin-classloader}" {
   permission java.lang.RuntimePermission "createClassLoader";
 };
 
+grant codeBase "${codebase.jna}" {
+  // for registering native methods
+  permission java.lang.RuntimePermission "accessDeclaredMembers";
+};
+
 //// Everything else:
 
 grant {
@@ -143,4 +148,5 @@ grant {
   permission java.io.FilePermission "/sys/fs/cgroup/cpuacct/-", "read";
   permission java.io.FilePermission "/sys/fs/cgroup/memory", "read";
   permission java.io.FilePermission "/sys/fs/cgroup/memory/-", "read";
+
 };