Browse Source

Consistent Secure Settings (#40416)

Introduces a new `ConsistentSecureSettingsValidatorService` service that exposes
a single public method, namely `allSecureSettingsConsistent`. The method returns
`true` if the local node's secure settings (inside the keystore) are equal to the
master's, and `false` otherwise. Technically, the local node has to have exactly
the same secure settings - setting names should not be missing or in surplus
- for all `SecureSetting` instances that are flagged with the newly introduced
`Property.Consistent`. It is worth highlighting that the `allSecureSettingsConsistent`
is not a consensus view across the cluster, but rather the local node's perspective
in relation to the master.
Albert Zaharovits 6 years ago
parent
commit
cdfc98680f
21 changed files with 879 additions and 41 deletions
  1. 2 2
      build.gradle
  2. 23 0
      distribution/tools/keystore-cli/src/test/java/org/elasticsearch/common/settings/KeyStoreWrapperTests.java
  3. 4 0
      server/src/main/java/org/elasticsearch/cluster/metadata/DiffableStringMap.java
  4. 48 3
      server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java
  5. 1 1
      server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java
  6. 14 5
      server/src/main/java/org/elasticsearch/common/hash/MessageDigests.java
  7. 2 2
      server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
  8. 256 0
      server/src/main/java/org/elasticsearch/common/settings/ConsistentSettingsService.java
  9. 42 19
      server/src/main/java/org/elasticsearch/common/settings/KeyStoreWrapper.java
  10. 18 1
      server/src/main/java/org/elasticsearch/common/settings/SecureSetting.java
  11. 2 0
      server/src/main/java/org/elasticsearch/common/settings/SecureSettings.java
  12. 20 0
      server/src/main/java/org/elasticsearch/common/settings/Setting.java
  13. 7 2
      server/src/main/java/org/elasticsearch/common/settings/Settings.java
  14. 21 1
      server/src/main/java/org/elasticsearch/common/settings/SettingsModule.java
  15. 4 0
      server/src/main/java/org/elasticsearch/node/Node.java
  16. 188 0
      server/src/test/java/org/elasticsearch/common/settings/ConsistentSettingsIT.java
  17. 159 0
      server/src/test/java/org/elasticsearch/common/settings/ConsistentSettingsServiceTests.java
  18. 36 0
      server/src/test/java/org/elasticsearch/common/settings/SettingsModuleTests.java
  19. 13 0
      test/framework/src/main/java/org/elasticsearch/common/settings/MockSecureSettings.java
  20. 12 5
      x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/notification/NotificationService.java
  21. 7 0
      x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/notification/NotificationServiceTests.java

+ 2 - 2
build.gradle

@@ -160,8 +160,8 @@ task verifyVersions {
  * after the backport of the backcompat code is complete.
  */
 
-boolean bwc_tests_enabled = true
-final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */
+boolean bwc_tests_enabled = false
+final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/40416" /* place a PR link here when committing bwc changes */
 if (bwc_tests_enabled == false) {
   if (bwc_tests_disabled_issue.isEmpty()) {
     throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false")

+ 23 - 0
distribution/tools/keystore-cli/src/test/java/org/elasticsearch/common/settings/KeyStoreWrapperTests.java

@@ -51,10 +51,12 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.security.GeneralSecurityException;
 import java.security.KeyStore;
+import java.security.MessageDigest;
 import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Base64;
 import java.util.List;
+import java.util.Locale;
 import java.util.Set;
 
 import static org.hamcrest.Matchers.containsString;
@@ -126,6 +128,27 @@ public class KeyStoreWrapperTests extends ESTestCase {
         assertThat(exception.getMessage(), containsString("closed"));
     }
 
+    public void testValueSHA256Digest() throws Exception {
+        final KeyStoreWrapper keystore = KeyStoreWrapper.create();
+        final String stringSettingKeyName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT) + "1";
+        final String stringSettingValue = randomAlphaOfLength(32);
+        keystore.setString(stringSettingKeyName, stringSettingValue.toCharArray());
+        final String fileSettingKeyName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT) + "2";
+        final byte[] fileSettingValue = randomByteArrayOfLength(32);
+        keystore.setFile(fileSettingKeyName, fileSettingValue);
+
+        final byte[] stringSettingHash = MessageDigest.getInstance("SHA-256").digest(stringSettingValue.getBytes(StandardCharsets.UTF_8));
+        assertThat(keystore.getSHA256Digest(stringSettingKeyName), equalTo(stringSettingHash));
+        final byte[] fileSettingHash = MessageDigest.getInstance("SHA-256").digest(fileSettingValue);
+        assertThat(keystore.getSHA256Digest(fileSettingKeyName), equalTo(fileSettingHash));
+
+        keystore.close();
+
+        // value hashes accessible even when the keystore is closed
+        assertThat(keystore.getSHA256Digest(stringSettingKeyName), equalTo(stringSettingHash));
+        assertThat(keystore.getSHA256Digest(fileSettingKeyName), equalTo(fileSettingHash));
+    }
+
     public void testUpgradeNoop() throws Exception {
         KeyStoreWrapper keystore = KeyStoreWrapper.create();
         SecureString seed = keystore.getString(KeyStoreWrapper.SEED_SETTING.getKey());

+ 4 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/DiffableStringMap.java

@@ -39,6 +39,8 @@ import java.util.Set;
  */
 public class DiffableStringMap extends AbstractMap<String, String> implements Diffable<DiffableStringMap> {
 
+    public static final DiffableStringMap EMPTY = new DiffableStringMap(Collections.emptyMap());
+
     private final Map<String, String> innerMap;
 
     DiffableStringMap(final Map<String, String> map) {
@@ -75,6 +77,8 @@ public class DiffableStringMap extends AbstractMap<String, String> implements Di
      */
     public static class DiffableStringMapDiff implements Diff<DiffableStringMap> {
 
+        public static final DiffableStringMapDiff EMPTY = new DiffableStringMapDiff(DiffableStringMap.EMPTY, DiffableStringMap.EMPTY);
+
         private final List<String> deletes;
         private final Map<String, String> upserts; // diffs also become upserts
 

+ 48 - 3
server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java

@@ -25,6 +25,7 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.util.CollectionUtil;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.AliasesRequest;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterState.FeatureAware;
@@ -168,6 +169,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
     private final Settings transientSettings;
     private final Settings persistentSettings;
     private final Settings settings;
+    private final DiffableStringMap hashesOfConsistentSettings;
     private final ImmutableOpenMap<String, IndexMetaData> indices;
     private final ImmutableOpenMap<String, IndexTemplateMetaData> templates;
     private final ImmutableOpenMap<String, Custom> customs;
@@ -182,7 +184,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
     private final SortedMap<String, AliasOrIndex> aliasAndIndexLookup;
 
     MetaData(String clusterUUID, boolean clusterUUIDCommitted, long version, CoordinationMetaData coordinationMetaData,
-             Settings transientSettings, Settings persistentSettings,
+             Settings transientSettings, Settings persistentSettings, DiffableStringMap hashesOfConsistentSettings,
              ImmutableOpenMap<String, IndexMetaData> indices, ImmutableOpenMap<String, IndexTemplateMetaData> templates,
              ImmutableOpenMap<String, Custom> customs, String[] allIndices, String[] allOpenIndices, String[] allClosedIndices,
              SortedMap<String, AliasOrIndex> aliasAndIndexLookup) {
@@ -193,6 +195,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
         this.transientSettings = transientSettings;
         this.persistentSettings = persistentSettings;
         this.settings = Settings.builder().put(persistentSettings).put(transientSettings).build();
+        this.hashesOfConsistentSettings = hashesOfConsistentSettings;
         this.indices = indices;
         this.customs = customs;
         this.templates = templates;
@@ -244,6 +247,10 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
         return this.persistentSettings;
     }
 
+    public Map<String, String> hashesOfConsistentSettings() {
+        return this.hashesOfConsistentSettings;
+    }
+
     public CoordinationMetaData coordinationMetaData() {
         return this.coordinationMetaData;
     }
@@ -733,6 +740,9 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
         if (!metaData1.persistentSettings.equals(metaData2.persistentSettings)) {
             return false;
         }
+        if (!metaData1.hashesOfConsistentSettings.equals(metaData2.hashesOfConsistentSettings)) {
+            return false;
+        }
         if (!metaData1.templates.equals(metaData2.templates())) {
             return false;
         }
@@ -787,6 +797,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
         private CoordinationMetaData coordinationMetaData;
         private Settings transientSettings;
         private Settings persistentSettings;
+        private Diff<DiffableStringMap> hashesOfConsistentSettings;
         private Diff<ImmutableOpenMap<String, IndexMetaData>> indices;
         private Diff<ImmutableOpenMap<String, IndexTemplateMetaData>> templates;
         private Diff<ImmutableOpenMap<String, Custom>> customs;
@@ -798,6 +809,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
             coordinationMetaData = after.coordinationMetaData;
             transientSettings = after.transientSettings;
             persistentSettings = after.persistentSettings;
+            hashesOfConsistentSettings = after.hashesOfConsistentSettings.diff(before.hashesOfConsistentSettings);
             indices = DiffableUtils.diff(before.indices, after.indices, DiffableUtils.getStringKeySerializer());
             templates = DiffableUtils.diff(before.templates, after.templates, DiffableUtils.getStringKeySerializer());
             customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER);
@@ -810,6 +822,11 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
             coordinationMetaData = new CoordinationMetaData(in);
             transientSettings = Settings.readSettingsFromStream(in);
             persistentSettings = Settings.readSettingsFromStream(in);
+            if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
+                hashesOfConsistentSettings = DiffableStringMap.readDiffFrom(in);
+            } else {
+                hashesOfConsistentSettings = DiffableStringMap.DiffableStringMapDiff.EMPTY;
+            }
             indices = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), IndexMetaData::readFrom,
                 IndexMetaData::readDiffFrom);
             templates = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), IndexTemplateMetaData::readFrom,
@@ -825,6 +842,9 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
             coordinationMetaData.writeTo(out);
             Settings.writeSettingsToStream(transientSettings, out);
             Settings.writeSettingsToStream(persistentSettings, out);
+            if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
+                hashesOfConsistentSettings.writeTo(out);
+            }
             indices.writeTo(out);
             templates.writeTo(out);
             customs.writeTo(out);
