@@ -19,29 +19,31 @@
package org.elasticsearch.repositories.hdfs;

import java.io.IOException;
-import java.lang.reflect.Constructor;
+import java.io.UncheckedIOException;
+import java.net.InetAddress;
import java.net.URI;
+import java.net.UnknownHostException;
import java.security.AccessController;
-import java.security.Principal;
import java.security.PrivilegedAction;
-import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;

-import javax.security.auth.Subject;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.elasticsearch.ElasticsearchGenerationException;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.logging.log4j.Logger;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
+import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -51,9 +53,14 @@ import org.elasticsearch.repositories.blobstore.BlobStoreRepository;

public final class HdfsRepository extends BlobStoreRepository {

- private final BlobPath basePath = BlobPath.cleanPath();
+ private static final Logger LOGGER = Loggers.getLogger(HdfsRepository.class);
+
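+ // Repository setting that names the Kerberos principal to authenticate as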
+ private static final String CONF_SECURITY_PRINCIPAL = "security.principal";
+
+ private final Environment environment;
private final ByteSizeValue chunkSize;
private final boolean compress;
+ private final BlobPath basePath = BlobPath.cleanPath();

private HdfsBlobStore blobStore;
@@ -65,6 +72,7 @@ public final class HdfsRepository extends BlobStoreRepository {
NamedXContentRegistry namedXContentRegistry) throws IOException {
super(metadata, environment.settings(), namedXContentRegistry);

+ this.environment = environment;
this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null);
this.compress = metadata.settings().getAsBoolean("compress", false);
}
@@ -101,49 +109,116 @@ public final class HdfsRepository extends BlobStoreRepository {
blobStore = new HdfsBlobStore(fileContext, pathSetting, bufferSize);
logger.debug("Using file-system [{}] for URI [{}], path [{}]", fileContext.getDefaultFileSystem(), fileContext.getDefaultFileSystem().getUri(), pathSetting);
} catch (IOException e) {
- throw new ElasticsearchGenerationException(String.format(Locale.ROOT, "Cannot create HDFS repository for uri [%s]", uri), e);
+ throw new UncheckedIOException(String.format(Locale.ROOT, "Cannot create HDFS repository for uri [%s]", uri), e);
}
super.doStart();
}

// create hadoop filecontext
- @SuppressForbidden(reason = "lesser of two evils (the other being a bunch of JNI/classloader nightmares)")
- private static FileContext createContext(URI uri, Settings repositorySettings) {
- Configuration cfg = new Configuration(repositorySettings.getAsBoolean("load_defaults", true));
- cfg.setClassLoader(HdfsRepository.class.getClassLoader());
- cfg.reloadConfiguration();
+ private FileContext createContext(URI uri, Settings repositorySettings) {
+ Configuration hadoopConfiguration = new Configuration(repositorySettings.getAsBoolean("load_defaults", true));
+ hadoopConfiguration.setClassLoader(HdfsRepository.class.getClassLoader());
+ hadoopConfiguration.reloadConfiguration();

Map<String, String> map = repositorySettings.getByPrefix("conf.").getAsMap();
for (Entry<String, String> entry : map.entrySet()) {
- cfg.set(entry.getKey(), entry.getValue());
+ hadoopConfiguration.set(entry.getKey(), entry.getValue());
}

- // create a hadoop user. if we want some auth, it must be done different anyway, and tested.
- Subject subject;
- try {
- Class<?> clazz = Class.forName("org.apache.hadoop.security.User");
- Constructor<?> ctor = clazz.getConstructor(String.class);
- ctor.setAccessible(true);
- Principal principal = (Principal) ctor.newInstance(System.getProperty("user.name"));
- subject = new Subject(false, Collections.singleton(principal), Collections.emptySet(), Collections.emptySet());
- } catch (ReflectiveOperationException e) {
- throw new RuntimeException(e);
- }
+ // Create a hadoop user
+ UserGroupInformation ugi = login(hadoopConfiguration, repositorySettings);

- // disable FS cache
- cfg.setBoolean("fs.hdfs.impl.disable.cache", true);
+ // Disable FS cache
+ hadoopConfiguration.setBoolean("fs.hdfs.impl.disable.cache", true);
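+ // (Presumably so each repository gets its own filesystem instance rather than one shared via Hadoop's FS cache.)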

- // create the filecontext with our user
- return Subject.doAs(subject, (PrivilegedAction<FileContext>) () -> {
+ // Create the filecontext with our user information
+ // This will correctly configure the filecontext to have our UGI as its internal user.
+ return ugi.doAs((PrivilegedAction<FileContext>) () -> {
try {
- AbstractFileSystem fs = AbstractFileSystem.get(uri, cfg);
- return FileContext.getFileContext(fs, cfg);
+ AbstractFileSystem fs = AbstractFileSystem.get(uri, hadoopConfiguration);
+ return FileContext.getFileContext(fs, hadoopConfiguration);
} catch (UnsupportedFileSystemException e) {
- throw new RuntimeException(e);
+ throw new UncheckedIOException(e);
}
});
}

+ private UserGroupInformation login(Configuration hadoopConfiguration, Settings repositorySettings) {
+ // Validate the authentication method:
+ AuthenticationMethod authMethod = SecurityUtil.getAuthenticationMethod(hadoopConfiguration);
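+ // Hadoop reads the method from the "hadoop.security.authentication" configuration property ("simple" by default)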
+ if (authMethod.equals(AuthenticationMethod.SIMPLE) == false
+ && authMethod.equals(AuthenticationMethod.KERBEROS) == false) {
+ throw new RuntimeException("Unsupported authentication method [" + authMethod + "]");
+ }
+
+ // Check whether the user provided a Kerberos principal in the repository settings
+ String kerberosPrincipal = repositorySettings.get(CONF_SECURITY_PRINCIPAL);
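+ // e.g. "elasticsearch@REALM" or "elasticsearch/_HOST@REALM" (hypothetical example values)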
+
+ // Check to see if the authentication method is compatible
+ if (kerberosPrincipal != null && authMethod.equals(AuthenticationMethod.SIMPLE)) {
+ LOGGER.warn("Hadoop authentication method is set to [SIMPLE], but a Kerberos principal is " +
+ "specified. Continuing with [KERBEROS] authentication.");
+ SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, hadoopConfiguration);
+ } else if (kerberosPrincipal == null && authMethod.equals(AuthenticationMethod.KERBEROS)) {
+ throw new RuntimeException("HDFS Repository does not support [KERBEROS] authentication without " +
+ "a valid Kerberos principal and keytab. Please specify a principal in the repository settings with [" +
+ CONF_SECURITY_PRINCIPAL + "].");
+ }
+
+ // Now we can initialize the UGI with the configuration.
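+ // Note that UGI configuration is static, so this takes effect JVM-wide, not just for this repository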
+ UserGroupInformation.setConfiguration(hadoopConfiguration);
+
+ // Debugging
+ LOGGER.debug("Hadoop security enabled: [{}]", UserGroupInformation.isSecurityEnabled());
+ LOGGER.debug("Using Hadoop authentication method: [{}]", SecurityUtil.getAuthenticationMethod(hadoopConfiguration));
+
+ // A UserGroupInformation (UGI) instance is just a Hadoop-specific wrapper around a Java Subject
+ try {
+ if (UserGroupInformation.isSecurityEnabled()) {
+ String principal = preparePrincipal(kerberosPrincipal);
+ String keytab = HdfsSecurityContext.locateKeytabFile(environment).toString();
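+ // The keytab stores the principal's keys, allowing the node to log in without an interactive password prompt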
+ LOGGER.debug("Using kerberos principal [{}] and keytab located at [{}]", principal, keytab);
+ return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
+ }
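+ // With [SIMPLE] authentication, just run as the user that owns the Elasticsearch process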
+ return UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new UncheckedIOException("Could not retrieve the current user information", e);
+ }
+ }
+
+ // Convert principals of the format 'service/_HOST@REALM' by substituting the local hostname for '_HOST'.
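+ // e.g. "elasticsearch/_HOST@REALM" becomes "elasticsearch/node-1.example.com@REALM" on a host named "node-1.example.com" (hypothetical hostname)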
+ private static String preparePrincipal(String originalPrincipal) {
+ String finalPrincipal = originalPrincipal;
+ // Don't worry about host name resolution if the principal doesn't contain the '_HOST' pattern.
+ if (originalPrincipal.contains("_HOST")) {
+ try {
+ finalPrincipal = SecurityUtil.getServerPrincipal(originalPrincipal, getHostName());
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
+ if (originalPrincipal.equals(finalPrincipal) == false) {
+ LOGGER.debug("Found service principal. Converted original principal name [{}] to server principal [{}]",
+ originalPrincipal, finalPrincipal);
+ }
+ }
+ return finalPrincipal;
+ }
+
+ @SuppressForbidden(reason = "InetAddress.getLocalHost(); needed to fill in the hostname for a Kerberos principal name pattern.")
+ private static String getHostName() {
+ try {
+ /*
+ * This should not block since it should already be resolved via Log4J and Netty. The
+ * host information is cached by the JVM and the TTL for the cache entry is infinite
+ * when the SecurityManager is activated.
+ */
+ return InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (UnknownHostException e) {
+ throw new RuntimeException("Could not locate host information", e);
+ }
+ }
+
@Override
protected BlobStore blobStore() {
return blobStore;