Przeglądaj źródła

Merge pull request #13794 from s1monw/kill_es_deletion_policy

Remove ES internal deletion policies in favour of Lucenes implementations
Simon Willnauer 10 lat temu
rodzic
commit
2a94085605
24 zmienionych plików z 64 dodań i 907 usunięć
  1. 0 3
      core/src/main/java/org/elasticsearch/index/IndexService.java
  2. 0 60
      core/src/main/java/org/elasticsearch/index/deletionpolicy/AbstractESDeletionPolicy.java
  3. 0 38
      core/src/main/java/org/elasticsearch/index/deletionpolicy/DeletionPolicyModule.java
  4. 0 63
      core/src/main/java/org/elasticsearch/index/deletionpolicy/KeepLastNDeletionPolicy.java
  5. 0 65
      core/src/main/java/org/elasticsearch/index/deletionpolicy/KeepOnlyLastDeletionPolicy.java
  6. 0 220
      core/src/main/java/org/elasticsearch/index/deletionpolicy/SnapshotDeletionPolicy.java
  7. 0 74
      core/src/main/java/org/elasticsearch/index/deletionpolicy/SnapshotIndexCommit.java
  8. 0 55
      core/src/main/java/org/elasticsearch/index/deletionpolicy/SnapshotIndexCommits.java
  9. 2 14
      core/src/main/java/org/elasticsearch/index/engine/Engine.java
  10. 2 2
      core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
  11. 2 14
      core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
  12. 4 2
      core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java
  13. 21 7
      core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
  14. 2 4
      core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java
  15. 2 2
      core/src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java
  16. 9 16
      core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java
  17. 10 7
      core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
  18. 4 1
      core/src/main/java/org/elasticsearch/repositories/Repository.java
  19. 3 3
      core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
  20. 0 180
      core/src/test/java/org/elasticsearch/index/deletionpolicy/SnapshotDeletionPolicyTests.java
  21. 0 58
      core/src/test/java/org/elasticsearch/index/deletionpolicy/SnapshotIndexCommitExistsMatcher.java
  22. 1 8
      core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
  23. 1 8
      core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java
  24. 1 3
      core/src/test/java/org/elasticsearch/index/store/StoreTests.java

+ 0 - 3
core/src/main/java/org/elasticsearch/index/IndexService.java

@@ -40,7 +40,6 @@ import org.elasticsearch.index.aliases.IndexAliasesService;
 import org.elasticsearch.index.analysis.AnalysisService;
 import org.elasticsearch.index.cache.IndexCache;
 import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
-import org.elasticsearch.index.deletionpolicy.DeletionPolicyModule;
 import org.elasticsearch.index.fielddata.FieldDataType;
 import org.elasticsearch.index.fielddata.IndexFieldDataCache;
 import org.elasticsearch.index.fielddata.IndexFieldDataService;
@@ -365,8 +364,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
                             injector.getInstance(IndicesQueryCache.class).onClose(shardId);
                         }
                     }), path));
