
Add HDFS searchable snapshot integration (#66185)

Adds a bounded read implementation to the HDFS blob store, as well as integration tests in the searchable snapshots project that ensure functionality against both Kerberos and simple authentication HDFS.
James Baiera, 4 years ago
parent commit 9bb6a3ad2d
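
The first diff below adds the bounded readBlob(String blobName, long position, long length) override to HdfsBlobContainer. A minimal sketch of how a caller might consume that API, assuming a hypothetical, already-initialized HDFS-backed BlobContainer named container (the blob name and range are placeholders):

import org.elasticsearch.common.blobstore.BlobContainer;

import java.io.IOException;
import java.io.InputStream;

public class BoundedReadSketch {
    // Reads `length` bytes of the named blob starting at `position`.
    // `container` is assumed to be an HDFS-backed BlobContainer from the repository plugin.
    static byte[] readRange(BlobContainer container, String blobName, long position, int length) throws IOException {
        byte[] buffer = new byte[length];
        try (InputStream in = container.readBlob(blobName, position, length)) {
            int offset = 0;
            while (offset < length) {
                int read = in.read(buffer, offset, length - offset);
                if (read == -1) {
                    throw new IOException("unexpected end of blob [" + blobName + "]");
                }
                offset += read;
            }
        }
        return buffer;
    }
}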

+ 19 - 2
plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java

@@ -19,6 +19,7 @@
 package org.elasticsearch.repositories.hdfs;
 
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
@@ -33,6 +34,7 @@ import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
 import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
 import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
+import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.repositories.hdfs.HdfsBlobStore.Operation;
 
 import java.io.FileNotFoundException;
@@ -112,8 +114,23 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
     }
 
     @Override
-    public InputStream readBlob(String blobName, long position, long length) {
-        throw new UnsupportedOperationException();
+    public InputStream readBlob(String blobName, long position, long length) throws IOException {
+        // FSDataInputStream does buffering internally
+        // FSDataInputStream can open connections on read() or skip() so we wrap in
+        // HDFSPrivilegedInputSteam which will ensure that underlying methods will
+        // be called with the proper privileges.
+        try {
+            return store.execute(fileContext -> {
+                FSDataInputStream fsInput = fileContext.open(new Path(path, blobName), bufferSize);
+                // As long as no read operations have happened yet on the stream, seeking
+                // should direct the datanode to start on the appropriate block, at the
+                // appropriate target position.
+                fsInput.seek(position);
+                return Streams.limitStream(new HDFSPrivilegedInputSteam(fsInput, securityContext), length);
+            });
+        } catch (FileNotFoundException fnfe) {
+            throw new NoSuchFileException("[" + blobName + "] blob not found");
+        }
     }
 
     @Override
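
The comment in the hunk above points out that FSDataInputStream may open datanode connections lazily on read() or skip(), which is why the stream is wrapped so those calls execute with the plugin's privileges. A rough, hypothetical illustration of that wrapping idea (this is not the actual HDFSPrivilegedInputSteam class, which delegates to the repository's HDFS security context):

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;

// Hypothetical sketch: each read runs inside a doPrivileged block so that
// connections opened lazily by the underlying stream are established with
// the plugin's own permissions rather than the caller's.
class PrivilegedReadSketch extends FilterInputStream {

    PrivilegedReadSketch(InputStream in) {
        super(in);
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        try {
            return AccessController.doPrivileged((PrivilegedExceptionAction<Integer>) () -> in.read(b, off, len));
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getException();
        }
    }
}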

+ 45 - 0
plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java

@@ -29,10 +29,14 @@ import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.test.ESTestCase;
+import org.hamcrest.CoreMatchers;
 
 import javax.security.auth.Subject;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
@@ -41,6 +45,7 @@ import java.security.Principal;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
 import java.util.Collections;
 
 import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes;
@@ -133,4 +138,44 @@ public class HdfsBlobStoreContainerTests extends ESTestCase {
         assertArrayEquals(readBlobFully(container, "foo", data.length), data);
         assertTrue(container.blobExists("foo"));
     }
+
+    public void testReadRange() throws Exception {
+        FileContext fileContext = createTestContext();
+        // Constructor will not create dir if read only
+        HdfsBlobStore hdfsBlobStore = new HdfsBlobStore(fileContext, "dir", 1024, true);
+        FileContext.Util util = fileContext.util();
+        Path root = fileContext.makeQualified(new Path("dir"));
+        assertFalse(util.exists(root));
+        BlobPath blobPath = BlobPath.cleanPath().add("path");
+
+        // blobContainer() will not create path if read only
+        hdfsBlobStore.blobContainer(blobPath);
+        Path hdfsPath = root;
+        for (String p : blobPath) {
+            hdfsPath = new Path(hdfsPath, p);
+        }
+        assertFalse(util.exists(hdfsPath));
+
+        // if not read only, directory will be created
+        hdfsBlobStore = new HdfsBlobStore(fileContext, "dir", 1024, false);
+        assertTrue(util.exists(root));
+        BlobContainer container = hdfsBlobStore.blobContainer(blobPath);
+        assertTrue(util.exists(hdfsPath));
+
+        byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
+        writeBlob(container, "foo", new BytesArray(data), randomBoolean());
+        int pos = randomIntBetween(0, data.length / 2);
+        int len = randomIntBetween(pos, data.length) - pos;
+        assertArrayEquals(readBlobPartially(container, "foo", pos, len), Arrays.copyOfRange(data, pos, pos+len));
+        assertTrue(container.blobExists("foo"));
+    }
+
+    public static byte[] readBlobPartially(BlobContainer container, String name, int pos, int length) throws IOException {
+        byte[] data = new byte[length];
+        try (InputStream inputStream = container.readBlob(name, pos, length)) {
+            assertThat(Streams.readFully(inputStream, data), CoreMatchers.equalTo(length));
+            assertThat(inputStream.read(), CoreMatchers.equalTo(-1));
+        }
+        return data;
+    }
 }

+ 173 - 0
x-pack/plugin/searchable-snapshots/qa/hdfs/build.gradle

@@ -0,0 +1,173 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+import org.apache.tools.ant.taskdefs.condition.Os
+import org.elasticsearch.gradle.info.BuildParams
+import org.elasticsearch.gradle.test.RestIntegTestTask
+
+import java.nio.file.Files
+import java.nio.file.Paths
+
+import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
+
+apply plugin: 'elasticsearch.test.fixtures'
+apply plugin: 'elasticsearch.standalone-rest-test'
+apply plugin: 'elasticsearch.rest-test'
+apply plugin: 'elasticsearch.rest-resources'
+
+final Project hdfsFixtureProject = project(':test:fixtures:hdfs-fixture')
+final Project krbFixtureProject = project(':test:fixtures:krb5kdc-fixture')
+final Project hdfsRepoPluginProject = project(':plugins:repository-hdfs')
+
+dependencies {
+  testImplementation project(path: xpackModule('searchable-snapshots'), configuration: 'testArtifacts')
+  testImplementation hdfsRepoPluginProject
+}
+
+restResources {
+  restApi {
+    includeCore 'indices', 'search', 'bulk', 'snapshot', 'nodes', '_common'
+    includeXpack 'searchable_snapshots'
+  }
+}
+
+testFixtures.useFixture(krbFixtureProject.path, 'hdfs-snapshot')
+
+configurations {
+  hdfsFixture
+}
+
+dependencies {
+  hdfsFixture hdfsFixtureProject
+  // Set the keytab files in the classpath so that we can access them from test code without the security manager freaking out.
+  if (isEclipse == false) {
+    testRuntimeOnly files(krbFixtureProject.ext.krb5Keytabs("hdfs-snapshot", "hdfs_hdfs.build.elastic.co.keytab").parent)
+  }
+}
+
+normalization {
+  runtimeClasspath {
+    // ignore generated keytab files for the purposes of build avoidance
+    ignore '*.keytab'
+    // ignore fixture ports file which is on the classpath primarily to pacify the security manager
+    ignore 'ports'
+  }
+}
+
+String realm = "BUILD.ELASTIC.CO"
+String krb5conf = krbFixtureProject.ext.krb5Conf("hdfs")
+
+// Create HDFS File System Testing Fixtures
+for (String fixtureName : ['hdfsFixture', 'secureHdfsFixture']) {
+  def tsk = project.tasks.register(fixtureName, org.elasticsearch.gradle.test.AntFixture) {
+    dependsOn project.configurations.hdfsFixture, krbFixtureProject.tasks.postProcessFixture
+    executable = "${BuildParams.runtimeJavaHome}/bin/java"
+    env 'CLASSPATH', "${-> project.configurations.hdfsFixture.asPath}"
+    maxWaitInSeconds 60
+    onlyIf { BuildParams.inFipsJvm == false }
+    waitCondition = { fixture, ant ->
+      // the hdfs.MiniHDFS fixture writes the ports file when
+      // it's ready, so we can just wait for the file to exist
+      return fixture.portsFile.exists()
+    }
+    final List<String> miniHDFSArgs = []
+
+    // If it's a secure fixture, then depend on Kerberos Fixture and principals + add the krb5conf to the JVM options
+    if (fixtureName.equals('secureHdfsFixture')) {
+      miniHDFSArgs.add("-Djava.security.krb5.conf=${krb5conf}")
+    }
+
+    // Common options
+    miniHDFSArgs.add('hdfs.MiniHDFS')
+    miniHDFSArgs.add(baseDir)
+
+    // If it's a secure fixture, then set the principal name and keytab locations to use for auth.
+    if (fixtureName.equals('secureHdfsFixture')) {
+      miniHDFSArgs.add("hdfs/hdfs.build.elastic.co@${realm}")
+      miniHDFSArgs.add(project(':test:fixtures:krb5kdc-fixture').ext.krb5Keytabs("hdfs", "hdfs_hdfs.build.elastic.co.keytab"))
+    }
+
+    args miniHDFSArgs.toArray()
+  }
+
+  // TODO: The task configuration block has side effects that require it currently to be always executed.
+  // Otherwise tests start failing. Therefore we enforce the task creation for now.
+  tsk.get()
+}
+
+// Disable the integration test when running in a FIPS JVM
+integTest {
+  description = "Runs rest tests against an elasticsearch cluster with HDFS."
+  systemProperty 'test.hdfs.uri', 'hdfs://localhost:9999'
+  nonInputProperties.systemProperty 'test.hdfs.path', '/user/elasticsearch/test/searchable_snapshots/simple'
+  onlyIf { BuildParams.inFipsJvm == false }
+}
+
+task integTestSecure(type: RestIntegTestTask) {
+  description = "Runs rest tests against an elasticsearch cluster with Secured HDFS."
+  nonInputProperties.systemProperty 'test.hdfs.uri', 'hdfs://localhost:9998'
+  nonInputProperties.systemProperty 'test.hdfs.path', '/user/elasticsearch/test/searchable_snapshots/secure'
+  nonInputProperties.systemProperty "test.krb5.principal.es", "elasticsearch@${realm}"
+  nonInputProperties.systemProperty "test.krb5.principal.hdfs", "hdfs/hdfs.build.elastic.co@${realm}"
+  nonInputProperties.systemProperty(
+    "test.krb5.keytab.hdfs",
+    project(':test:fixtures:krb5kdc-fixture').ext.krb5Keytabs("hdfs", "hdfs_hdfs.build.elastic.co.keytab")
+  )
+  onlyIf { BuildParams.inFipsJvm == false }
+}
+check.dependsOn(integTestSecure)
+
+testClusters.configureEach {
+  testDistribution = 'DEFAULT'
+  plugin(hdfsRepoPluginProject.path)
+  setting 'xpack.license.self_generated.type', 'trial'
+}
+
+testClusters.integTestSecure {
+  systemProperty "java.security.krb5.conf", krb5conf
+  extraConfigFile(
+    "repository-hdfs/krb5.keytab",
+    file("${project(':test:fixtures:krb5kdc-fixture').ext.krb5Keytabs("hdfs", "elasticsearch.keytab")}"), IGNORE_VALUE
+  )
+}
+
+// Determine HDFS Fixture compatibility for the current build environment.
+boolean fixtureSupported = false
+if (Os.isFamily(Os.FAMILY_WINDOWS)) {
+  // hdfs fixture will not start without hadoop native libraries on windows
+  String nativePath = System.getenv("HADOOP_HOME")
+  if (nativePath != null) {
+    java.nio.file.Path path = Paths.get(nativePath)
+    if (Files.isDirectory(path) &&
+      Files.exists(path.resolve("bin").resolve("winutils.exe")) &&
+      Files.exists(path.resolve("bin").resolve("hadoop.dll")) &&
+      Files.exists(path.resolve("bin").resolve("hdfs.dll"))) {
+      fixtureSupported = true
+    } else {
+      throw new IllegalStateException("HADOOP_HOME: ${path} is invalid, does not contain hadoop native libraries in \$HADOOP_HOME/bin")
+    }
+  }
+} else {
+  fixtureSupported = true
+}
+
+boolean legalPath = rootProject.rootDir.toString().contains(" ") == false
+if (legalPath == false) {
+  fixtureSupported = false
+}
+
+if (fixtureSupported) {
+  integTest.dependsOn hdfsFixture
+  integTestSecure.dependsOn secureHdfsFixture
+} else {
+  integTest.enabled = false
+  integTestSecure.enabled = false
+  if (legalPath) {
+    logger.warn("hdfsFixture unsupported, please set HADOOP_HOME and put HADOOP_HOME\\bin in PATH")
+  } else {
+    logger.warn("hdfsFixture unsupported since there are spaces in the path: '" + rootProject.rootDir.toString() + "'")
+  }
+}

+ 38 - 0
x-pack/plugin/searchable-snapshots/qa/hdfs/src/test/java/org/elasticsearch/xpack/searchablesnapshots/hdfs/HdfsSearchableSnapshotsIT.java

@@ -0,0 +1,38 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.searchablesnapshots.hdfs;
+
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsRestTestCase;
+
+import static org.hamcrest.Matchers.blankOrNullString;
+import static org.hamcrest.Matchers.not;
+
+public class HdfsSearchableSnapshotsIT extends AbstractSearchableSnapshotsRestTestCase {
+    @Override
+    protected String repositoryType() {
+        return "hdfs";
+    }
+
+    @Override
+    protected Settings repositorySettings() {
+        final String uri = System.getProperty("test.hdfs.uri");
+        assertThat(uri, not(blankOrNullString()));
+
+        final String path = System.getProperty("test.hdfs.path");
+        assertThat(path, not(blankOrNullString()));
+
+        // Optional based on type of test
+        final String principal = System.getProperty("test.krb5.principal.es");
+
+        Settings.Builder repositorySettings = Settings.builder().put("client", "searchable_snapshots").put("uri", uri).put("path", path);
+        if (principal != null) {
+            repositorySettings.put("security.principal", principal);
+        }
+        return repositorySettings.build();
+    }
+}
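
For context, the repository settings built in repositorySettings() above correspond to registering an "hdfs" snapshot repository. A minimal sketch of the equivalent registration call through the low-level REST client, assuming a hypothetical RestClient instance and repository name, with the URI and path values taken from the non-secure integTest configuration in the Gradle file:

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

import java.io.IOException;

public class RegisterHdfsRepositorySketch {
    // Registers an "hdfs" snapshot repository with settings equivalent to those
    // built by HdfsSearchableSnapshotsIT#repositorySettings(). The repository
    // name is a placeholder; the URI and path mirror the simple-auth fixture.
    static Response registerRepository(RestClient client) throws IOException {
        Request request = new Request("PUT", "/_snapshot/searchable_snapshots_hdfs");
        request.setJsonEntity(
            "{\n"
                + "  \"type\": \"hdfs\",\n"
                + "  \"settings\": {\n"
                + "    \"client\": \"searchable_snapshots\",\n"
                + "    \"uri\": \"hdfs://localhost:9999\",\n"
                + "    \"path\": \"/user/elasticsearch/test/searchable_snapshots/simple\"\n"
                + "  }\n"
                + "}"
        );
        return client.performRequest(request);
    }
}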