|
@@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|
|
* A directory implementation that uses the Elasticsearch {@link Distributor} abstraction to distribute
|
|
|
* files across multiple data directories.
|
|
|
*/
|
|
|
-public final class DistributorDirectory extends BaseDirectory {
|
|
|
+public final class DistributorDirectory extends Directory {
|
|
|
|
|
|
private final Distributor distributor;
|
|
|
private final HashMap<String, Directory> nameDirMapping = new HashMap<>();
|
|
@@ -74,7 +74,6 @@ public final class DistributorDirectory extends BaseDirectory {
|
|
|
nameDirMapping.put(file, dir);
|
|
|
}
|
|
|
}
|
|
|
- lockFactory = new DistributorLockFactoryWrapper(distributor.primary());
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -132,7 +131,6 @@ public final class DistributorDirectory extends BaseDirectory {
|
|
|
} finally {
|
|
|
IOUtils.close(distributor.all());
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -140,7 +138,7 @@ public final class DistributorDirectory extends BaseDirectory {
|
|
|
*
|
|
|
* @throws IOException if the name has not yet been associated with any directory ie. fi the file does not exists
|
|
|
*/
|
|
|
- Directory getDirectory(String name) throws IOException { // pkg private for testing
|
|
|
+ synchronized Directory getDirectory(String name) throws IOException { // pkg private for testing
|
|
|
return getDirectory(name, true);
|
|
|
}
|
|
|
|
|
@@ -148,7 +146,7 @@ public final class DistributorDirectory extends BaseDirectory {
|
|
|
* Returns the directory that has previously been associated with this file name or associates the name with a directory
|
|
|
* if failIfNotAssociated is set to false.
|
|
|
*/
|
|
|
- private Directory getDirectory(String name, boolean failIfNotAssociated) throws IOException {
|
|
|
+ private synchronized Directory getDirectory(String name, boolean failIfNotAssociated) throws IOException {
|
|
|
final Directory directory = nameDirMapping.get(name);
|
|
|
if (directory == null) {
|
|
|
if (failIfNotAssociated) {
|
|
@@ -164,17 +162,6 @@ public final class DistributorDirectory extends BaseDirectory {
|
|
|
return directory;
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public synchronized void setLockFactory(LockFactory lockFactory) throws IOException {
|
|
|
- distributor.primary().setLockFactory(lockFactory);
|
|
|
- super.setLockFactory(new DistributorLockFactoryWrapper(distributor.primary()));
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public synchronized String getLockID() {
|
|
|
- return distributor.primary().getLockID();
|
|
|
- }
|
|
|
-
|
|
|
@Override
|
|
|
public synchronized String toString() {
|
|
|
return distributor.toString();
|
|
@@ -201,8 +188,8 @@ public final class DistributorDirectory extends BaseDirectory {
|
|
|
.append(System.lineSeparator());
|
|
|
} else if (directory != d) {
|
|
|
consistent = false;
|
|
|
- builder.append("File ").append(file).append(" was mapped to a directory ").append(directory)
|
|
|
- .append(" but exists in another distributor directory").append(d)
|
|
|
+ builder.append("File ").append(file).append(" was mapped to a directory ").append(directory)
|
|
|
+ .append(" but exists in another distributor directory ").append(d)
|
|
|
.append(System.lineSeparator());
|
|
|
}
|
|
|
|
|
@@ -212,86 +199,41 @@ public final class DistributorDirectory extends BaseDirectory {
|
|
|
return consistent; // return boolean so it can be easily be used in asserts
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * This inner class is a simple wrapper around the original
|
|
|
- * lock factory to track files written / created through the
|
|
|
- * lock factory. For instance {@link NativeFSLockFactory} creates real
|
|
|
- * files that we should expose for consistency reasons.
|
|
|
- */
|
|
|
- private class DistributorLockFactoryWrapper extends LockFactory {
|
|
|
- private final Directory dir;
|
|
|
- private final LockFactory delegate;
|
|
|
- private final boolean writesFiles;
|
|
|
-
|
|
|
- public DistributorLockFactoryWrapper(Directory dir) {
|
|
|
- this.dir = dir;
|
|
|
- final FSDirectory leaf = DirectoryUtils.getLeaf(dir, FSDirectory.class);
|
|
|
- if (leaf != null) {
|
|
|
- writesFiles = leaf.getLockFactory() instanceof FSLockFactory;
|
|
|
- } else {
|
|
|
- writesFiles = false;
|
|
|
- }
|
|
|
- this.delegate = dir.getLockFactory();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void setLockPrefix(String lockPrefix) {
|
|
|
- delegate.setLockPrefix(lockPrefix);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public String getLockPrefix() {
|
|
|
- return delegate.getLockPrefix();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public Lock makeLock(String lockName) {
|
|
|
- return new DistributorLock(delegate.makeLock(lockName), lockName);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void clearLock(String lockName) throws IOException {
|
|
|
- delegate.clearLock(lockName);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public String toString() {
|
|
|
- return "DistributorLockFactoryWrapper(" + delegate.toString() + ")";
|
|
|
- }
|
|
|
-
|
|
|
- private class DistributorLock extends Lock {
|
|
|
- private final Lock delegateLock;
|
|
|
- private final String name;
|
|
|
-
|
|
|
- DistributorLock(Lock delegate, String name) {
|
|
|
- this.delegateLock = delegate;
|
|
|
- this.name = name;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public boolean obtain() throws IOException {
|
|
|
- if (delegateLock.obtain()) {
|
|
|
- if (writesFiles) {
|
|
|
- synchronized (DistributorDirectory.this) {
|
|
|
- assert (nameDirMapping.containsKey(name) == false || nameDirMapping.get(name) == dir);
|
|
|
- if (nameDirMapping.get(name) == null) {
|
|
|
- nameDirMapping.put(name, dir);
|
|
|
+ @Override
|
|
|
+ public Lock makeLock(final String lockName) {
|
|
|
+ final Directory primary = distributor.primary();
|
|
|
+ final Lock delegateLock = primary.makeLock(lockName);
|
|
|
+ if (DirectoryUtils.getLeaf(primary, FSDirectory.class) != null) {
|
|
|
+ // Wrap the delegate's lock just so we can monitor when it actually wrote a lock file. We assume that an FSDirectory writes its
|
|
|
+ // locks as actual files (we don't support NoLockFactory):
|
|
|
+ return new Lock() {
|
|
|
+ @Override
|
|
|
+ public boolean obtain() throws IOException {
|
|
|
+ if (delegateLock.obtain()) {
|
|
|
+ synchronized(DistributorDirectory.this) {
|
|
|
+ assert nameDirMapping.containsKey(lockName) == false || nameDirMapping.get(lockName) == primary;
|
|
|
+ if (nameDirMapping.get(lockName) == null) {
|
|
|
+ nameDirMapping.put(lockName, primary);
|
|
|
}
|
|
|
}
|
|
|
+ return true;
|
|
|
+ } else {
|
|
|
+ return false;
|
|
|
}
|
|
|
- return true;
|
|
|
- } else {
|
|
|
- return false;
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- @Override
|
|
|
- public void close() throws IOException { delegateLock.close(); }
|
|
|
+ @Override
|
|
|
+ public void close() throws IOException {
|
|
|
+ delegateLock.close();
|
|
|
+ }
|
|
|
|
|
|
- @Override
|
|
|
- public boolean isLocked() throws IOException {
|
|
|
- return delegateLock.isLocked();
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ public boolean isLocked() throws IOException {
|
|
|
+ return delegateLock.isLocked();
|
|
|
+ }
|
|
|
+ };
|
|
|
+ } else {
|
|
|
+ return delegateLock;
|
|
|
}
|
|
|
}
|
|
|
}
|