
HDFS plugin: add replication_factor param (#94132)

Allows users of the HDFS repository plugin to configure, at repository creation time, the replication factor for files created by the repository implementation.

---------

Co-authored-by: James Baiera <james.baiera@gmail.com>
Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
Piotr Stankowski 2 years ago
parent
commit
a462deaccf
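
For context, a minimal, hypothetical sketch of registering an HDFS repository with the new setting, modelled on the client calls used in the tests below; the repository name, URI, path, and factor value are illustrative placeholders, not values taken from this commit:

    // Hypothetical example: register an HDFS snapshot repository whose files get 2 replicas.
    // Repository name, URI, and path are placeholders.
    client().admin()
        .cluster()
        .preparePutRepository("my-hdfs-repo")
        .setType("hdfs")
        .setSettings(
            Settings.builder()
                .put("uri", "hdfs://namenode:8020/")
                .put("path", "elasticsearch/snapshots")
                .put("replication_factor", "2")
                .build()
        )
        .get();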

+ 5 - 0
docs/changelog/94132.yaml

@@ -0,0 +1,5 @@
+pr: 94132
+summary: HDFS plugin add replication_factor param
+area: Snapshot/Restore
+type: enhancement
+issues: []

+ 6 - 0
docs/plugins/repository-hdfs.asciidoc

@@ -77,6 +77,12 @@ include::repository-shared-settings.asciidoc[]
     the pattern with the hostname of the node at runtime (see
     link:repository-hdfs-security-runtime[Creating the Secure Repository]).
 
+`replication_factor`::
+
+    The replication factor for all new HDFS files created by this repository.
+    Must be greater than or equal to the `dfs.replication.min` and less than or equal to the `dfs.replication.max` HDFS setting.
+    Defaults to the HDFS cluster's default replication setting.
+
 [[repository-hdfs-availability]]
 [discrete]
 ===== A note on HDFS availability
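
As a sketch of how the documented bounds behave (based on the HdfsTests changes further down; the repository name and values are illustrative), a `replication_factor` outside the configured `dfs.replication.min`/`dfs.replication.max` range is rejected when the repository is registered:

    // Illustrative only: 600 exceeds dfs.replication.max (512 here), so registration
    // is expected to fail with a RepositoryException naming the violated bound.
    client().admin()
        .cluster()
        .preparePutRepository("my-hdfs-repo")
        .setType("hdfs")
        .setSettings(
            Settings.builder()
                .put("uri", "hdfs://namenode:8020/")
                .put("path", "elasticsearch/snapshots")
                .put("replication_factor", "600")
                .put("conf.dfs.replication.max", "512")
                .build()
        )
        .get();
    // Expected message: "Value of replication_factor [600] must be <= dfs.replication.max [512]"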

+ 38 - 6
plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java

@@ -48,12 +48,24 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
     private final Path path;
     private final int bufferSize;
 
-    HdfsBlobContainer(BlobPath blobPath, HdfsBlobStore store, Path path, int bufferSize, HdfsSecurityContext hdfsSecurityContext) {
+    private final Short replicationFactor;
+    private final Options.CreateOpts[] createOpts;
+
+    HdfsBlobContainer(
+        BlobPath blobPath,
+        HdfsBlobStore store,
+        Path path,
+        int bufferSize,
+        HdfsSecurityContext hdfsSecurityContext,
+        Short replicationFactor
+    ) {
         super(blobPath);
         this.store = store;
         this.securityContext = hdfsSecurityContext;
         this.path = path;
         this.bufferSize = bufferSize;
+        this.replicationFactor = replicationFactor;
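+        // Pre-build the CreateOpts passed on every file creation; when no replication factor is
+        // configured, leave the array empty so HDFS applies its own default replication.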
+        this.createOpts = replicationFactor == null ? new CreateOpts[0] : new CreateOpts[] { CreateOpts.repFac(replicationFactor) };
     }
 
     // TODO: See if we can get precise result reporting.
@@ -174,7 +186,11 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
             store.execute((Operation<Void>) fileContext -> {
                 try {
                     try (
-                        FSDataOutputStream stream = fileContext.create(tempBlobPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK))
+                        FSDataOutputStream stream = fileContext.create(
+                            tempBlobPath,
+                            EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK),
+                            createOpts
+                        )
                     ) {
                         writer.accept(stream);
                     }
@@ -191,7 +207,7 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
                 ? EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK)
                 : EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK);
             store.execute((Operation<Void>) fileContext -> {
-                try (FSDataOutputStream stream = fileContext.create(blob, flags)) {
+                try (FSDataOutputStream stream = fileContext.create(blob, flags, createOpts)) {
                     writer.accept(stream);
                 } catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) {
                     throw new FileAlreadyExistsException(blob.toString(), null, faee.getMessage());
@@ -219,7 +235,7 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
 
     private void writeToPath(BytesReference bytes, Path blobPath, FileContext fileContext, EnumSet<CreateFlag> createFlags)
         throws IOException {
-        try (FSDataOutputStream stream = fileContext.create(blobPath, createFlags)) {
+        try (FSDataOutputStream stream = fileContext.create(blobPath, createFlags, createOpts)) {
             bytes.writeTo(stream);
         }
     }
@@ -232,7 +248,9 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
         EnumSet<CreateFlag> createFlags
     ) throws IOException {
         final byte[] buffer = new byte[blobSize < bufferSize ? Math.toIntExact(blobSize) : bufferSize];
-        try (FSDataOutputStream stream = fileContext.create(blobPath, createFlags, CreateOpts.bufferSize(buffer.length))) {
+
+        Options.CreateOpts[] createOptsWithBufferSize = addOptionToArray(createOpts, CreateOpts.bufferSize(buffer.length));
+        try (FSDataOutputStream stream = fileContext.create(blobPath, createFlags, createOptsWithBufferSize)) {
             int bytesRead;
             while ((bytesRead = inputStream.read(buffer)) != -1) {
                 stream.write(buffer, 0, bytesRead);
@@ -271,7 +289,10 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
         for (FileStatus file : files) {
             if (file.isDirectory()) {
                 final String name = file.getPath().getName();
-                map.put(name, new HdfsBlobContainer(path().add(name), store, new Path(path, name), bufferSize, securityContext));
+                map.put(
+                    name,
+                    new HdfsBlobContainer(path().add(name), store, new Path(path, name), bufferSize, securityContext, replicationFactor)
+                );
             }
         }
         return Collections.unmodifiableMap(map);
@@ -328,4 +349,15 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
     ) {
         listener.onFailure(new UnsupportedOperationException("HDFS repositories do not support this operation"));
     }
+
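+    /**
+     * Returns a copy of {@code opts} with {@code opt} appended; a {@code null} array is treated as empty.
+     */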
+    private static CreateOpts[] addOptionToArray(final CreateOpts[] opts, final CreateOpts opt) {
+        if (opts == null) {
+            return new CreateOpts[] { opt };
+        }
+        CreateOpts[] newOpts = new CreateOpts[opts.length + 1];
+        System.arraycopy(opts, 0, newOpts, 0, opts.length);
+        newOpts[opts.length] = opt;
+
+        return newOpts;
+    }
 }

+ 6 - 3
plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java

@@ -25,18 +25,21 @@ final class HdfsBlobStore implements BlobStore {
     private final HdfsSecurityContext securityContext;
     private final int bufferSize;
     private final boolean readOnly;
+    private final Short replicationFactor;
     private volatile boolean closed;
 
     HdfsBlobStore(FileContext fileContext, String path, int bufferSize, boolean readOnly) throws IOException {
-        this(fileContext, path, bufferSize, readOnly, false);
+        this(fileContext, path, bufferSize, readOnly, false, null);
     }
 
-    HdfsBlobStore(FileContext fileContext, String path, int bufferSize, boolean readOnly, boolean haEnabled) throws IOException {
+    HdfsBlobStore(FileContext fileContext, String path, int bufferSize, boolean readOnly, boolean haEnabled, Short replicationFactor)
+        throws IOException {
         this.fileContext = fileContext;
         // Only restrict permissions if not running with HA
         boolean restrictPermissions = (haEnabled == false);
         this.securityContext = new HdfsSecurityContext(fileContext.getUgi(), restrictPermissions);
         this.bufferSize = bufferSize;
+        this.replicationFactor = replicationFactor;
         this.root = execute(fileContext1 -> fileContext1.makeQualified(new Path(path)));
         this.readOnly = readOnly;
         if (readOnly == false) {
@@ -63,7 +66,7 @@ final class HdfsBlobStore implements BlobStore {
 
     @Override
     public BlobContainer blobContainer(BlobPath path) {
-        return new HdfsBlobContainer(path, this, buildHdfsPath(path), bufferSize, securityContext);
+        return new HdfsBlobContainer(path, this, buildHdfsPath(path), bufferSize, securityContext, replicationFactor);
     }
 
     private Path buildHdfsPath(BlobPath blobPath) {

+ 48 - 1
plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java

@@ -29,6 +29,7 @@ import org.elasticsearch.env.Environment;
 import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
+import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
 
@@ -43,6 +44,9 @@ import java.util.Locale;
 
 public final class HdfsRepository extends BlobStoreRepository {
 
+    private static final int MIN_REPLICATION_FACTOR = 1;
+    private static final int MAX_REPLICATION_FACTOR = Short.MAX_VALUE;
+
     private static final Logger logger = LogManager.getLogger(HdfsRepository.class);
 
     private static final String CONF_SECURITY_PRINCIPAL = "security.principal";
@@ -109,6 +113,42 @@ public final class HdfsRepository extends BlobStoreRepository {
             hadoopConfiguration.set(key, confSettings.get(key));
         }
 
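+        // Validate the optional replication_factor: it must be a positive value that fits in a short
+        // and must respect the cluster's dfs.replication.min / dfs.replication.max bounds.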
+        Integer replicationFactor = repositorySettings.getAsInt("replication_factor", null);
+        if (replicationFactor != null && replicationFactor < MIN_REPLICATION_FACTOR) {
+            throw new RepositoryException(
+                metadata.name(),
+                "Value of replication_factor [{}] must be >= {}",
+                replicationFactor,
+                MIN_REPLICATION_FACTOR
+            );
+        }
+        if (replicationFactor != null && replicationFactor > MAX_REPLICATION_FACTOR) {
+            throw new RepositoryException(
+                metadata.name(),
+                "Value of replication_factor [{}] must be <= {}",
+                replicationFactor,
+                MAX_REPLICATION_FACTOR
+            );
+        }
+        int minReplicationFactor = hadoopConfiguration.getInt("dfs.replication.min", 0);
+        int maxReplicationFactor = hadoopConfiguration.getInt("dfs.replication.max", 512);
+        if (replicationFactor != null && replicationFactor < minReplicationFactor) {
+            throw new RepositoryException(
+                metadata.name(),
+                "Value of replication_factor [{}] must be >= dfs.replication.min [{}]",
+                replicationFactor,
+                minReplicationFactor
+            );
+        }
+        if (replicationFactor != null && replicationFactor > maxReplicationFactor) {
+            throw new RepositoryException(
+                metadata.name(),
+                "Value of replication_factor [{}] must be <= dfs.replication.max [{}]",
+                replicationFactor,
+                maxReplicationFactor
+            );
+        }
+
         // Disable FS cache
         hadoopConfiguration.setBoolean("fs.hdfs.impl.disable.cache", true);
 
@@ -142,7 +182,14 @@ public final class HdfsRepository extends BlobStoreRepository {
         );
 
         try {
-            return new HdfsBlobStore(fileContext, path, bufferSize, isReadOnly(), haEnabled);
+            return new HdfsBlobStore(
+                fileContext,
+                path,
+                bufferSize,
+                isReadOnly(),
+                haEnabled,
+                replicationFactor != null ? replicationFactor.shortValue() : null
+            );
         } catch (IOException e) {
             throw new UncheckedIOException(String.format(Locale.ROOT, "Cannot create HDFS repository for uri [%s]", blobstoreUri), e);
         }

+ 30 - 0
plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java

@@ -13,8 +13,11 @@ import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.AbstractFileSystem;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.bytes.BytesArray;
@@ -22,6 +25,8 @@ import org.elasticsearch.core.Streams;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.test.ESTestCase;
 import org.hamcrest.CoreMatchers;
+import org.mockito.AdditionalMatchers;
+import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -91,6 +96,7 @@ public class HdfsBlobStoreContainerTests extends ESTestCase {
         return Subject.doAs(subject, (PrivilegedAction<FileContext>) () -> {
             try {
                 TestingFs fs = (TestingFs) AbstractFileSystem.get(uri, cfg);
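+                // Wrap the filesystem in a Mockito spy so tests can verify the arguments
+                // (including the replication factor) passed to createInternal.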
+                fs = Mockito.spy(fs);
                 return FileContext.getFileContext(fs, cfg);
             } catch (UnsupportedFileSystemException e) {
                 throw new RuntimeException(e);
@@ -158,6 +164,30 @@ public class HdfsBlobStoreContainerTests extends ESTestCase {
         assertTrue(container.blobExists("foo"));
     }
 
+    public void testReplicationFactor() throws Exception {
+        FileContext fileContext = createTestContext();
+        short replicationFactor = 8;
+
+        HdfsBlobStore hdfsBlobStore = new HdfsBlobStore(fileContext, "dir", 1024, false, false, replicationFactor);
+        BlobContainer container = hdfsBlobStore.blobContainer(BlobPath.EMPTY.add("path"));
+
+        writeBlob(container, "foo", new BytesArray("test"), false);
+
+        // Verify that the right replicationFactor was applied.
+        Mockito.verify(fileContext.getDefaultFileSystem(), Mockito.atLeastOnce())
+            .createInternal(
+                Mockito.any(Path.class),
+                AdditionalMatchers.or(Mockito.isNull(), Mockito.any()),
+                Mockito.nullable(FsPermission.class),
+                Mockito.anyInt(),
+                Mockito.eq(replicationFactor),
+                Mockito.anyLong(),
+                Mockito.nullable(Progressable.class),
+                Mockito.nullable(Options.ChecksumOpt.class),
+                Mockito.anyBoolean()
+            );
+    }
+
     public void testListBlobsByPrefix() throws Exception {
         FileContext fileContext = createTestContext();
         HdfsBlobStore hdfsBlobStore = new HdfsBlobStore(fileContext, "dir", 1024, false);

+ 74 - 0
plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsTests.java

@@ -203,6 +203,80 @@ public class HdfsTests extends ESSingleNodeTestCase {
         }
     }
 
+    public void testReplicationFactorBelowOne() {
+        try {
+            client().admin()
+                .cluster()
+                .preparePutRepository("test-repo")
+                .setType("hdfs")
+                .setSettings(Settings.builder().put("uri", "hdfs:///").put("replication_factor", "0").put("path", "foo").build())
+                .get();
+            fail();
+        } catch (RepositoryException e) {
+            assertTrue(e.getCause() instanceof RepositoryException);
+            assertTrue(e.getCause().getMessage().contains("Value of replication_factor [0] must be >= 1"));
+        }
+    }
+
+    public void testReplicationFactorOverMaxShort() {
+        try {
+            client().admin()
+                .cluster()
+                .preparePutRepository("test-repo")
+                .setType("hdfs")
+                .setSettings(Settings.builder().put("uri", "hdfs:///").put("replication_factor", "32768").put("path", "foo").build())
+                .get();
+            fail();
+        } catch (RepositoryException e) {
+            assertTrue(e.getCause() instanceof RepositoryException);
+            assertTrue(e.getCause().getMessage().contains("Value of replication_factor [32768] must be <= 32767"));
+        }
+    }
+
+    public void testReplicationFactorBelowReplicationMin() {
+        try {
+            client().admin()
+                .cluster()
+                .preparePutRepository("test-repo")
+                .setType("hdfs")
+                .setSettings(
+                    Settings.builder()
+                        .put("uri", "hdfs:///")
+                        .put("replication_factor", "4")
+                        .put("path", "foo")
+                        .put("conf.dfs.replication.min", "5")
+                        .build()
+                )
+                .get();
+            fail();
+        } catch (RepositoryException e) {
+            assertTrue(e.getCause() instanceof RepositoryException);
+            assertTrue(e.getCause().getMessage().contains("Value of replication_factor [4] must be >= dfs.replication.min [5]"));
+        }
+    }
+
+    public void testReplicationFactorOverReplicationMax() {
+        try {
+            client().admin()
+                .cluster()
+                .preparePutRepository("test-repo")
+                .setType("hdfs")
+                .setSettings(
+                    Settings.builder()
+                        .put("uri", "hdfs:///")
+                        .put("replication_factor", "600")
+                        .put("path", "foo")
+                        .put("conf.dfs.replication.max", "512")
+                        .build()
+                )
+                .get();
+            fail();
+        } catch (RepositoryException e) {
+            assertTrue(e.getCause() instanceof RepositoryException);
+            assertTrue(e.getCause().getMessage().contains("Value of replication_factor [600] must be <= dfs.replication.max [512]"));
+        }
+    }
+
     private long count(Client client, String index) {
         return client.prepareSearch(index).setSize(0).get().getHits().getTotalHits().value;
     }

+ 26 - 4
server/src/main/java/org/elasticsearch/repositories/RepositoryException.java

@@ -20,12 +20,34 @@ import java.io.IOException;
 public class RepositoryException extends ElasticsearchException {
     private final String repository;
 
-    public RepositoryException(String repository, String msg) {
-        this(repository, msg, null);
+    /**
+     * Construct a <code>RepositoryException</code> with the specified detail message.
+     *
+     * The message can be parameterized using <code>{}</code> as placeholders for the given
+     * arguments.
+     *
+     * @param repository the repository name
+     * @param msg        the detail message
+     * @param args       the arguments for the message
+     */
+    public RepositoryException(String repository, String msg, Object... args) {
+        this(repository, msg, (Throwable) null, args);
     }
 
-    public RepositoryException(String repository, String msg, Throwable cause) {
-        super("[" + (repository == null ? "_na" : repository) + "] " + msg, cause);
+    /**
+     * Construct a <code>RepositoryException</code> with the specified detail message
+     * and nested exception.
+     *
+     * The message can be parameterized using <code>{}</code> as placeholders for the given
+     * arguments.
+     *
+     * @param repository the repository name
+     * @param msg        the detail message
+     * @param cause      the nested exception
+     * @param args       the arguments for the message
+     */
+    public RepositoryException(String repository, String msg, Throwable cause, Object... args) {
+        super("[" + (repository == null ? "_na" : repository) + "] " + msg, cause, args);
         this.repository = repository;
     }