@@ -839,6 +859,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
             builder.coordinationMetaData(coordinationMetaData);
             builder.transientSettings(transientSettings);
             builder.persistentSettings(persistentSettings);
+            builder.hashesOfConsistentSettings(hashesOfConsistentSettings.apply(part.hashesOfConsistentSettings));
             builder.indices(indices.apply(part.indices));
             builder.templates(templates.apply(part.templates));
             builder.customs(customs.apply(part.customs));
@@ -854,6 +875,9 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
         builder.coordinationMetaData(new CoordinationMetaData(in));
         builder.transientSettings(readSettingsFromStream(in));
         builder.persistentSettings(readSettingsFromStream(in));
+        if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
+            builder.hashesOfConsistentSettings(new DiffableStringMap(in));
+        }
         int size = in.readVInt();
         for (int i = 0; i < size; i++) {
             builder.put(IndexMetaData.readFrom(in), false);
@@ -878,6 +902,9 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
         coordinationMetaData.writeTo(out);
         writeSettingsToStream(transientSettings, out);
         writeSettingsToStream(persistentSettings, out);
+        if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
+            hashesOfConsistentSettings.writeTo(out);
+        }
         out.writeVInt(indices.size());
         for (IndexMetaData indexMetaData : this) {
             indexMetaData.writeTo(out);
@@ -918,6 +945,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
         private CoordinationMetaData coordinationMetaData = CoordinationMetaData.EMPTY_META_DATA;
         private Settings transientSettings = Settings.Builder.EMPTY_SETTINGS;
         private Settings persistentSettings = Settings.Builder.EMPTY_SETTINGS;
+        private DiffableStringMap hashesOfConsistentSettings = new DiffableStringMap(Collections.emptyMap());
 
         private final ImmutableOpenMap.Builder<String, IndexMetaData> indices;
         private final ImmutableOpenMap.Builder<String, IndexTemplateMetaData> templates;
@@ -937,6 +965,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
             this.coordinationMetaData = metaData.coordinationMetaData;
             this.transientSettings = metaData.transientSettings;
             this.persistentSettings = metaData.persistentSettings;
+            this.hashesOfConsistentSettings = metaData.hashesOfConsistentSettings;
             this.version = metaData.version;
             this.indices = ImmutableOpenMap.builder(metaData.indices);
             this.templates = ImmutableOpenMap.builder(metaData.templates);
@@ -1100,6 +1129,20 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
             return this;
         }
 
+        public DiffableStringMap hashesOfConsistentSettings() {
+            return this.hashesOfConsistentSettings;
+        }
+
+        public Builder hashesOfConsistentSettings(DiffableStringMap hashesOfConsistentSettings) {
+            this.hashesOfConsistentSettings = hashesOfConsistentSettings;
+            return this;
+        }
+
+        public Builder hashesOfConsistentSettings(Map<String, String> hashesOfConsistentSettings) {
+            this.hashesOfConsistentSettings = new DiffableStringMap(hashesOfConsistentSettings);
+            return this;
+        }
+
         public Builder version(long version) {
             this.version = version;
             return this;
@@ -1173,8 +1216,8 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
             String[] allClosedIndicesArray = allClosedIndices.toArray(new String[allClosedIndices.size()]);
 
             return new MetaData(clusterUUID, clusterUUIDCommitted, version, coordinationMetaData, transientSettings, persistentSettings,
-                    indices.build(), templates.build(), customs.build(), allIndicesArray, allOpenIndicesArray, allClosedIndicesArray,
-                    aliasAndIndexLookup);
+                    hashesOfConsistentSettings, indices.build(), templates.build(), customs.build(), allIndicesArray, allOpenIndicesArray,
+                    allClosedIndicesArray, aliasAndIndexLookup);
         }
 
         private SortedMap<String, AliasOrIndex> buildAliasAndIndexLookup() {
@@ -1298,6 +1341,8 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
                         while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                             builder.put(IndexMetaData.Builder.fromXContent(parser), false);
                         }