-            modules.add(new DeletionPolicyModule());
-
             pluginsService.processModules(modules);
 
             try {

+ 0 - 60
core/src/main/java/org/elasticsearch/index/deletionpolicy/AbstractESDeletionPolicy.java

@@ -1,60 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.index.deletionpolicy;
-
-import org.apache.lucene.index.IndexDeletionPolicy;
-import org.elasticsearch.common.logging.ESLogger;
-import org.elasticsearch.common.logging.Loggers;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.settings.IndexSettings;
-import org.elasticsearch.index.shard.IndexShardComponent;
-import org.elasticsearch.index.shard.ShardId;
-
-abstract class AbstractESDeletionPolicy extends IndexDeletionPolicy implements IndexShardComponent {
-
-    protected final ESLogger logger;
-
-    protected final ShardId shardId;
-
-    protected final Settings indexSettings;
-
-    protected AbstractESDeletionPolicy(ShardId shardId, @IndexSettings Settings indexSettings) {
-        this.shardId = shardId;
-        this.indexSettings = indexSettings;
-        this.logger = Loggers.getLogger(getClass(), indexSettings, shardId);
-    }
-
-    @Override
-    public ShardId shardId() {
-        return this.shardId;
-    }
-
-    @Override
-    public Settings indexSettings() {
-        return this.indexSettings;
-    }
-
-    public String nodeName() {
-        return indexSettings.get("name", "");
-    }
-
-    
-
-}

+ 0 - 38
core/src/main/java/org/elasticsearch/index/deletionpolicy/DeletionPolicyModule.java

@@ -1,38 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.index.deletionpolicy;
-
-import org.apache.lucene.index.IndexDeletionPolicy;
-import org.elasticsearch.common.inject.AbstractModule;
-import org.elasticsearch.common.inject.name.Names;
-
-public class DeletionPolicyModule extends AbstractModule {
-
-    @Override
-    protected void configure() {
-        bind(IndexDeletionPolicy.class)
-                .annotatedWith(Names.named("actual"))
-                .to(KeepOnlyLastDeletionPolicy.class)
-                .asEagerSingleton();
-
-        bind(SnapshotDeletionPolicy.class)
-                .asEagerSingleton();
-    }
-}

+ 0 - 63
core/src/main/java/org/elasticsearch/index/deletionpolicy/KeepLastNDeletionPolicy.java

@@ -1,63 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.index.deletionpolicy;
-
-import org.apache.lucene.index.IndexCommit;
-import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.settings.IndexSettings;
-import org.elasticsearch.index.shard.ShardId;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- *
- */
-public class KeepLastNDeletionPolicy extends AbstractESDeletionPolicy {
-
-    private final int numToKeep;
-
-    @Inject
-    public KeepLastNDeletionPolicy(ShardId shardId, @IndexSettings Settings indexSettings) {
-        super(shardId, indexSettings);
-        this.numToKeep = indexSettings.getAsInt("index.deletionpolicy.num_to_keep", 5);
-        logger.debug("Using [keep_last_n] deletion policy with num_to_keep[{}]", numToKeep);
-    }
-
-    @Override
-    public void onInit(List<? extends IndexCommit> commits) throws IOException {
-        // do no deletions on init
-        doDeletes(commits);
-    }
-
-    @Override
-    public void onCommit(List<? extends IndexCommit> commits) throws IOException {
-        doDeletes(commits);
-    }
-
-    private void doDeletes(List<? extends IndexCommit> commits) {
-        int size = commits.size();
-        for (int i = 0; i < size - numToKeep; i++) {
-            commits.get(i).delete();
-        }
-    }
-
-}

+ 0 - 65
core/src/main/java/org/elasticsearch/index/deletionpolicy/KeepOnlyLastDeletionPolicy.java

@@ -1,65 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.index.deletionpolicy;
-
-import org.apache.lucene.index.IndexCommit;
-import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.settings.IndexSettings;
-import org.elasticsearch.index.shard.ShardId;
-
-import java.util.List;
-
-/**
- * This {@link org.apache.lucene.index.IndexDeletionPolicy} implementation that
- * keeps only the most recent commit and immediately removes
- * all prior commits after a new commit is done.  This is
- * the default deletion policy.
- */
-public class KeepOnlyLastDeletionPolicy extends AbstractESDeletionPolicy {
-
-    @Inject
-    public KeepOnlyLastDeletionPolicy(ShardId shardId, @IndexSettings Settings indexSettings) {
-        super(shardId, indexSettings);
-        logger.debug("Using [keep_only_last] deletion policy");
-    }
-
-    /**
-     * Deletes all commits except the most recent one.
-     */
-    @Override
-    public void onInit(List<? extends IndexCommit> commits) {
-        // Note that commits.size() should normally be 1:
-        onCommit(commits);
-    }
-
-    /**
-     * Deletes all commits except the most recent one.
-     */
-    @Override
-    public void onCommit(List<? extends IndexCommit> commits) {
-        // Note that commits.size() should normally be 2 (if not
-        // called by onInit above):
-        int size = commits.size();
-        for (int i = 0; i < size - 1; i++) {
-            commits.get(i).delete();
-        }
-    }
-}

+ 0 - 220
core/src/main/java/org/elasticsearch/index/deletionpolicy/SnapshotDeletionPolicy.java

@@ -1,220 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.index.deletionpolicy;
-
-import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.IndexDeletionPolicy;
-import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.inject.name.Named;
-import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.index.shard.IndexShardComponent;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Snapshot deletion policy allows to get snapshots of an index state (last commit or all commits)
- * and if the deletion policy is used with all open index writers (JVM level) then the snapshot
- * state will not be deleted until it will be released.
- *
- *
- */
-public class SnapshotDeletionPolicy extends AbstractESDeletionPolicy {
-
-    private final IndexDeletionPolicy primary;
-
-    private final ConcurrentMap<Long, SnapshotHolder> snapshots = ConcurrentCollections.newConcurrentMap();
-
-    private volatile List<SnapshotIndexCommit> commits;
-
-    private final Object mutex = new Object();
-
-    private SnapshotIndexCommit lastCommit;
-
-    /**
-     * Constructs a new snapshot deletion policy that wraps the provided deletion policy.
-     */
-    @Inject
-    public SnapshotDeletionPolicy(@Named("actual") IndexDeletionPolicy primary) {
-        super(((IndexShardComponent) primary).shardId(), ((IndexShardComponent) primary).indexSettings());
-        this.primary = primary;
-    }
-
-    /**
-     * Called by Lucene. Same as {@link #onCommit(java.util.List)}.
-     */
-    @Override
-    public void onInit(List<? extends IndexCommit> commits) throws IOException {
-        if (!commits.isEmpty()) { // this might be empty if we create a new index. 
-            // the behavior has changed in Lucene 4.4 that calls onInit even with an empty commits list.
-            onCommit(commits);
-        }
-    }
-
-    /**
-     * Called by Lucene.. Wraps the provided commits with {@link SnapshotIndexCommit}
-     * and delegates to the wrapped deletion policy.
-     */
-    @Override
-    public void onCommit(List<? extends IndexCommit> commits) throws IOException {
-        assert !commits.isEmpty() : "Commits must not be empty";
-        synchronized (mutex) {
-            List<SnapshotIndexCommit> snapshotCommits = wrapCommits(commits);
-            primary.onCommit(snapshotCommits);
-
-            // clean snapshots that their respective counts are 0 (should not really happen)
-            for (Iterator<SnapshotHolder> it = snapshots.values().iterator(); it.hasNext(); ) {
-                SnapshotHolder holder = it.next();
-                if (holder.counter <= 0) {
-                    it.remove();
-                }
-            }
-            // build the current commits list (all the ones that are not deleted by the primary)
-            List<SnapshotIndexCommit> newCommits = new ArrayList<>();
-            for (SnapshotIndexCommit commit : snapshotCommits) {
-                if (!commit.isDeleted()) {
-                    newCommits.add(commit);
-                }
-            }
-            this.commits = newCommits;
-            // the last commit that is not deleted
-            this.lastCommit = newCommits.get(newCommits.size() - 1);     
-           
-        }
-    }
-
-    /**
-     * Snapshots all the current commits in the index. Make sure to call
-     * {@link SnapshotIndexCommits#close()} to release it.
-     */
-    public SnapshotIndexCommits snapshots() throws IOException {
-        synchronized (mutex) {
-            if (snapshots == null) {
-                throw new IllegalStateException("Snapshot deletion policy has not been init yet...");
-            }
-            List<SnapshotIndexCommit> result = new ArrayList<>(commits.size());
-            for (SnapshotIndexCommit commit : commits) {
-                result.add(snapshot(commit));
-            }
-            return new SnapshotIndexCommits(result);
-        }
-    }
-
-    /**
-     * Returns a snapshot of the index (for the last commit point). Make
-     * sure to call {@link SnapshotIndexCommit#close()} in order to release it.
-     */
-    public SnapshotIndexCommit snapshot() throws IOException {
-        synchronized (mutex) {
-            if (lastCommit == null) {
-                throw new IllegalStateException("Snapshot deletion policy has not been init yet...");
-            }
-            return snapshot(lastCommit);
-        }
-    }
-
-    @Override
-    public IndexDeletionPolicy clone() {
-       // Lucene IW makes a clone internally but since we hold on to this instance 
-       // the clone will just be the identity. See InternalEngine recovery why we need this.
-       return this;
-    }
-
-    /**
-     * Helper method to snapshot a give commit.
-     */
-    private SnapshotIndexCommit snapshot(SnapshotIndexCommit commit) throws IOException {
-        SnapshotHolder snapshotHolder = snapshots.get(commit.getGeneration());
-        if (snapshotHolder == null) {
-            snapshotHolder = new SnapshotHolder(0);
-            snapshots.put(commit.getGeneration(), snapshotHolder);
-        }
-        snapshotHolder.counter++;
-        return new OneTimeReleaseSnapshotIndexCommit(this, commit);
-    }
-
-    /**
-     * Returns <tt>true</tt> if the version has been snapshotted.
-     */
-    boolean isHeld(long version) {
-        SnapshotDeletionPolicy.SnapshotHolder holder = snapshots.get(version);
-        return holder != null && holder.counter > 0;
-    }
-
-    /**
-     * Releases the version provided. Returns <tt>true</tt> if the release was successful.
-     */
-    boolean close(long version) {
-        synchronized (mutex) {
-            SnapshotDeletionPolicy.SnapshotHolder holder = snapshots.get(version);
-            if (holder == null) {
-                return false;
-            }
-            if (holder.counter <= 0) {
-                snapshots.remove(version);
-                return false;
-            }
-            if (--holder.counter == 0) {
-                snapshots.remove(version);
-            }
-            return true;
-        }
-    }
-
-    /**
-     * A class that wraps an {@link SnapshotIndexCommit} and makes sure that release will only
-     * be called once on it.
-     */
-    private static class OneTimeReleaseSnapshotIndexCommit extends SnapshotIndexCommit {
-        private volatile boolean released = false;
-
-        OneTimeReleaseSnapshotIndexCommit(SnapshotDeletionPolicy deletionPolicy, IndexCommit cp) throws IOException {
-            super(deletionPolicy, cp);
-        }
-
-        @Override
-        public void close() {
-            if (released) {
-                return;
-            }
-            released = true;
-            ((SnapshotIndexCommit) delegate).close();
-        }
-    }
-
-    private static class SnapshotHolder {
-        int counter;
-
-        private SnapshotHolder(int counter) {
-            this.counter = counter;
-        }
-    }
-
-    private List<SnapshotIndexCommit> wrapCommits(List<? extends IndexCommit> commits) throws IOException {
-        final int count = commits.size();
-        List<SnapshotIndexCommit> snapshotCommits = new ArrayList<>(count);
-        for (int i = 0; i < count; i++)
-            snapshotCommits.add(new SnapshotIndexCommit(this, commits.get(i)));
-        return snapshotCommits;
-    }
-}

+ 0 - 74
core/src/main/java/org/elasticsearch/index/deletionpolicy/SnapshotIndexCommit.java

@@ -1,74 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.index.deletionpolicy;
-
-import org.apache.lucene.index.IndexCommit;
-import org.elasticsearch.common.lease.Releasable;
-import org.elasticsearch.common.lucene.IndexCommitDelegate;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-/**
- * A snapshot index commit point. While this is held and {@link #close()}
- * was not called, no files will be deleted that relates to this commit point
- * ({@link #getFileNames()}).
- *
- *
- */
-public class SnapshotIndexCommit extends IndexCommitDelegate implements Releasable {
-
-    private final SnapshotDeletionPolicy deletionPolicy;
-
-    private final String[] files;
-
-    SnapshotIndexCommit(SnapshotDeletionPolicy deletionPolicy, IndexCommit cp) throws IOException {
-        super(cp);
-        this.deletionPolicy = deletionPolicy;
-        ArrayList<String> tmpFiles = new ArrayList<>();
-        for (String o : cp.getFileNames()) {
-            tmpFiles.add(o);
-        }
-        files = tmpFiles.toArray(new String[tmpFiles.size()]);
-    }
-
-    public String[] getFiles() {
-        return files;
-    }
-
-    /**
-     * Releases the current snapshot.
-     */
-    @Override
-    public void close() {
-        deletionPolicy.close(getGeneration());
-    }
-
-    /**
-     * Override the delete operation, and only actually delete it if it
-     * is not held by the {@link SnapshotDeletionPolicy}.
-     */
-    @Override
-    public void delete() {
-        if (!deletionPolicy.isHeld(getGeneration())) {
-            delegate.delete();
-        }
-    }
-}

+ 0 - 55
core/src/main/java/org/elasticsearch/index/deletionpolicy/SnapshotIndexCommits.java

@@ -1,55 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.index.deletionpolicy;
-
-import org.elasticsearch.common.lease.Releasable;
-import org.elasticsearch.common.lease.Releasables;
-
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Represents a snapshot view of several commits. Provides a way to iterate over
- * them as well as a simple method to release all of them.
- *
- *
- */
-public class SnapshotIndexCommits implements Iterable<SnapshotIndexCommit>, Releasable {
-
-    private final List<SnapshotIndexCommit> commits;
-
-    public SnapshotIndexCommits(List<SnapshotIndexCommit> commits) {
-        this.commits = commits;
-    }
-
-    public int size() {
-        return commits.size();
-    }
-
-    @Override
-    public Iterator<SnapshotIndexCommit> iterator() {
-        return commits.iterator();
-    }
-
-    @Override
-    public void close() {
-        Releasables.close(commits);
-    }
-}

+ 2 - 14
core/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -19,17 +19,7 @@
 
 package org.elasticsearch.index.engine;
 
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.FilterLeafReader;
-import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.LeafReader;
-import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.index.SegmentCommitInfo;
-import org.apache.lucene.index.SegmentInfos;
-import org.apache.lucene.index.SegmentReader;
-import org.apache.lucene.index.Term;
+import org.apache.lucene.index.*;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.SearcherManager;
@@ -51,8 +41,6 @@ import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.index.VersionType;
-import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
-import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
 import org.elasticsearch.index.mapper.ParseContext.Document;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.Uid;
@@ -516,7 +504,7 @@ public abstract class Engine implements Closeable {
      *
      * @param flushFirst indicates whether the engine should flush before returning the snapshot
      */
-    public abstract SnapshotIndexCommit snapshotIndex(boolean flushFirst) throws EngineException;
+    public abstract IndexCommit snapshotIndex(boolean flushFirst) throws EngineException;
 
     /**
      * fail engine due to some error. the engine will also be closed.

+ 2 - 2
core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

@@ -21,6 +21,7 @@ package org.elasticsearch.index.engine;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.SnapshotDeletionPolicy;
 import org.apache.lucene.search.QueryCache;
 import org.apache.lucene.search.QueryCachingPolicy;
 import org.apache.lucene.search.similarities.Similarity;
@@ -30,7 +31,6 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.codec.CodecService;
-import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
 import org.elasticsearch.index.indexing.ShardIndexingService;
 import org.elasticsearch.index.shard.MergeSchedulerConfig;
 import org.elasticsearch.index.shard.ShardId;
@@ -305,7 +305,7 @@ public final class EngineConfig {
     }
 
     /**
-     * Returns a {@link org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy} used in the engines
+     * Returns a {@link SnapshotDeletionPolicy} used in the engines
      * {@link org.apache.lucene.index.IndexWriter}.
      */
     public SnapshotDeletionPolicy getDeletionPolicy() {

+ 2 - 14
core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -19,19 +19,8 @@
 
 package org.elasticsearch.index.engine;
 
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.*;
 import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.LeafReader;
-import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.index.LiveIndexWriterConfig;
-import org.apache.lucene.index.MergePolicy;
-import org.apache.lucene.index.MultiReader;
-import org.apache.lucene.index.SegmentCommitInfo;
-import org.apache.lucene.index.SegmentInfos;
-import org.apache.lucene.index.Term;
 import org.apache.lucene.search.BooleanClause.Occur;
 import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.search.IndexSearcher;
@@ -59,7 +48,6 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
-import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
 import org.elasticsearch.index.indexing.ShardIndexingService;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.merge.MergeStats;
@@ -885,7 +873,7 @@ public class InternalEngine extends Engine {
     }
 
     @Override
-    public SnapshotIndexCommit snapshotIndex(final boolean flushFirst) throws EngineException {
+    public IndexCommit snapshotIndex(final boolean flushFirst) throws EngineException {
         // we have to flush outside of the readlock otherwise we might have a problem upgrading
         // the to a write lock when we fail the engine in this operation
         if (flushFirst) {

+ 4 - 2
core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.index.engine;
 
 import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.search.SearcherFactory;
 import org.apache.lucene.search.SearcherManager;
@@ -29,7 +30,6 @@ import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
-import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
 import org.elasticsearch.index.translog.Translog;
 
 import java.io.IOException;
@@ -209,10 +209,12 @@ public class ShadowEngine extends Engine {
     }
 
     @Override
-    public SnapshotIndexCommit snapshotIndex(boolean flushFirst) throws EngineException {
+    public IndexCommit snapshotIndex(boolean flushFirst) throws EngineException {
         throw new UnsupportedOperationException("Can not take snapshot from a shadow engine");
     }
 
+
+
     @Override
     protected SearcherManager getSearcherManager() {
         return searcherManager;

+ 21 - 7
core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -22,6 +22,9 @@ package org.elasticsearch.index.shard;
 import java.nio.charset.StandardCharsets;
 import org.apache.lucene.codecs.PostingsFormat;
 import org.apache.lucene.index.CheckIndex;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.lucene.index.SnapshotDeletionPolicy;
 import org.apache.lucene.search.QueryCachingPolicy;
 import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
 import org.apache.lucene.store.AlreadyClosedException;
@@ -65,11 +68,8 @@ import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache;
 import org.elasticsearch.index.cache.query.QueryCacheStats;
 import org.elasticsearch.index.cache.request.ShardRequestCache;
 import org.elasticsearch.index.codec.CodecService;
-import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
-import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
 import org.elasticsearch.index.engine.*;
 import org.elasticsearch.index.fielddata.FieldDataStats;
-import org.elasticsearch.index.fielddata.IndexFieldDataCache;
 import org.elasticsearch.index.fielddata.IndexFieldDataService;
 import org.elasticsearch.index.fielddata.ShardFieldData;
 import org.elasticsearch.index.flush.FlushStats;
@@ -206,16 +206,15 @@ public class IndexShard extends AbstractIndexShardComponent {
                       ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService,
                       IndicesQueryCache indicesQueryCache, CodecService codecService,
                       TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndexService indexService,
-                      @Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, EngineFactory factory,
+                      @Nullable IndicesWarmer warmer, SimilarityService similarityService, EngineFactory factory,
                       ClusterService clusterService, ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) {
         super(shardId, indexSettingsService.getSettings());
         this.codecService = codecService;
         this.warmer = warmer;
-        this.deletionPolicy = deletionPolicy;
+        this.deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
         this.similarityService = similarityService;
         this.wrappingService = wrappingService;
         Objects.requireNonNull(store, "Store must be provided to the index shard");
-        Objects.requireNonNull(deletionPolicy, "Snapshot deletion policy must be provided to the index shard");
         this.engineFactory = factory;
         this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
         this.indexSettingsService = indexSettingsService;
@@ -745,7 +744,13 @@ public class IndexShard extends AbstractIndexShardComponent {
         return luceneVersion == null ? Version.indexCreated(indexSettings).luceneVersion : luceneVersion;
     }
 
-    public SnapshotIndexCommit snapshotIndex(boolean flushFirst) throws EngineException {
+    /**
+     * Creates a new {@link IndexCommit} snapshot form the currently running engine. All resources referenced by this
+     * commit won't be freed until the commit / snapshot is released via {@link #releaseSnapshot(IndexCommit)}.
+     *
+     * @param flushFirst <code>true</code> if the index should first be flushed to disk / a low level lucene commit should be executed
+     */
+    public IndexCommit snapshotIndex(boolean flushFirst) throws EngineException {
         IndexShardState state = this.state; // one time volatile read
         // we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
         if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
@@ -755,6 +760,15 @@ public class IndexShard extends AbstractIndexShardComponent {
         }
     }
 
+
+    /**
+     * Releases a snapshot taken from {@link #snapshotIndex(boolean)} this must be called to release the resources
+     * referenced by the given snapshot {@link IndexCommit}.
+     */
+    public void releaseSnapshot(IndexCommit snapshot) throws IOException {
+        deletionPolicy.release(snapshot);
+    }
+
     /**
      * Fails the shard and marks the shard store as corrupted if
      * <code>e</code> is caused by index corruption

+ 2 - 4
core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java

@@ -27,7 +27,6 @@ import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.aliases.IndexAliasesService;
 import org.elasticsearch.index.cache.IndexCache;
 import org.elasticsearch.index.codec.CodecService;
-import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
 import org.elasticsearch.index.engine.IndexSearcherWrappingService;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineConfig;
@@ -35,7 +34,6 @@ import org.elasticsearch.index.engine.EngineFactory;
 import org.elasticsearch.index.fielddata.IndexFieldDataService;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.merge.MergeStats;
-import org.elasticsearch.index.percolator.stats.ShardPercolateService;
 import org.elasticsearch.index.query.IndexQueryParserService;
 import org.elasticsearch.index.settings.IndexSettingsService;
 import org.elasticsearch.index.similarity.SimilarityService;
@@ -64,14 +62,14 @@ public final class ShadowIndexShard extends IndexShard {
                             IndexAliasesService indexAliasesService, IndicesQueryCache indicesQueryCache,
                             CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService,
                             IndexService indexService, @Nullable IndicesWarmer warmer,
-                            SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService,
+                            SimilarityService similarityService,
                             EngineFactory factory, ClusterService clusterService,
                             ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) throws IOException {
         super(shardId, indexSettingsService, indicesLifecycle, store, storeRecoveryService,
                 threadPool, mapperService, queryParserService, indexCache, indexAliasesService,
                 indicesQueryCache, codecService,
                 termVectorsService, indexFieldDataService, indexService,
-                warmer, deletionPolicy, similarityService,
+                warmer, similarityService,
                 factory, clusterService, path, bigArrays, wrappingService);
     }
 

+ 2 - 2
core/src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java

@@ -19,9 +19,9 @@
 
 package org.elasticsearch.index.snapshots;
 
+import org.apache.lucene.index.IndexCommit;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.metadata.SnapshotId;
-import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.recovery.RecoveryState;
 
@@ -47,7 +47,7 @@ public interface IndexShardRepository {
      * @param snapshotIndexCommit commit point
      * @param snapshotStatus      snapshot status
      */
-    void snapshot(SnapshotId snapshotId, ShardId shardId, SnapshotIndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus);
+    void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus);
 
     /**
      * Restores snapshot of the shard.

+ 9 - 16
core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java

@@ -19,10 +19,7 @@
 
 package org.elasticsearch.index.snapshots.blobstore;
 
-import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.IndexFormatTooNewException;
-import org.apache.lucene.index.IndexFormatTooOldException;
-import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.*;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
@@ -49,7 +46,6 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.iterable.Iterables;
 import org.elasticsearch.index.IndexService;
-import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.snapshots.*;
 import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
@@ -150,12 +146,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
         this.snapshotRateLimiter = snapshotRateLimiter;
         this.restoreRateLimiter = restoreRateLimiter;
         this.rateLimiterListener = rateLimiterListener;
-        this.snapshotThrottleListener = new RateLimitingInputStream.Listener() {
-            @Override
-            public void onPause(long nanos) {
-                rateLimiterListener.onSnapshotPause(nanos);
-            }
-        };
+        this.snapshotThrottleListener = nanos -> rateLimiterListener.onSnapshotPause(nanos);
         this.compress = compress;
         indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher, isCompress());
         indexShardSnapshotLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher);
@@ -166,7 +157,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
      * {@inheritDoc}
      */
     @Override
-    public void snapshot(SnapshotId snapshotId, ShardId shardId, SnapshotIndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
+    public void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
         SnapshotContext snapshotContext = new SnapshotContext(snapshotId, shardId, snapshotStatus);
         snapshotStatus.startTime(System.currentTimeMillis());
 
@@ -495,7 +486,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
         public SnapshotContext(SnapshotId snapshotId, ShardId shardId, IndexShardSnapshotStatus snapshotStatus) {
             super(snapshotId, Version.CURRENT, shardId);
             IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
-            store = indexService.shardInjectorSafe(shardId.id()).getInstance(Store.class);
+            store = indexService.shard(shardId.id()).store();
             this.snapshotStatus = snapshotStatus;
 
         }
@@ -505,7 +496,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
          *
          * @param snapshotIndexCommit snapshot commit point
          */
-        public void snapshot(SnapshotIndexCommit snapshotIndexCommit) {
+        public void snapshot(IndexCommit snapshotIndexCommit) {
             logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, repositoryName);
             store.incRef();
             try {
@@ -528,12 +519,14 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
                 ArrayList<FileInfo> filesToSnapshot = new ArrayList<>();
                 final Store.MetadataSnapshot metadata;
                 // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should
+                final Collection<String> fileNames;
                 try {
                     metadata = store.getMetadata(snapshotIndexCommit);
+                    fileNames = snapshotIndexCommit.getFileNames();
                 } catch (IOException e) {
                     throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
                 }
-                for (String fileName : snapshotIndexCommit.getFiles()) {
+                for (String fileName : fileNames) {
                     if (snapshotStatus.aborted()) {
                         logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName);
                         throw new IndexShardSnapshotFailedException(shardId, "Aborted");
@@ -776,7 +769,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
          */
         public RestoreContext(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) {
             super(snapshotId, version, shardId, snapshotShardId);
-            store = indicesService.indexServiceSafe(shardId.getIndex()).shardInjectorSafe(shardId.id()).getInstance(Store.class);
+            store = indicesService.indexServiceSafe(shardId.getIndex()).shard(shardId.id()).store();
             this.recoveryState = recoveryState;
         }
 

+ 10 - 7
core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.indices.recovery;
 
 import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexFormatTooNewException;
 import org.apache.lucene.index.IndexFormatTooOldException;
 import org.apache.lucene.store.IOContext;
@@ -33,14 +34,12 @@ import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.StopWatch;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.compress.CompressorFactory;
-import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.CancellableThreads;
 import org.elasticsearch.common.util.CancellableThreads.Interruptable;
 import org.elasticsearch.common.util.iterable.Iterables;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
-import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.RecoveryEngineException;
 import org.elasticsearch.index.shard.IllegalIndexShardStateException;
@@ -122,7 +121,7 @@ public class RecoverySourceHandler {
         assert engine.getTranslog() != null : "translog must not be null";
         try (Translog.View translogView = engine.getTranslog().newView()) {
             logger.trace("captured translog id [{}] for recovery", translogView.minTranslogGeneration());
-            final SnapshotIndexCommit phase1Snapshot;
+            final IndexCommit phase1Snapshot;
             try {
                 phase1Snapshot = shard.snapshotIndex(false);
             } catch (Throwable e) {
@@ -135,7 +134,11 @@ public class RecoverySourceHandler {
             } catch (Throwable e) {
                 throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
             } finally {
-                Releasables.closeWhileHandlingException(phase1Snapshot);
+                try {
+                    shard.releaseSnapshot(phase1Snapshot);
+                } catch (IOException ex) {
+                    logger.warn("releasing snapshot caused exception", ex);
+                }
             }
 
             logger.trace("snapshot translog for recovery. current size is [{}]", translogView.totalOperations());
@@ -151,7 +154,7 @@ public class RecoverySourceHandler {
     }
 
     /**
-     * Perform phase1 of the recovery operations. Once this {@link SnapshotIndexCommit}
+     * Perform phase1 of the recovery operations. Once this {@link IndexCommit}
      * snapshot has been performed no commit operations (files being fsync'd)
      * are effectively allowed on this index until all recovery phases are done
      * <p>
@@ -159,7 +162,7 @@ public class RecoverySourceHandler {
      * segments that are missing. Only segments that have the same size and
      * checksum can be reused
      */
-    public void phase1(final SnapshotIndexCommit snapshot, final Translog.View translogView) {
+    public void phase1(final IndexCommit snapshot, final Translog.View translogView) {
         cancellableThreads.checkForCancel();
         // Total size of segment files that are recovered
         long totalSize = 0;
@@ -176,7 +179,7 @@ public class RecoverySourceHandler {
                 shard.engine().failEngine("recovery", ex);
                 throw ex;
             }
-            for (String name : snapshot.getFiles()) {
+            for (String name : snapshot.getFileNames()) {
                 final StoreFileMetaData md = recoverySourceMetadata.get(name);
                 if (md == null) {
                     logger.info("Snapshot differs from actual index for file: {} meta: {}", name, recoverySourceMetadata.asMap());

+ 4 - 1
core/src/main/java/org/elasticsearch/repositories/Repository.java

@@ -18,9 +18,12 @@
  */
 package org.elasticsearch.repositories;
 
+import org.apache.lucene.index.IndexCommit;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.metadata.SnapshotId;
 import org.elasticsearch.common.component.LifecycleComponent;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 import org.elasticsearch.snapshots.Snapshot;
 import org.elasticsearch.snapshots.SnapshotShardFailure;
 
@@ -38,7 +41,7 @@ import java.util.List;
  * <ul>
  * <li>Master calls {@link #initializeSnapshot(org.elasticsearch.cluster.metadata.SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
  * with list of indices that will be included into the snapshot</li>
- * <li>Data nodes call {@link org.elasticsearch.index.snapshots.IndexShardRepository#snapshot(org.elasticsearch.cluster.metadata.SnapshotId, org.elasticsearch.index.shard.ShardId, org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit, org.elasticsearch.index.snapshots.IndexShardSnapshotStatus)} for each shard</li>
+ * <li>Data nodes call {@link org.elasticsearch.index.snapshots.IndexShardRepository#snapshot(SnapshotId, ShardId, IndexCommit, IndexShardSnapshotStatus)} for each shard</li>
  * <li>When all shard calls return master calls {@link #finalizeSnapshot}
  * with possible list of failures</li>
  * </ul>

+ 3 - 3
core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.snapshots;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.lucene.index.IndexCommit;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterService;
@@ -38,7 +39,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
 import org.elasticsearch.index.engine.SnapshotFailedEngineException;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardState;
@@ -335,7 +335,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent<SnapshotSh
 
         try {
             // we flush first to make sure we get the latest writes snapshotted
-            SnapshotIndexCommit snapshotIndexCommit = indexShard.snapshotIndex(true);
+            IndexCommit snapshotIndexCommit = indexShard.snapshotIndex(true);
             try {
                 indexShardRepository.snapshot(snapshotId, shardId, snapshotIndexCommit, snapshotStatus);
                 if (logger.isDebugEnabled()) {
@@ -345,7 +345,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent<SnapshotSh
                     logger.debug(sb.toString());
                 }
             } finally {
-                snapshotIndexCommit.close();
+                indexShard.releaseSnapshot(snapshotIndexCommit);
             }
         } catch (SnapshotFailedEngineException e) {
             throw e;

+ 0 - 180
core/src/test/java/org/elasticsearch/index/deletionpolicy/SnapshotDeletionPolicyTests.java

@@ -1,180 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.index.deletionpolicy;
-
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.TextField;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.store.RAMDirectory;
-import org.elasticsearch.common.lucene.Lucene;
-import org.elasticsearch.index.Index;
-import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.test.ESTestCase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.lucene.index.DirectoryReader.listCommits;
-import static org.elasticsearch.common.settings.Settings.Builder.EMPTY_SETTINGS;
-import static org.hamcrest.Matchers.equalTo;
-
-/**
- * A set of tests for {@link org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy}.
- */
-public class SnapshotDeletionPolicyTests extends ESTestCase {
-
-    protected final ShardId shardId = new ShardId(new Index("index"), 1);
-
-    private RAMDirectory dir;
-    private SnapshotDeletionPolicy deletionPolicy;
-    private IndexWriter indexWriter;
-
-    @Override
-    @Before
-    public void setUp() throws Exception {
-        super.setUp();
-        dir = new RAMDirectory();
-        deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastDeletionPolicy(shardId, EMPTY_SETTINGS));
-        indexWriter = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
-                .setIndexDeletionPolicy(deletionPolicy)
-                .setOpenMode(IndexWriterConfig.OpenMode.CREATE));
-    }
-
-    @Override
-    @After
-    public void tearDown() throws Exception {
-        super.tearDown();
-        indexWriter.close();
-        dir.close();
-    }
-
-    private Document testDocument() {
-        Document document = new Document();
-        document.add(new TextField("test", "1", Field.Store.YES));
-        return document;
-    }
-
-    @Test
-    public void testSimpleSnapshot() throws Exception {
-        // add a document and commit, resulting in one commit point
-        indexWriter.addDocument(testDocument());
-        indexWriter.commit();
-
-        assertThat(listCommits(dir).size(), equalTo(1));
-
-        // add another document and commit, resulting again in one commit point
-        indexWriter.addDocument(testDocument());
-        indexWriter.commit();
-        assertThat(listCommits(dir).size(), equalTo(1));
-
-        // snapshot the last commit, and then add a document and commit, now we should have two commit points
-        SnapshotIndexCommit snapshot = deletionPolicy.snapshot();
-        indexWriter.addDocument(testDocument());
-        indexWriter.commit();
-        assertThat(listCommits(dir).size(), equalTo(2));
-
-        // release the commit, add a document and commit, now we should be back to one commit point
-        snapshot.close();
-        indexWriter.addDocument(testDocument());
-        indexWriter.commit();
-        assertThat(listCommits(dir).size(), equalTo(1));
-    }
-
-    @Test
-    public void testMultiSnapshot() throws Exception {
-        // add a document and commit, resulting in one commit point
-        indexWriter.addDocument(testDocument());
-        indexWriter.commit();
-        assertThat(listCommits(dir).size(), equalTo(1));
-
-        // take two snapshots
-        SnapshotIndexCommit snapshot1 = deletionPolicy.snapshot();
-        SnapshotIndexCommit snapshot2 = deletionPolicy.snapshot();
-
-        // we should have two commits points
-        indexWriter.addDocument(testDocument());
-        indexWriter.commit();
-        assertThat(listCommits(dir).size(), equalTo(2));
-
-        // release one snapshot, we should still have two commit points
-        snapshot1.close();
-        indexWriter.addDocument(testDocument());
-        indexWriter.commit();
-        assertThat(listCommits(dir).size(), equalTo(2));
-
-        // release the second snapshot, we should be back to one commit
-        snapshot2.close();
-        indexWriter.addDocument(testDocument());
-        indexWriter.commit();
-        assertThat(listCommits(dir).size(), equalTo(1));
-    }
-
-    @Test
-    public void testMultiReleaseException() throws Exception {
-        // add a document and commit, resulting in one commit point
-        indexWriter.addDocument(testDocument());
-        indexWriter.commit();
-        assertThat(listCommits(dir).size(), equalTo(1));
-
-        // snapshot the last commit, and release it twice, the seconds should throw an exception
-        SnapshotIndexCommit snapshot = deletionPolicy.snapshot();
-        snapshot.close();
-        snapshot.close();
-    }
-
-    @Test
-    public void testSimpleSnapshots() throws Exception {
-        // add a document and commit, resulting in one commit point
-        indexWriter.addDocument(testDocument());
-        indexWriter.commit();
-        assertThat(listCommits(dir).size(), equalTo(1));
-
-        // add another document and commit, resulting again in one commint point
-        indexWriter.addDocument(testDocument());
-        indexWriter.commit();
-        assertThat(listCommits(dir).size(), equalTo(1));
-
-        // snapshot the last commit, and then add a document and commit, now we should have two commit points
-        SnapshotIndexCommit snapshot = deletionPolicy.snapshot();
-        indexWriter.addDocument(testDocument());
-        indexWriter.commit();
-        assertThat(listCommits(dir).size(), equalTo(2));
-
-        // now, take a snapshot of all the commits
-        SnapshotIndexCommits snapshots = deletionPolicy.snapshots();
-        assertThat(snapshots.size(), equalTo(2));
-
-        // release the snapshot, add a document and commit
-        // we should have 3 commits points since we are holding onto the first two with snapshots
-        // and we are using the keep only last
-        snapshot.close();
-        indexWriter.addDocument(testDocument());
-        indexWriter.commit();
-        assertThat(listCommits(dir).size(), equalTo(3));
-
-        // now release the snapshots, we should be back to a single commit point
-        snapshots.close();
-        indexWriter.addDocument(testDocument());
-        indexWriter.commit();
-        assertThat(listCommits(dir).size(), equalTo(1));
-    }
-}

+ 0 - 58
core/src/test/java/org/elasticsearch/index/deletionpolicy/SnapshotIndexCommitExistsMatcher.java

@@ -1,58 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.index.deletionpolicy;
-
-import org.elasticsearch.common.util.set.Sets;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeMatcher;
-
-import java.io.IOException;
-import java.util.HashSet;
-
-/**
- *
- */
-public class SnapshotIndexCommitExistsMatcher extends TypeSafeMatcher<SnapshotIndexCommit> {
-
-    @Override
-    public boolean matchesSafely(SnapshotIndexCommit snapshotIndexCommit) {
-        try {
-            HashSet<String> files = Sets.newHashSet(snapshotIndexCommit.getDirectory().listAll());
-            for (String fileName : snapshotIndexCommit.getFiles()) {
-                if (files.contains(fileName) == false) {
-                    return false;
-                }
-            }
-        } catch (IOException ex) {
-            throw new RuntimeException(ex);
-        }
-        return true;
-    }
-
-    @Override
-    public void describeTo(Description description) {
-        description.appendText("an index commit existence");
-    }
-
-    public static Matcher<SnapshotIndexCommit> snapshotIndexCommitExists() {
-        return new SnapshotIndexCommitExistsMatcher();
-    }
-}

+ 1 - 8
core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -60,8 +60,6 @@ import org.elasticsearch.index.Index;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.analysis.AnalysisService;
 import org.elasticsearch.index.codec.CodecService;
-import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy;
-import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
 import org.elasticsearch.index.engine.Engine.Searcher;
 import org.elasticsearch.index.indexing.ShardIndexingService;
 import org.elasticsearch.index.mapper.*;
@@ -231,15 +229,10 @@ public class InternalEngineTests extends ESTestCase {
         return new Translog(translogConfig);
     }
 
-    protected IndexDeletionPolicy createIndexDeletionPolicy() {
-        return new KeepOnlyLastDeletionPolicy(shardId, EMPTY_SETTINGS);
-    }
-
     protected SnapshotDeletionPolicy createSnapshotDeletionPolicy() {
-        return new SnapshotDeletionPolicy(createIndexDeletionPolicy());
+        return new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
     }
 
-
     protected InternalEngine createEngine(Store store, Path translogPath, IndexSearcherWrapper... wrappers) {
         return createEngine(defaultSettings, store, translogPath, new MergeSchedulerConfig(defaultSettings), newMergePolicy(), wrappers);
     }

+ 1 - 8
core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java

@@ -40,8 +40,6 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.codec.CodecService;
-import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy;
-import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
 import org.elasticsearch.index.indexing.ShardIndexingService;
 import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.mapper.ParseContext;
@@ -185,15 +183,10 @@ public class ShadowEngineTests extends ESTestCase {
         return new Store(shardId, EMPTY_SETTINGS, directoryService, new DummyShardLock(shardId));
     }
 
-    protected IndexDeletionPolicy createIndexDeletionPolicy() {
-        return new KeepOnlyLastDeletionPolicy(shardId, EMPTY_SETTINGS);
-    }
-
     protected SnapshotDeletionPolicy createSnapshotDeletionPolicy() {
-        return new SnapshotDeletionPolicy(createIndexDeletionPolicy());
+        return new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
     }
 
-
     protected ShadowEngine createShadowEngine(Store store) {
         return createShadowEngine(defaultSettings, store);
     }

+ 1 - 3
core/src/test/java/org/elasticsearch/index/store/StoreTests.java

@@ -40,8 +40,6 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.env.ShardLock;
 import org.elasticsearch.index.Index;
-import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy;
-import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
@@ -1144,7 +1142,7 @@ public class StoreTests extends ESTestCase {
         DirectoryService directoryService = new LuceneManagedDirectoryService(random());
         Store store = new Store(shardId, Settings.EMPTY, directoryService, new DummyShardLock(shardId));
         IndexWriterConfig config = newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(TestUtil.getDefaultCodec());
-        SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastDeletionPolicy(shardId, EMPTY_SETTINGS));
+        SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
         config.setIndexDeletionPolicy(deletionPolicy);
         IndexWriter writer = new IndexWriter(store.directory(), config);
         Document doc = new Document();