+                    } else if ("hashes_of_consistent_settings".equals(currentFieldName)) {
+                        builder.hashesOfConsistentSettings(parser.mapStrings());
                     } else if ("templates".equals(currentFieldName)) {
                         while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                             builder.put(IndexTemplateMetaData.Builder.fromXContent(parser, parser.currentName()));

+ 1 - 1
server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java

@@ -73,7 +73,7 @@ public class ClusterService extends AbstractLifecycleComponent {
     }
 
     public ClusterService(Settings settings, ClusterSettings clusterSettings, MasterService masterService,
-        ClusterApplierService clusterApplierService) {
+                          ClusterApplierService clusterApplierService) {
         this.settings = settings;
         this.nodeName = Node.NODE_NAME_SETTING.get(settings);
         this.masterService = masterService;

+ 14 - 5
server/src/main/java/org/elasticsearch/common/hash/MessageDigests.java

@@ -95,15 +95,24 @@ public final class MessageDigests {
      * @return a hex representation of the input as a String.
      */
     public static String toHexString(byte[] bytes) {
-        Objects.requireNonNull(bytes);
-        StringBuilder sb = new StringBuilder(2 * bytes.length);
+        return new String(toHexCharArray(bytes));
+    }
 
+    /**
+     * Encodes the byte array into a newly created hex char array, without allocating any other temporary variables.
+     *
+     * @param bytes the input to be encoded as hex.
+     * @return the hex encoding of the input as a char array.
+     */
+    public static char[] toHexCharArray(byte[] bytes) {
+        Objects.requireNonNull(bytes);
+        final char[] result = new char[2 * bytes.length];
         for (int i = 0; i < bytes.length; i++) {
             byte b = bytes[i];
-            sb.append(HEX_DIGITS[b >> 4 & 0xf]).append(HEX_DIGITS[b & 0xf]);
+            result[2 * i] = HEX_DIGITS[b >> 4 & 0xf];
+            result[2 * i + 1] = HEX_DIGITS[b & 0xf];
         }
-
-        return sb.toString();
+        return result;
     }
 
 }

+ 2 - 2
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -112,12 +112,12 @@ import java.util.function.Predicate;
  * Encapsulates all valid cluster level settings.
  */
 public final class ClusterSettings extends AbstractScopedSettings {
+
     public ClusterSettings(final Settings nodeSettings, final Set<Setting<?>> settingsSet) {
         this(nodeSettings, settingsSet, Collections.emptySet());
     }
 
-    public ClusterSettings(
-            final Settings nodeSettings, final Set<Setting<?>> settingsSet, final Set<SettingUpgrader<?>> settingUpgraders) {
+    public ClusterSettings(final Settings nodeSettings, final Set<Setting<?>> settingsSet, final Set<SettingUpgrader<?>> settingUpgraders) {
         super(nodeSettings, settingsSet, settingUpgraders, Property.NodeScope);
         addSettingsUpdater(new LoggingSettingUpdater(nodeSettings));
     }

+ 256 - 0
server/src/main/java/org/elasticsearch/common/settings/ConsistentSettingsService.java

@@ -0,0 +1,256 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.settings;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.LocalNodeMasterListener;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.hash.MessageDigests;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.nio.charset.StandardCharsets;
+import java.security.NoSuchAlgorithmException;
+import java.security.spec.InvalidKeySpecException;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+import javax.crypto.SecretKey;
+import javax.crypto.SecretKeyFactory;
+import javax.crypto.spec.PBEKeySpec;
+
+/**
+ * Used to publish secure setting hashes in the cluster state and to validate those hashes against the local values of those same settings.
+ * This is colloquially referred to as the secure setting consistency check. It will publish and verify hashes only for the collection
+ * of settings passed in the constructor. The settings have to have the {@link Setting.Property#Consistent} property. 
+ */
+public final class ConsistentSettingsService {
+    private static final Logger logger = LogManager.getLogger(ConsistentSettingsService.class);
+
+    private final Settings settings;
+    private final ClusterService clusterService;
+    private final Collection<Setting<?>> secureSettingsCollection;
+    private final SecretKeyFactory pbkdf2KeyFactory;
+
+    public ConsistentSettingsService(Settings settings, ClusterService clusterService,
+                                     Collection<Setting<?>> secureSettingsCollection) {
+        this.settings = settings;
+        this.clusterService = clusterService;
+        this.secureSettingsCollection = secureSettingsCollection;
+        // this is used to compute the PBKDF2 hash (the published one)
+        try {
+            this.pbkdf2KeyFactory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA512");
+        } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException("The \"PBKDF2WithHmacSHA512\" algorithm is required for consistent secure settings' hashes", e);
+        }
+    }
+
+    /**
+     * Returns a {@link LocalNodeMasterListener} that will publish hashes of all the settings passed in the constructor. These hashes are
+     * published by the master node only. Note that this is not designed for {@link SecureSettings} implementations that are mutable.
+     */
+    public LocalNodeMasterListener newHashPublisher() {
+        // eagerly compute hashes to be published
+        final Map<String, String> computedHashesOfConsistentSettings = computeHashesOfConsistentSecureSettings();
+        return new HashesPublisher(computedHashesOfConsistentSettings, clusterService);
+    }
+
+    /**
+     * Verifies that the hashes of consistent secure settings in the latest {@code ClusterState} verify for the values of those same
+     * settings on the local node. The settings to be checked are passed in the constructor. Also, validates that a missing local
+     * value is also missing in the published set, and vice-versa.  
+     */
+    public boolean areAllConsistent() {
+        final ClusterState state = clusterService.state();
+        final Map<String, String> publishedHashesOfConsistentSettings = state.metaData().hashesOfConsistentSettings();
+        final Set<String> publishedSettingKeysToVerify = new HashSet<>();
+        publishedSettingKeysToVerify.addAll(publishedHashesOfConsistentSettings.keySet());
+        final AtomicBoolean allConsistent = new AtomicBoolean(true);
+        forEachConcreteSecureSettingDo(concreteSecureSetting -> {
+            final String publishedSaltAndHash = publishedHashesOfConsistentSettings.get(concreteSecureSetting.getKey());
+            final byte[] localHash = concreteSecureSetting.getSecretDigest(settings);
+            if (publishedSaltAndHash == null && localHash == null) {
+                // consistency of missing
+                logger.debug("no published hash for the consistent secure setting [{}] but it also does NOT exist on the local node",
+                        concreteSecureSetting.getKey());
+            } else if (publishedSaltAndHash == null && localHash != null) {
+                // setting missing on master but present locally
+                logger.warn("no published hash for the consistent secure setting [{}] but it exists on the local node",
+                        concreteSecureSetting.getKey());
+                if (state.nodes().isLocalNodeElectedMaster()) {
+                    throw new IllegalStateException("Master node cannot validate consistent setting. No published hash for ["
+                            + concreteSecureSetting.getKey() + "] but setting exists.");
+                }
+                allConsistent.set(false);
+            } else if (publishedSaltAndHash != null && localHash == null) {
+                // setting missing locally but present on master
+                logger.warn("the consistent secure setting [{}] does not exist on the local node but there is a published hash for it",
+                        concreteSecureSetting.getKey());
+                allConsistent.set(false);
+            } else {
+                assert publishedSaltAndHash != null;
+                assert localHash != null;
+                final String[] parts = publishedSaltAndHash.split(":");
+                if (parts == null || parts.length != 2) {
+                    throw new IllegalArgumentException("published hash [" + publishedSaltAndHash + " ] for secure setting ["
+                            + concreteSecureSetting.getKey() + "] is invalid");
+                }
+                final String publishedSalt = parts[0];
+                final String publishedHash = parts[1];
+                final byte[] computedSaltedHashBytes = computeSaltedPBKDF2Hash(localHash, publishedSalt.getBytes(StandardCharsets.UTF_8));
+                final String computedSaltedHash = new String(Base64.getEncoder().encode(computedSaltedHashBytes), StandardCharsets.UTF_8);
+                if (false == publishedHash.equals(computedSaltedHash)) {
+                    logger.warn("the published hash [{}] of the consistent secure setting [{}] differs from the locally computed one [{}]",
+                            publishedHash, concreteSecureSetting.getKey(), computedSaltedHash);
+                    if (state.nodes().isLocalNodeElectedMaster()) {
+                        throw new IllegalStateException("Master node cannot validate consistent setting. The published hash ["
+                                + publishedHash + "] of the consistent secure setting [" + concreteSecureSetting.getKey()
+                                + "] differs from the locally computed one [" + computedSaltedHash + "].");
+                    }
+                    allConsistent.set(false);
+                }
+            }
+            publishedSettingKeysToVerify.remove(concreteSecureSetting.getKey());
+        });
+        // another case of settings missing locally, when group settings have not expanded to all the keys published
+        for (String publishedSettingKey : publishedSettingKeysToVerify) {
+            for (Setting<?> setting : secureSettingsCollection) {
+                if (setting.match(publishedSettingKey)) {
+                    // setting missing locally but present on master
+                    logger.warn("the consistent secure setting [{}] does not exist on the local node but there is a published hash for it",
+                            publishedSettingKey);
+                    allConsistent.set(false);
+                }
+            }
+        }
+        return allConsistent.get();
+    }
+
+    /**
+     * Iterate over the passed in secure settings, expanding {@link Setting.AffixSetting} to concrete settings, in the scope of the local
+     * settings.
+     */
+    private void forEachConcreteSecureSettingDo(Consumer<SecureSetting<?>> secureSettingConsumer) {
+        for (Setting<?> setting : secureSettingsCollection) {
+            assert setting.isConsistent() : "[" + setting.getKey() + "] is not a consistent setting";
+            if (setting instanceof Setting.AffixSetting<?>) {
+                ((Setting.AffixSetting<?>)setting).getAllConcreteSettings(settings).forEach(concreteSetting -> {
+                    assert concreteSetting instanceof SecureSetting<?> : "[" + concreteSetting.getKey() + "] is not a secure setting";
+                    secureSettingConsumer.accept((SecureSetting<?>)concreteSetting);
+                });
+            } else if (setting instanceof SecureSetting<?>) {
+                secureSettingConsumer.accept((SecureSetting<?>) setting);
+            } else {
+                assert false : "Unrecognized consistent secure setting [" + setting.getKey() + "]";
+            }
+        }
+    }
+
+    private Map<String, String> computeHashesOfConsistentSecureSettings() {
+        final Map<String, String> hashesBySettingKey = new HashMap<>();
+        forEachConcreteSecureSettingDo(concreteSecureSetting -> {
+            final byte[] localHash = concreteSecureSetting.getSecretDigest(settings);
+            if (localHash != null) {
+                final String salt = UUIDs.randomBase64UUID();
+                final byte[] publicHash = computeSaltedPBKDF2Hash(localHash, salt.getBytes(StandardCharsets.UTF_8));
+                final String encodedPublicHash = new String(Base64.getEncoder().encode(publicHash), StandardCharsets.UTF_8);
+                hashesBySettingKey.put(concreteSecureSetting.getKey(), salt + ":" + encodedPublicHash);
+            }
+        });
+        return hashesBySettingKey;
+    }
+
+    private byte[] computeSaltedPBKDF2Hash(byte[] bytes, byte[] salt) {
+        final int iterations = 5000;
+        final int keyLength = 512;
+        char[] value = null;
+        try {
+            value = MessageDigests.toHexCharArray(bytes);
+            final PBEKeySpec spec = new PBEKeySpec(value, salt, iterations, keyLength);
+            final SecretKey key = pbkdf2KeyFactory.generateSecret(spec);
+            return key.getEncoded();
+        } catch (InvalidKeySpecException e) {
+            throw new RuntimeException("Unexpected exception when computing PBKDF2 hash", e);
+        } finally {
+            if (value != null) {
+                Arrays.fill(value, '0');
+            }
+        }
+    }
+
+    static final class HashesPublisher implements LocalNodeMasterListener {
+
+        // eagerly compute hashes to be published
+        final Map<String, String> computedHashesOfConsistentSettings;
+        final ClusterService clusterService;
+
+        HashesPublisher(Map<String, String> computedHashesOfConsistentSettings, ClusterService clusterService) {
+            this.computedHashesOfConsistentSettings = Map.copyOf(computedHashesOfConsistentSettings);
+            this.clusterService = clusterService;
+        }
+
+        @Override
+        public void onMaster() {
+            clusterService.submitStateUpdateTask("publish-secure-settings-hashes", new ClusterStateUpdateTask(Priority.URGENT) {
+                @Override
+                public ClusterState execute(ClusterState currentState) {
+                    final Map<String, String> publishedHashesOfConsistentSettings = currentState.metaData()
+                            .hashesOfConsistentSettings();
+                    if (computedHashesOfConsistentSettings.equals(publishedHashesOfConsistentSettings)) {
+                        logger.debug("Nothing to publish. What is already published matches this node's view.");
+                        return currentState;
+                    } else {
+                        return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.metaData())
+                                .hashesOfConsistentSettings(computedHashesOfConsistentSettings)).build();
+                    }
+                }
+
+                @Override
+                public void onFailure(String source, Exception e) {
+                    logger.error("unable to publish secure settings hashes", e);
+                }
+
+            });
+        }
+
+        @Override
+        public void offMaster() {
+            logger.trace("I am no longer master, nothing to do");
+        }
+
+        @Override
+        public String executorName() {
+            return ThreadPool.Names.SAME;
+        }
+    }
+
+}

+ 42 - 19
server/src/main/java/org/elasticsearch/common/settings/KeyStoreWrapper.java

@@ -30,6 +30,7 @@ import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.cli.ExitCodes;
 import org.elasticsearch.cli.UserException;
 import org.elasticsearch.common.Randomness;
+import org.elasticsearch.common.hash.MessageDigests;
 
 import javax.crypto.Cipher;
 import javax.crypto.CipherInputStream;
@@ -85,6 +86,17 @@ public class KeyStoreWrapper implements SecureSettings {
         FILE
     }
 
+    /** An entry in the keystore. The bytes are opaque and interpreted based on the entry type. */
+    private static class Entry {
+        final byte[] bytes;
+        final byte[] sha256Digest;
+
+        Entry(byte[] bytes) {
+            this.bytes = bytes;
+            this.sha256Digest = MessageDigests.sha256().digest(bytes);
+        }
+    }
+
     /**
      * A regex for the valid characters that a setting name in the keystore may use.
      */
@@ -148,7 +160,7 @@ public class KeyStoreWrapper implements SecureSettings {
     private final byte[] dataBytes;
 
     /** The decrypted secret data. See {@link #decrypt(char[])}. */
-    private final SetOnce<Map<String, byte[]>> entries = new SetOnce<>();
+    private final SetOnce<Map<String, Entry>> entries = new SetOnce<>();
     private volatile boolean closed;
 
     private KeyStoreWrapper(int formatVersion, boolean hasPassword, byte[] dataBytes) {
@@ -350,7 +362,7 @@ public class KeyStoreWrapper implements SecureSettings {
                 int entrySize = input.readInt();
                 byte[] entryBytes = new byte[entrySize];
                 input.readFully(entryBytes);
-                entries.get().put(setting, entryBytes);
+                entries.get().put(setting, new Entry(entryBytes));
             }
             if (input.read() != -1) {
                 throw new SecurityException("Keystore has been corrupted or tampered with");
@@ -369,11 +381,11 @@ public class KeyStoreWrapper implements SecureSettings {
         try (CipherOutputStream cipherStream = new CipherOutputStream(bytes, cipher);
              DataOutputStream output = new DataOutputStream(cipherStream)) {
             output.writeInt(entries.get().size());
-            for (Map.Entry<String, byte[]> mapEntry : entries.get().entrySet()) {
+            for (Map.Entry<String, Entry> mapEntry : entries.get().entrySet()) {
                 output.writeUTF(mapEntry.getKey());
-                byte[] entry = mapEntry.getValue();
-                output.writeInt(entry.length);
-                output.write(entry);
+                byte[] entryBytes = mapEntry.getValue().bytes;
+                output.writeInt(entryBytes.length);
+                output.write(entryBytes);
             }
         }
         return bytes.toByteArray();
@@ -448,7 +460,7 @@ public class KeyStoreWrapper implements SecureSettings {
             }
             Arrays.fill(chars, '\0');
 
-            entries.get().put(setting, bytes);
+            entries.get().put(setting, new Entry(bytes));
         }
     }
 
@@ -521,8 +533,8 @@ public class KeyStoreWrapper implements SecureSettings {
     @Override
     public synchronized SecureString getString(String setting) {
         ensureOpen();
-        byte[] entry = entries.get().get(setting);
-        ByteBuffer byteBuffer = ByteBuffer.wrap(entry);
+        Entry entry = entries.get().get(setting);
+        ByteBuffer byteBuffer = ByteBuffer.wrap(entry.bytes);
         CharBuffer charBuffer = StandardCharsets.UTF_8.decode(byteBuffer);
         return new SecureString(Arrays.copyOfRange(charBuffer.array(), charBuffer.position(), charBuffer.limit()));
     }
@@ -530,8 +542,19 @@ public class KeyStoreWrapper implements SecureSettings {
     @Override
     public synchronized InputStream getFile(String setting) {
         ensureOpen();
-        byte[] entry = entries.get().get(setting);
-        return new ByteArrayInputStream(entry);
+        Entry entry = entries.get().get(setting);
+        return new ByteArrayInputStream(entry.bytes);
+    }
+
+    /**
+     * Returns the SHA256 digest for the setting's value, even after {@code #close()} has been called. The setting must exist. The digest is
+     * used to check for value changes without actually storing the value.
+     */
+    @Override
+    public byte[] getSHA256Digest(String setting) {
+        assert entries.get() != null : "Keystore is not loaded";
+        Entry entry = entries.get().get(setting);
+        return entry.sha256Digest;
     }
 
     /**
@@ -553,9 +576,9 @@ public class KeyStoreWrapper implements SecureSettings {
 
         ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode(CharBuffer.wrap(value));
         byte[] bytes = Arrays.copyOfRange(byteBuffer.array(), byteBuffer.position(), byteBuffer.limit());
-        byte[] oldEntry = entries.get().put(setting, bytes);
+        Entry oldEntry = entries.get().put(setting, new Entry(bytes));
         if (oldEntry != null) {
-            Arrays.fill(oldEntry, (byte)0);
+            Arrays.fill(oldEntry.bytes, (byte)0);
         }
     }
 
@@ -564,18 +587,18 @@ public class KeyStoreWrapper implements SecureSettings {
         ensureOpen();
         validateSettingName(setting);
 
-        byte[] oldEntry = entries.get().put(setting, Arrays.copyOf(bytes, bytes.length));
+        Entry oldEntry = entries.get().put(setting, new Entry(Arrays.copyOf(bytes, bytes.length)));
         if (oldEntry != null) {
-            Arrays.fill(oldEntry, (byte)0);
+            Arrays.fill(oldEntry.bytes, (byte)0);
         }
     }
 
     /** Remove the given setting from the keystore. */
     void remove(String setting) {
         ensureOpen();
-        byte[] oldEntry = entries.get().remove(setting);
+        Entry oldEntry = entries.get().remove(setting);
         if (oldEntry != null) {
-            Arrays.fill(oldEntry, (byte)0);
+            Arrays.fill(oldEntry.bytes, (byte)0);
         }
     }
 
@@ -590,8 +613,8 @@ public class KeyStoreWrapper implements SecureSettings {
     public synchronized void close() {
         this.closed = true;
         if (null != entries.get() && entries.get().isEmpty() == false) {
-            for (byte[] entry : entries.get().values()) {
-                Arrays.fill(entry, (byte) 0);
+            for (Entry entry : entries.get().values()) {
+                Arrays.fill(entry.bytes, (byte) 0);
             }
         }
     }

+ 18 - 1
server/src/main/java/org/elasticsearch/common/settings/SecureSetting.java

@@ -37,7 +37,7 @@ public abstract class SecureSetting<T> extends Setting<T> {
     /** Determines whether legacy settings with sensitive values should be allowed. */
     private static final boolean ALLOW_INSECURE_SETTINGS = Booleans.parseBoolean(System.getProperty("es.allow_insecure_settings", "false"));
 
-    private static final Set<Property> ALLOWED_PROPERTIES = EnumSet.of(Property.Deprecated);
+    private static final Set<Property> ALLOWED_PROPERTIES = EnumSet.of(Property.Deprecated, Property.Consistent);
 
     private static final Property[] FIXED_PROPERTIES = {
         Property.NodeScope
@@ -97,6 +97,23 @@ public abstract class SecureSetting<T> extends Setting<T> {
         }
     }
 
+    /**
+     * Returns the digest of this secure setting's value or {@code null} if the setting is missing (inside the keystore). This method can be
+     * called even after the {@code SecureSettings} have been closed, unlike {@code #get(Settings)}. The digest is used to check for changes
+     * of the value (by re-reading the {@code SecureSettings}), without actually transmitting the value to compare with.
+     */
+    public byte[] getSecretDigest(Settings settings) {
+        final SecureSettings secureSettings = settings.getSecureSettings();
+        if (secureSettings == null || false == secureSettings.getSettingNames().contains(getKey())) {
+            return null;
+        }
+        try {
+            return secureSettings.getSHA256Digest(getKey());
+        } catch (GeneralSecurityException e) {
+            throw new RuntimeException("failed to read secure setting " + getKey(), e);
+        }
+    }
+
     /** Returns the secret setting from the keyStoreReader store. */
     abstract T getSecret(SecureSettings secureSettings) throws GeneralSecurityException;
 

+ 2 - 0
server/src/main/java/org/elasticsearch/common/settings/SecureSettings.java

@@ -42,6 +42,8 @@ public interface SecureSettings extends Closeable {
     /** Return a file setting. The {@link InputStream} should be closed once it is used. */
     InputStream getFile(String setting) throws GeneralSecurityException;
 
+    byte[] getSHA256Digest(String setting) throws GeneralSecurityException;
+
     @Override
     void close() throws IOException;
 }

+ 20 - 0
server/src/main/java/org/elasticsearch/common/settings/Setting.java

@@ -112,6 +112,11 @@ public class Setting<T> implements ToXContentObject {
          */
         NodeScope,
 
+        /**
+         * Secure setting values equal on all nodes
+         */
+        Consistent,
+
         /**
          * Index scope
          */
@@ -167,6 +172,7 @@ public class Setting<T> implements ToXContentObject {
             checkPropertyRequiresIndexScope(propertiesAsSet, Property.NotCopyableOnResize);
             checkPropertyRequiresIndexScope(propertiesAsSet, Property.InternalIndex);
             checkPropertyRequiresIndexScope(propertiesAsSet, Property.PrivateIndex);
+            checkPropertyRequiresNodeScope(propertiesAsSet, Property.Consistent);
             this.properties = propertiesAsSet;
         }
     }
@@ -177,6 +183,12 @@ public class Setting<T> implements ToXContentObject {
         }
     }
 
+    private void checkPropertyRequiresNodeScope(final EnumSet<Property> properties, final Property property) {
+        if (properties.contains(property) && properties.contains(Property.NodeScope) == false) {
+            throw new IllegalArgumentException("non-node-scoped setting [" + key + "] can not have property [" + property + "]");
+        }
+    }
+
     /**
      * Creates a new Setting instance
      * @param key the settings key for this setting.
@@ -321,6 +333,14 @@ public class Setting<T> implements ToXContentObject {
         return properties.contains(Property.NodeScope);
     }
 
+    /**
+     * Returns <code>true</code> if this setting's value can be checked for equality across all nodes. Only {@link SecureSetting} instances
+     * may have this qualifier.
+     */
+    public boolean isConsistent() {
+        return properties.contains(Property.Consistent);
+    }
+
     /**
      * Returns <code>true</code> if this setting has an index scope, otherwise <code>false</code>
      */

+ 7 - 2
server/src/main/java/org/elasticsearch/common/settings/Settings.java

@@ -1324,15 +1324,20 @@ public final class Settings implements ToXContentFragment {
         }
 
         @Override
-        public SecureString getString(String setting) throws GeneralSecurityException{
+        public SecureString getString(String setting) throws GeneralSecurityException {
             return delegate.getString(addPrefix.apply(setting));
         }
 
         @Override
-        public InputStream getFile(String setting) throws GeneralSecurityException{
+        public InputStream getFile(String setting) throws GeneralSecurityException {
             return delegate.getFile(addPrefix.apply(setting));
         }
 
+        @Override
+        public byte[] getSHA256Digest(String setting) throws GeneralSecurityException {
+            return delegate.getSHA256Digest(addPrefix.apply(setting));
+        }
+
         @Override
         public void close() throws IOException {
             delegate.close();

+ 21 - 1
server/src/main/java/org/elasticsearch/common/settings/SettingsModule.java

@@ -49,6 +49,7 @@ public class SettingsModule implements Module {
     private final Set<String> settingsFilterPattern = new HashSet<>();
     private final Map<String, Setting<?>> nodeSettings = new HashMap<>();
     private final Map<String, Setting<?>> indexSettings = new HashMap<>();
+    private final Set<Setting<?>> consistentSettings = new HashSet<>();
     private final IndexScopedSettings indexScopedSettings;
     private final ClusterSettings clusterSettings;
     private final SettingsFilter settingsFilter;
@@ -157,7 +158,6 @@ public class SettingsModule implements Module {
         binder.bind(IndexScopedSettings.class).toInstance(indexScopedSettings);
     }
 
-
     /**
      * Registers a new setting. This method should be used by plugins in order to expose any custom settings the plugin defines.
      * Unless a setting is registered the setting is unusable. If a setting is never the less specified the node will reject
@@ -175,6 +175,19 @@ public class SettingsModule implements Module {
                 if (existingSetting != null) {
                     throw new IllegalArgumentException("Cannot register setting [" + setting.getKey() + "] twice");
                 }
+                if (setting.isConsistent()) {
+                    if (setting instanceof Setting.AffixSetting<?>) {
+                        if (((Setting.AffixSetting<?>)setting).getConcreteSettingForNamespace("_na_") instanceof SecureSetting<?>) {
+                            consistentSettings.add(setting);
+                        } else {
+                            throw new IllegalArgumentException("Invalid consistent secure setting [" + setting.getKey() + "]");
+                        }
+                    } else if (setting instanceof SecureSetting<?>) {
+                        consistentSettings.add(setting);
+                    } else {
+                        throw new IllegalArgumentException("Invalid consistent secure setting [" + setting.getKey() + "]");
+                    }
+                }
                 nodeSettings.put(setting.getKey(), setting);
             }
             if (setting.hasIndexScope()) {
@@ -182,6 +195,9 @@ public class SettingsModule implements Module {
                 if (existingSetting != null) {
                     throw new IllegalArgumentException("Cannot register setting [" + setting.getKey() + "] twice");
                 }
+                if (setting.isConsistent()) {
+                    throw new IllegalStateException("Consistent setting [" + setting.getKey() + "] cannot be index scoped");
+                }
                 indexSettings.put(setting.getKey(), setting);
             }
         } else {
@@ -215,6 +231,10 @@ public class SettingsModule implements Module {
         return clusterSettings;
     }
 
+    public Set<Setting<?>> getConsistentSettings() {
+        return consistentSettings;
+    }
+
     public SettingsFilter getSettingsFilter() {
         return settingsFilter;
     }

+ 4 - 0
server/src/main/java/org/elasticsearch/node/Node.java

@@ -74,6 +74,7 @@ import org.elasticsearch.common.network.NetworkAddress;
 import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.network.NetworkService;
 import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.ConsistentSettingsService;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.SettingUpgrader;
@@ -363,6 +364,9 @@ public class Node implements Closeable {
             final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
             clusterService.addStateApplier(scriptModule.getScriptService());
             resourcesToClose.add(clusterService);
+            clusterService.addLocalNodeMasterListener(
+                    new ConsistentSettingsService(settings, clusterService, settingsModule.getConsistentSettings())
+                            .newHashPublisher());
             final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
                 scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
             final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state,

+ 188 - 0
server/src/test/java/org/elasticsearch/common/settings/ConsistentSettingsIT.java

@@ -0,0 +1,188 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.settings;
+
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Setting.AffixSetting;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.ESIntegTestCase;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
+public class ConsistentSettingsIT extends ESIntegTestCase {
+
+    static final Setting<SecureString> DUMMY_STRING_CONSISTENT_SETTING = SecureSetting
+            .secureString("dummy.consistent.secure.string.setting", null, Setting.Property.Consistent);
+    static final AffixSetting<SecureString> DUMMY_AFFIX_STRING_CONSISTENT_SETTING = Setting.affixKeySetting(
+            "dummy.consistent.secure.string.affix.setting.", "suffix",
+            key -> SecureSetting.secureString(key, null, Setting.Property.Consistent));
+    private final AtomicReference<Function<Integer, Settings>> nodeSettingsOverride = new AtomicReference<>(null);
+
+    public void testAllConsistentOnAllNodesSuccess() throws Exception {
+        for (String nodeName : internalCluster().getNodeNames()) {
+            Environment environment = internalCluster().getInstance(Environment.class, nodeName);
+            ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nodeName);
+            assertTrue("Empty settings list always consistent.",
+                    new ConsistentSettingsService(environment.settings(), clusterService, Collections.emptyList()).areAllConsistent());
+            assertTrue(
+                    "Simple consistent secure setting is consistent [" + clusterService.state().metaData().hashesOfConsistentSettings()
+                            + "].",
+                    new ConsistentSettingsService(environment.settings(), clusterService,
+                            Collections.singletonList(DUMMY_STRING_CONSISTENT_SETTING)).areAllConsistent());
+            assertTrue(
+                    "Affix consistent secure setting is consistent [" + clusterService.state().metaData().hashesOfConsistentSettings()
+                            + "].",
+                    new ConsistentSettingsService(environment.settings(), clusterService,
+                            Collections.singletonList(DUMMY_AFFIX_STRING_CONSISTENT_SETTING)).areAllConsistent());
+            assertTrue("All secure settings are consistent [" + clusterService.state().metaData().hashesOfConsistentSettings() + "].",
+                    new ConsistentSettingsService(environment.settings(), clusterService,
+                            List.of(DUMMY_STRING_CONSISTENT_SETTING, DUMMY_AFFIX_STRING_CONSISTENT_SETTING)).areAllConsistent());
+        }
+    }
+
+    public void testConsistencyFailures() throws Exception {
+        nodeSettingsOverride.set(nodeOrdinal -> {
+            Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal));
+            MockSecureSettings secureSettings = new MockSecureSettings();
+            if (randomBoolean()) {
+                // different value
+                secureSettings.setString("dummy.consistent.secure.string.setting", "DIFFERENT_VALUE");
+            } else {
+                // missing value
+                // secureSettings.setString("dummy.consistent.secure.string.setting", "string_value");
+            }
+            secureSettings.setString("dummy.consistent.secure.string.affix.setting." + "affix1" + ".suffix", "affix_value_1");
+            secureSettings.setString("dummy.consistent.secure.string.affix.setting." + "affix2" + ".suffix", "affix_value_2");
+            assert builder.getSecureSettings() == null : "Deal with the settings merge";
+            builder.setSecureSettings(secureSettings);
+            return builder.build();
+        });
+        String newNodeName = internalCluster().startNode();
+        Environment environment = internalCluster().getInstance(Environment.class, newNodeName);
+        ClusterService clusterService = internalCluster().getInstance(ClusterService.class, newNodeName);
+        assertTrue("Empty settings list always consistent.",
+                new ConsistentSettingsService(environment.settings(), clusterService, Collections.emptyList()).areAllConsistent());
+        assertFalse(
+                "Simple consistent secure setting is NOT consistent [" + clusterService.state().metaData().hashesOfConsistentSettings()
+                        + "].",
+                new ConsistentSettingsService(environment.settings(), clusterService,
+                        Collections.singletonList(DUMMY_STRING_CONSISTENT_SETTING)).areAllConsistent());
+        assertTrue(
+                "Affix consistent secure setting is consistent [" + clusterService.state().metaData().hashesOfConsistentSettings()
+                        + "].",
+                new ConsistentSettingsService(environment.settings(), clusterService,
+                        Collections.singletonList(DUMMY_AFFIX_STRING_CONSISTENT_SETTING)).areAllConsistent());
+        assertFalse("All secure settings are NOT consistent [" + clusterService.state().metaData().hashesOfConsistentSettings() + "].",
+                new ConsistentSettingsService(environment.settings(), clusterService,
+                        List.of(DUMMY_STRING_CONSISTENT_SETTING, DUMMY_AFFIX_STRING_CONSISTENT_SETTING)).areAllConsistent());
+        nodeSettingsOverride.set(nodeOrdinal -> {
+            Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal));
+            MockSecureSettings secureSettings = new MockSecureSettings();
+            secureSettings.setString("dummy.consistent.secure.string.setting", "string_value");
+            if (randomBoolean()) {
+                secureSettings.setString("dummy.consistent.secure.string.affix.setting." + "affix1" + ".suffix", "affix_value_1");
+                if (randomBoolean()) {
+                    secureSettings.setString("dummy.consistent.secure.string.affix.setting." + "affix2" + ".suffix", "DIFFERENT_VALUE");
+                } else {
+                    // missing value
+                    // "dummy.consistent.secure.string.affix.setting.affix2.suffix"
+                }
+            } else {
+                if (randomBoolean()) {
+                    secureSettings.setString("dummy.consistent.secure.string.affix.setting." + "affix1" + ".suffix", "DIFFERENT_VALUE_1");
+                    secureSettings.setString("dummy.consistent.secure.string.affix.setting." + "affix2" + ".suffix", "DIFFERENT_VALUE_2");
+                } else {
+                    // missing values
+                    // dummy.consistent.secure.string.affix.setting.affix1.suffix
+                    // dummy.consistent.secure.string.affix.setting.affix2.suffix
+                }
+            }
+            assert builder.getSecureSettings() == null : "Deal with the settings merge";
+            builder.setSecureSettings(secureSettings);
+            return builder.build();
+        });
+        newNodeName = internalCluster().startNode();
+        environment = internalCluster().getInstance(Environment.class, newNodeName);
+        clusterService = internalCluster().getInstance(ClusterService.class, newNodeName);
+        assertTrue("Empty settings list always consistent.",
+                new ConsistentSettingsService(environment.settings(), clusterService, Collections.emptyList()).areAllConsistent());
+        assertTrue(
+                "Simple consistent secure setting is consistent [" + clusterService.state().metaData().hashesOfConsistentSettings()
+                        + "].",
+                new ConsistentSettingsService(environment.settings(), clusterService,
+                        Collections.singletonList(DUMMY_STRING_CONSISTENT_SETTING)).areAllConsistent());
+        assertFalse(
+                "Affix consistent secure setting is NOT consistent [" + clusterService.state().metaData().hashesOfConsistentSettings()
+                        + "].",
+                new ConsistentSettingsService(environment.settings(), clusterService,
+                        Collections.singletonList(DUMMY_AFFIX_STRING_CONSISTENT_SETTING)).areAllConsistent());
+        assertFalse("All secure settings are NOT consistent [" + clusterService.state().metaData().hashesOfConsistentSettings() + "].",
+                new ConsistentSettingsService(environment.settings(), clusterService,
+                        List.of(DUMMY_STRING_CONSISTENT_SETTING, DUMMY_AFFIX_STRING_CONSISTENT_SETTING)).areAllConsistent());
+        nodeSettingsOverride.set(null);
+    }
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal) {
+        Function<Integer, Settings> nodeSettingsOverrideFunction = nodeSettingsOverride.get();
+        if (nodeSettingsOverrideFunction != null) {
+            final Settings overrideSettings = nodeSettingsOverrideFunction.apply(nodeOrdinal);
+            if (overrideSettings != null) {
+                return overrideSettings;
+            }
+        }
+        Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal));
+        MockSecureSettings secureSettings = new MockSecureSettings();
+        secureSettings.setString("dummy.consistent.secure.string.setting", "string_value");
+        secureSettings.setString("dummy.consistent.secure.string.affix.setting." + "affix1" + ".suffix", "affix_value_1");
+        secureSettings.setString("dummy.consistent.secure.string.affix.setting." + "affix2" + ".suffix", "affix_value_2");
+        assert builder.getSecureSettings() == null : "Deal with the settings merge";
+        builder.setSecureSettings(secureSettings);
+        return builder.build();
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        Collection<Class<? extends Plugin>> classes = new ArrayList<>(super.nodePlugins());
+        classes.add(DummyPlugin.class);
+        return classes;
+    }
+
+    public static final class DummyPlugin extends Plugin {
+
+        public DummyPlugin() {
+        }
+
+        @Override
+        public List<Setting<?>> getSettings() {
+            List<Setting<?>> settings = new ArrayList<>(super.getSettings());
+            settings.add(DUMMY_STRING_CONSISTENT_SETTING);
+            settings.add(DUMMY_AFFIX_STRING_CONSISTENT_SETTING);
+            return settings;
+        }
+    }
+}

+ 159 - 0
server/src/test/java/org/elasticsearch/common/settings/ConsistentSettingsServiceTests.java

@@ -0,0 +1,159 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.settings;
+
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.mock.orig.Mockito;
+import org.elasticsearch.test.ESTestCase;
+import org.junit.Before;
+import org.mockito.stubbing.Answer;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.Mockito.mock;
+import static org.hamcrest.Matchers.is;
+
+public class ConsistentSettingsServiceTests extends ESTestCase {
+
+    private AtomicReference<ClusterState> clusterState = new AtomicReference<>();
+    private ClusterService clusterService;
+
+    @Before
+    public void init() throws Exception {
+        clusterState.set(ClusterState.EMPTY_STATE);
+        clusterService = mock(ClusterService.class);
+        Mockito.doAnswer((Answer) invocation -> {
+            return clusterState.get();
+        }).when(clusterService).state();
+        Mockito.doAnswer((Answer) invocation -> {
+            final ClusterStateUpdateTask arg0 = (ClusterStateUpdateTask) invocation.getArguments()[1];
+            this.clusterState.set(arg0.execute(this.clusterState.get()));
+            return null;
+        }).when(clusterService).submitStateUpdateTask(Mockito.isA(String.class), Mockito.isA(ClusterStateUpdateTask.class));
+    }
+
+    public void testSingleStringSetting() throws Exception {
+        Setting<?> stringSetting = SecureSetting.secureString("test.simple.foo", null, Setting.Property.Consistent);
+        MockSecureSettings secureSettings = new MockSecureSettings();
+        secureSettings.setString(stringSetting.getKey(), "somethingsecure");
+        secureSettings.setString("test.noise.setting", "noise");
+        Settings.Builder builder = Settings.builder();
+        builder.setSecureSettings(secureSettings);
+        Settings settings = builder.build();
+        // hashes not yet published
+        assertThat(new ConsistentSettingsService(settings, clusterService, List.of(stringSetting)).areAllConsistent(), is(false));
+        // publish
+        new ConsistentSettingsService(settings, clusterService, List.of(stringSetting)).newHashPublisher().onMaster();
+        ConsistentSettingsService consistentService = new ConsistentSettingsService(settings, clusterService, List.of(stringSetting));
+        assertThat(consistentService.areAllConsistent(), is(true));
+        // change value
+        secureSettings.setString(stringSetting.getKey(), "_TYPO_somethingsecure");
+        assertThat(consistentService.areAllConsistent(), is(false));
+        assertThat(new ConsistentSettingsService(settings, clusterService, List.of(stringSetting)).areAllConsistent(), is(false));
+        // publish change
+        new ConsistentSettingsService(settings, clusterService, List.of(stringSetting)).newHashPublisher().onMaster();
+        assertThat(consistentService.areAllConsistent(), is(true));
+        assertThat(new ConsistentSettingsService(settings, clusterService, List.of(stringSetting)).areAllConsistent(), is(true));
+    }
+
+    public void testSingleAffixSetting() throws Exception {
+        Setting.AffixSetting<?> affixStringSetting = Setting.affixKeySetting("test.affix.", "bar",
+                (key) -> SecureSetting.secureString(key, null, Setting.Property.Consistent));
+        // add two affix settings to the keystore
+        MockSecureSettings secureSettings = new MockSecureSettings();
+        secureSettings.setString("test.noise.setting", "noise");
+        secureSettings.setString("test.affix.first.bar", "first_secure");
+        secureSettings.setString("test.affix.second.bar", "second_secure");
+        Settings.Builder builder = Settings.builder();
+        builder.setSecureSettings(secureSettings);
+        Settings settings = builder.build();
+        // hashes not yet published
+        assertThat(new ConsistentSettingsService(settings, clusterService, List.of(affixStringSetting)).areAllConsistent(), is(false));
+        // publish
+        new ConsistentSettingsService(settings, clusterService, List.of(affixStringSetting)).newHashPublisher().onMaster();
+        ConsistentSettingsService consistentService = new ConsistentSettingsService(settings, clusterService, List.of(affixStringSetting));
+        assertThat(consistentService.areAllConsistent(), is(true));
+        // change value
+        secureSettings.setString("test.affix.second.bar", "_TYPO_second_secure");
+        assertThat(consistentService.areAllConsistent(), is(false));
+        assertThat(new ConsistentSettingsService(settings, clusterService, List.of(affixStringSetting)).areAllConsistent(), is(false));
+        // publish change
+        new ConsistentSettingsService(settings, clusterService, List.of(affixStringSetting)).newHashPublisher().onMaster();
+        assertThat(consistentService.areAllConsistent(), is(true));
+        assertThat(new ConsistentSettingsService(settings, clusterService, List.of(affixStringSetting)).areAllConsistent(), is(true));
+        // add value
+        secureSettings.setString("test.affix.third.bar", "third_secure");
+        builder = Settings.builder();
+        builder.setSecureSettings(secureSettings);
+        settings = builder.build();
+        assertThat(new ConsistentSettingsService(settings, clusterService, List.of(affixStringSetting)).areAllConsistent(), is(false));
+        // publish
+        new ConsistentSettingsService(settings, clusterService, List.of(affixStringSetting)).newHashPublisher().onMaster();
+        assertThat(new ConsistentSettingsService(settings, clusterService, List.of(affixStringSetting)).areAllConsistent(), is(true));
+        // remove value
+        secureSettings = new MockSecureSettings();
+        secureSettings.setString("test.another.noise.setting", "noise");
+        // missing value test.affix.first.bar
+        secureSettings.setString("test.affix.second.bar", "second_secure");
+        secureSettings.setString("test.affix.third.bar", "third_secure");
+        builder = Settings.builder();
+        builder.setSecureSettings(secureSettings);
+        settings = builder.build();
+        assertThat(new ConsistentSettingsService(settings, clusterService, List.of(affixStringSetting)).areAllConsistent(), is(false));
+    }
+
+    public void testStringAndAffixSettings() throws Exception {
+        Setting<?> stringSetting = SecureSetting.secureString("mock.simple.foo", null, Setting.Property.Consistent);
+        Setting.AffixSetting<?> affixStringSetting = Setting.affixKeySetting("mock.affix.", "bar",
+                (key) -> SecureSetting.secureString(key, null, Setting.Property.Consistent));
+        MockSecureSettings secureSettings = new MockSecureSettings();
+        secureSettings.setString(randomAlphaOfLength(8).toLowerCase(Locale.ROOT), "noise");
+        secureSettings.setString(stringSetting.getKey(), "somethingsecure");
+        secureSettings.setString("mock.affix.foo.bar", "another_secure");
+        Settings.Builder builder = Settings.builder();
+        builder.setSecureSettings(secureSettings);
+        Settings settings = builder.build();
+        // hashes not yet published
+        assertThat(new ConsistentSettingsService(settings, clusterService, List.of(stringSetting, affixStringSetting)).areAllConsistent(),
+                is(false));
+        // publish only the simple string setting
+        new ConsistentSettingsService(settings, clusterService, List.of(stringSetting)).newHashPublisher().onMaster();
+        assertThat(new ConsistentSettingsService(settings, clusterService, List.of(stringSetting)).areAllConsistent(), is(true));
+        assertThat(new ConsistentSettingsService(settings, clusterService, List.of(affixStringSetting)).areAllConsistent(), is(false));
+        assertThat(new ConsistentSettingsService(settings, clusterService, List.of(stringSetting, affixStringSetting)).areAllConsistent(),
+                is(false));
+        // publish only the affix string setting
+        new ConsistentSettingsService(settings, clusterService, List.of(affixStringSetting)).newHashPublisher().onMaster();
+        assertThat(new ConsistentSettingsService(settings, clusterService, List.of(stringSetting)).areAllConsistent(), is(false));
+        assertThat(new ConsistentSettingsService(settings, clusterService, List.of(affixStringSetting)).areAllConsistent(), is(true));
+        assertThat(new ConsistentSettingsService(settings, clusterService, List.of(stringSetting, affixStringSetting)).areAllConsistent(),
+                is(false));
+        // publish both settings
+        new ConsistentSettingsService(settings, clusterService, List.of(stringSetting, affixStringSetting)).newHashPublisher().onMaster();
+        assertThat(new ConsistentSettingsService(settings, clusterService, List.of(stringSetting)).areAllConsistent(), is(true));
+        assertThat(new ConsistentSettingsService(settings, clusterService, List.of(affixStringSetting)).areAllConsistent(), is(true));
+        assertThat(new ConsistentSettingsService(settings, clusterService, List.of(stringSetting, affixStringSetting)).areAllConsistent(),
+                is(true));
+    }
+}

+ 36 - 0
server/src/test/java/org/elasticsearch/common/settings/SettingsModuleTests.java

@@ -21,11 +21,13 @@ package org.elasticsearch.common.settings;
 
 import org.elasticsearch.common.inject.ModuleTestCase;
 import org.elasticsearch.common.settings.Setting.Property;
+import org.hamcrest.Matchers;
 
 import java.util.Arrays;
 
 import static java.util.Collections.emptySet;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
 
 public class SettingsModuleTests extends ModuleTestCase {
 
@@ -85,6 +87,40 @@ public class SettingsModuleTests extends ModuleTestCase {
         }
     }
 
+    public void testRegisterConsistentSettings() {
+        MockSecureSettings secureSettings = new MockSecureSettings();
+        secureSettings.setString("some.custom.secure.consistent.setting", "secure_value");
+        final Settings settings = Settings.builder().setSecureSettings(secureSettings).build();
+        final Setting<?> concreteConsistentSetting = SecureSetting.secureString("some.custom.secure.consistent.setting", null,
+                Setting.Property.Consistent);
+        SettingsModule module = new SettingsModule(settings, concreteConsistentSetting);
+        assertInstanceBinding(module, Settings.class, (s) -> s == settings);
+        assertThat(module.getConsistentSettings(), Matchers.containsInAnyOrder(concreteConsistentSetting));
+
+        final Setting<?> concreteUnsecureConsistentSetting = Setting.simpleString("some.custom.UNSECURE.consistent.setting",
+                Property.Consistent, Property.NodeScope);
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
+                () -> new SettingsModule(Settings.builder().build(), concreteUnsecureConsistentSetting));
+        assertThat(e.getMessage(), is("Invalid consistent secure setting [some.custom.UNSECURE.consistent.setting]"));
+
+        secureSettings = new MockSecureSettings();
+        secureSettings.setString("some.custom.secure.consistent.afix.wow.setting", "secure_value");
+        final Settings settings2 = Settings.builder().setSecureSettings(secureSettings).build();
+        final Setting<?> afixConcreteConsistentSetting = Setting.affixKeySetting(
+                "some.custom.secure.consistent.afix.", "setting",
+                key -> SecureSetting.secureString(key, null, Setting.Property.Consistent));
+        module = new SettingsModule(settings2,afixConcreteConsistentSetting);
+        assertInstanceBinding(module, Settings.class, (s) -> s == settings2);
+        assertThat(module.getConsistentSettings(), Matchers.containsInAnyOrder(afixConcreteConsistentSetting));
+
+        final Setting<?> concreteUnsecureConsistentAfixSetting = Setting.affixKeySetting(
+                "some.custom.secure.consistent.afix.", "setting",
+                key -> Setting.simpleString(key, Setting.Property.Consistent, Property.NodeScope));
+        e = expectThrows(IllegalArgumentException.class,
+                () -> new SettingsModule(Settings.builder().build(), concreteUnsecureConsistentAfixSetting));
+        assertThat(e.getMessage(), is("Invalid consistent secure setting [some.custom.secure.consistent.afix.*.setting]"));
+    }
+
     public void testLoggerSettings() {
         {
             Settings settings = Settings.builder().put("logger._root", "TRACE").put("logger.transport", "INFO").build();

+ 13 - 0
test/framework/src/main/java/org/elasticsearch/common/settings/MockSecureSettings.java

@@ -19,9 +19,12 @@
 
 package org.elasticsearch.common.settings;
 
+import org.elasticsearch.common.hash.MessageDigests;
+
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -35,6 +38,7 @@ public class MockSecureSettings implements SecureSettings {
 
     private Map<String, SecureString> secureStrings = new HashMap<>();
     private Map<String, byte[]> files = new HashMap<>();
+    private Map<String, byte[]> sha256Digests = new HashMap<>();
     private Set<String> settingNames = new HashSet<>();
     private final AtomicBoolean closed = new AtomicBoolean(false);
 
@@ -44,6 +48,7 @@ public class MockSecureSettings implements SecureSettings {
     private MockSecureSettings(MockSecureSettings source) {
         secureStrings.putAll(source.secureStrings);
         files.putAll(source.files);
+        sha256Digests.putAll(source.sha256Digests);
         settingNames.addAll(source.settingNames);
     }
 
@@ -69,15 +74,22 @@ public class MockSecureSettings implements SecureSettings {
         return new ByteArrayInputStream(files.get(setting));
     }
 
+    @Override
+    public byte[] getSHA256Digest(String setting) {
+        return sha256Digests.get(setting);
+    }
+
     public void setString(String setting, String value) {
         ensureOpen();
         secureStrings.put(setting, new SecureString(value.toCharArray()));
+        sha256Digests.put(setting, MessageDigests.sha256().digest(value.getBytes(StandardCharsets.UTF_8)));
         settingNames.add(setting);
     }
 
     public void setFile(String setting, byte[] value) {
         ensureOpen();
         files.put(setting, value);
+        sha256Digests.put(setting, MessageDigests.sha256().digest(value));
         settingNames.add(setting);
     }
 
@@ -90,6 +102,7 @@ public class MockSecureSettings implements SecureSettings {
         }
         settingNames.addAll(secureSettings.settingNames);
         secureStrings.putAll(secureSettings.secureStrings);
+        sha256Digests.putAll(secureSettings.sha256Digests);
         files.putAll(secureSettings.files);
     }
 

+ 12 - 5
x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/notification/NotificationService.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher.notification;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.SecureSettings;
 import org.elasticsearch.common.settings.SecureString;
@@ -179,12 +180,13 @@ public abstract class NotificationService<Account> {
         // get the secure settings out
         final SecureSettings sourceSecureSettings = Settings.builder().put(source, true).getSecureSettings();
         // filter and cache them...
-        final Map<String, SecureString> cache = new HashMap<>();
+        final Map<String, Tuple<SecureString, byte[]>> cache = new HashMap<>();
         if (sourceSecureSettings != null && securePluginSettings != null) {
             for (final String settingKey : sourceSecureSettings.getSettingNames()) {
                 for (final Setting<?> secureSetting : securePluginSettings) {
                     if (secureSetting.match(settingKey)) {
-                        cache.put(settingKey, sourceSecureSettings.getString(settingKey));
+                        cache.put(settingKey,
+                                new Tuple<>(sourceSecureSettings.getString(settingKey), sourceSecureSettings.getSHA256Digest(settingKey)));
                     }
                 }
             }
@@ -197,8 +199,8 @@ public abstract class NotificationService<Account> {
             }
 
             @Override
-            public SecureString getString(String setting) throws GeneralSecurityException {
-                return cache.get(setting);
+            public SecureString getString(String setting) {
+                return cache.get(setting).v1();
             }
 
             @Override
@@ -207,10 +209,15 @@ public abstract class NotificationService<Account> {
             }
 
             @Override
-            public InputStream getFile(String setting) throws GeneralSecurityException {
+            public InputStream getFile(String setting) {
                 throw new IllegalStateException("A NotificationService setting cannot be File.");
             }
 
+            @Override
+            public byte[] getSHA256Digest(String setting) {
+                return cache.get(setting).v2();
+            }
+
             @Override
             public void close() throws IOException {
             }

+ 7 - 0
x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/notification/NotificationServiceTests.java

@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.watcher.notification;
 
+import org.elasticsearch.common.hash.MessageDigests;
 import org.elasticsearch.common.settings.SecureSetting;
 import org.elasticsearch.common.settings.SecureSettings;
 import org.elasticsearch.common.settings.SecureString;
@@ -16,6 +17,7 @@ import org.elasticsearch.xpack.watcher.notification.NotificationService;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.security.GeneralSecurityException;
 import java.util.Arrays;
 import java.util.Collections;
@@ -247,6 +249,11 @@ public class NotificationServiceTests extends ESTestCase {
                 return null;
             }
 
+            @Override
+            public byte[] getSHA256Digest(String setting) throws GeneralSecurityException {
+                return MessageDigests.sha256().digest(new String(secureSettingsMap.get(setting)).getBytes(StandardCharsets.UTF_8));
+            }
+
             @Override
             public void close() throws IOException {
             }