Ver Fonte

Merge branch 'master' into go_away_transport_paths

Ryan Ernst há 10 anos atrás
pai
commit
92b62c0c6b
35 ficheiros alterados com 742 adições e 254 exclusões
  1. 0 1
      core/src/main/java/org/elasticsearch/ElasticsearchException.java
  2. 1 1
      core/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java
  3. 1 1
      core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java
  4. 3 1
      core/src/main/java/org/elasticsearch/action/support/TransportActions.java
  5. 1 1
      core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java
  6. 8 0
      core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java
  7. 9 1
      core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
  8. 1 1
      core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
  9. 1 8
      core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java
  10. 15 1
      core/src/main/java/org/elasticsearch/gateway/PriorityComparator.java
  11. 64 2
      core/src/main/java/org/elasticsearch/index/IndexService.java
  12. 80 69
      core/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java
  13. 1 1
      core/src/main/java/org/elasticsearch/index/cache/bitset/ShardBitsetFilterCache.java
  14. 2 2
      core/src/main/java/org/elasticsearch/index/engine/Engine.java
  15. 0 39
      core/src/main/java/org/elasticsearch/index/engine/ForceMergeFailedEngineException.java
  16. 4 4
      core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
  17. 11 5
      core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java
  18. 35 11
      core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java
  19. 2 2
      core/src/main/java/org/elasticsearch/index/fielddata/ShardFieldData.java
  20. 1 1
      core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java
  21. 0 1
      core/src/main/java/org/elasticsearch/index/query/QueryParseContext.java
  22. 2 2
      core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
  23. 44 64
      core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java
  24. 4 2
      core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCacheListener.java
  25. 1 7
      core/src/test/java/org/elasticsearch/bootstrap/BootstrapForTesting.java
  26. 98 0
      core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationPriorityTests.java
  27. 94 0
      core/src/test/java/org/elasticsearch/index/cache/bitset/BitSetFilterCacheTests.java
  28. 41 2
      core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
  29. 1 2
      core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataImplTestCase.java
  30. 3 6
      core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTestCase.java
  31. 1 5
      core/src/test/java/org/elasticsearch/index/fielddata/AbstractStringFieldDataTestCase.java
  32. 80 0
      core/src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java
  33. 1 10
      core/src/test/java/org/elasticsearch/index/search/child/ChildrenConstantScoreQueryTests.java
  34. 2 1
      core/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java
  35. 130 0
      dev-tools/validate-maven-repository.py

+ 0 - 1
core/src/main/java/org/elasticsearch/ElasticsearchException.java

@@ -583,7 +583,6 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
                 org.elasticsearch.index.query.QueryParsingException.class,
                 org.elasticsearch.action.support.replication.TransportReplicationAction.RetryOnPrimaryException.class,
                 org.elasticsearch.index.engine.DeleteByQueryFailedEngineException.class,
-                org.elasticsearch.index.engine.ForceMergeFailedEngineException.class,
                 org.elasticsearch.discovery.MasterNotDiscoveredException.class,
                 org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException.class,
                 org.elasticsearch.node.NodeClosedException.class,

+ 1 - 1
core/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java

@@ -74,7 +74,7 @@ public class TransportOptimizeAction extends TransportBroadcastByNodeAction<Opti
     }
 
     @Override
-    protected EmptyResult shardOperation(OptimizeRequest request, ShardRouting shardRouting) {
+    protected EmptyResult shardOperation(OptimizeRequest request, ShardRouting shardRouting) throws IOException {
         IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()).shardSafe(shardRouting.shardId().id());
         indexShard.optimize(request);
         return EmptyResult.INSTANCE;

+ 1 - 1
core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java

@@ -119,7 +119,7 @@ public class TransportUpgradeAction extends TransportBroadcastByNodeAction<Upgra
     }
 
     @Override
-    protected ShardUpgradeResult shardOperation(UpgradeRequest request, ShardRouting shardRouting) {
+    protected ShardUpgradeResult shardOperation(UpgradeRequest request, ShardRouting shardRouting) throws IOException {
         IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()).shardSafe(shardRouting.shardId().id());
         org.apache.lucene.util.Version oldestLuceneSegment = indexShard.upgrade(request);
         // We are using the current version of Elasticsearch as upgrade version since we update mapping to match the current version

+ 3 - 1
core/src/main/java/org/elasticsearch/action/support/TransportActions.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.action.support;
 
+import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.NoShardAvailableActionException;
 import org.elasticsearch.action.UnavailableShardsException;
@@ -36,7 +37,8 @@ public class TransportActions {
                 actual instanceof IndexNotFoundException ||
                 actual instanceof IllegalIndexShardStateException ||
                 actual instanceof NoShardAvailableActionException ||
-                actual instanceof UnavailableShardsException) {
+                actual instanceof UnavailableShardsException ||
+                actual instanceof AlreadyClosedException) {
             return true;
         }
         return false;

+ 1 - 1
core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java

@@ -181,7 +181,7 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
      * @param shardRouting the shard on which to execute the operation
      * @return the result of the shard-level operation for the shard
      */
-    protected abstract ShardOperationResult shardOperation(Request request, ShardRouting shardRouting);
+    protected abstract ShardOperationResult shardOperation(Request request, ShardRouting shardRouting) throws IOException;
 
     /**
      * Determines the shards on which this operation will be executed on. The operation is executed once per shard.

+ 8 - 0
core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java

@@ -210,6 +210,14 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
                 onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));
             } else {
                 internalRequest.request().internalShardId = shardRouting.shardId();
+                if (logger.isTraceEnabled()) {
+                    logger.trace(
+                            "sending request [{}] to shard [{}] on node [{}]",
+                            internalRequest.request(),
+                            internalRequest.request().internalShardId,
+                            node
+                    );
+                }
                 transportService.sendRequest(node, transportShardAction, internalRequest.request(), new BaseTransportResponseHandler<Response>() {
 
                     @Override

+ 9 - 1
core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

@@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing.allocation.allocator;
 
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.IntroSorter;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.RoutingNode;
@@ -36,6 +37,7 @@ import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.gateway.PriorityComparator;
 import org.elasticsearch.node.settings.NodeSettingsService;
 
 import java.util.*;
@@ -559,6 +561,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
              * use the sorter to save some iterations. 
              */
             final AllocationDeciders deciders = allocation.deciders();
+            final PriorityComparator secondaryComparator = PriorityComparator.getAllocationComparator(allocation);
             final Comparator<ShardRouting> comparator = new Comparator<ShardRouting>() {
                 @Override
                 public int compare(ShardRouting o1,
@@ -570,7 +573,12 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
                     if ((indexCmp = o1.index().compareTo(o2.index())) == 0) {
                         return o1.getId() - o2.getId();
                     }
-                    return indexCmp;
+                    // this comparator is more expensive than all the others up there
+                    // that's why it's added last even though it could be easier to read
+                    // if we'd apply it earlier. this comparator will only differentiate across
+                    // indices all shards of the same index is treated equally.
+                    final int secondary = secondaryComparator.compare(o1, o2);
+                    return secondary == 0 ? indexCmp : secondary;
                 }
             };
             /*

+ 1 - 1
core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

@@ -989,7 +989,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
         }
     }
 
-    private ClusterState rejoin(ClusterState clusterState, String reason) {
+    protected ClusterState rejoin(ClusterState clusterState, String reason) {
 
         // *** called from within an cluster state update task *** //
         assert Thread.currentThread().getName().contains(InternalClusterService.UPDATE_THREAD_NAME);

+ 1 - 8
core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java

@@ -120,14 +120,7 @@ public class GatewayAllocator extends AbstractComponent {
         boolean changed = false;
 
         RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned();
-        unassigned.sort(new PriorityComparator() {
-
-            @Override
-            protected Settings getIndexSettings(String index) {
-                IndexMetaData indexMetaData = allocation.metaData().index(index);
-                return indexMetaData.getSettings();
-            }
-        }); // sort for priority ordering
+        unassigned.sort(PriorityComparator.getAllocationComparator(allocation)); // sort for priority ordering
 
         changed |= primaryShardAllocator.allocateUnassigned(allocation);
         changed |= replicaShardAllocator.processExistingRecoveries(allocation);

+ 15 - 1
core/src/main/java/org/elasticsearch/gateway/PriorityComparator.java

@@ -21,6 +21,7 @@ package org.elasticsearch.gateway;
 
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
 import org.elasticsearch.common.settings.Settings;
 
 import java.util.Comparator;
@@ -33,7 +34,7 @@ import java.util.Comparator;
  * here the newer indices matter more). If even that is the same, we compare the index name which is useful
  * if the date is baked into the index name. ie logstash-2015.05.03.
  */
-abstract class PriorityComparator implements Comparator<ShardRouting> {
+public abstract class PriorityComparator implements Comparator<ShardRouting> {
 
     @Override
     public final int compare(ShardRouting o1, ShardRouting o2) {
@@ -63,4 +64,17 @@ abstract class PriorityComparator implements Comparator<ShardRouting> {
     }
 
     protected abstract Settings getIndexSettings(String index);
+
+    /**
+     * Returns a PriorityComparator that uses the RoutingAllocation index metadata to access the index setting per index.
+     */
+    public static PriorityComparator getAllocationComparator(final RoutingAllocation allocation) {
+        return new PriorityComparator() {
+            @Override
+            protected Settings getIndexSettings(String index) {
+                IndexMetaData indexMetaData = allocation.metaData().index(index);
+                return indexMetaData.getSettings();
+            }
+        };
+    }
 }

+ 64 - 2
core/src/main/java/org/elasticsearch/index/IndexService.java

@@ -23,6 +23,7 @@ import com.google.common.base.Function;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
 
+import org.apache.lucene.util.Accountable;
 import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -38,7 +39,10 @@ import org.elasticsearch.index.analysis.AnalysisService;
 import org.elasticsearch.index.cache.IndexCache;
 import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
 import org.elasticsearch.index.deletionpolicy.DeletionPolicyModule;
+import org.elasticsearch.index.fielddata.FieldDataType;
+import org.elasticsearch.index.fielddata.IndexFieldDataCache;
 import org.elasticsearch.index.fielddata.IndexFieldDataService;
+import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.query.IndexQueryParserService;
 import org.elasticsearch.index.settings.IndexSettings;
@@ -150,8 +154,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
         this.indicesLifecycle = (InternalIndicesLifecycle) injector.getInstance(IndicesLifecycle.class);
 
         // inject workarounds for cyclic dep
-        indexFieldData.setIndexService(this);
-        bitSetFilterCache.setIndexService(this);
+        indexFieldData.setListener(new FieldDataCacheListener(this));
+        bitSetFilterCache.setListener(new BitsetCacheListener(this));
         this.nodeEnv = nodeEnv;
     }
 
@@ -537,4 +541,62 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
     public Settings getIndexSettings() {
         return indexSettings;
     }
+
+    private static final class BitsetCacheListener implements BitsetFilterCache.Listener {
+        final IndexService indexService;
+
+        private BitsetCacheListener(IndexService indexService) {
+            this.indexService = indexService;
+        }
+
+        @Override
+        public void onCache(ShardId shardId, Accountable accountable) {
+            if (shardId != null) {
+                final IndexShard shard = indexService.shard(shardId.id());
+                if (shard != null) {
+                    long ramBytesUsed = accountable != null ? accountable.ramBytesUsed() : 0l;
+                    shard.shardBitsetFilterCache().onCached(ramBytesUsed);
+                }
+            }
+        }
+
+        @Override
+        public void onRemoval(ShardId shardId, Accountable accountable) {
+            if (shardId != null) {
+                final IndexShard shard = indexService.shard(shardId.id());
+                if (shard != null) {
+                    long ramBytesUsed = accountable != null ? accountable.ramBytesUsed() : 0l;
+                    shard.shardBitsetFilterCache().onRemoval(ramBytesUsed);
+                }
+            }
+        }
+    }
+
+    private final class FieldDataCacheListener implements IndexFieldDataCache.Listener {
+        final IndexService indexService;
+
+        public FieldDataCacheListener(IndexService indexService) {
+            this.indexService = indexService;
+        }
+
+        @Override
+        public void onCache(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage) {
+            if (shardId != null) {
+                final IndexShard shard = indexService.shard(shardId.id());
+                if (shard != null) {
+                    shard.fieldData().onCache(shardId, fieldNames, fieldDataType, ramUsage);
+                }
+            }
+        }
+
+        @Override
+        public void onRemoval(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) {
+            if (shardId != null) {
+                final IndexShard shard = indexService.shard(shardId.id());
+                if (shard != null) {
+                    shard.fieldData().onRemoval(shardId, fieldNames, fieldDataType, wasEvicted, sizeInBytes);
+                }
+            }
+        }
+    }
 }

+ 80 - 69
core/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java

@@ -33,6 +33,7 @@ import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.search.join.BitSetProducer;
+import org.apache.lucene.util.Accountable;
 import org.apache.lucene.util.BitDocIdSet;
 import org.apache.lucene.util.BitSet;
 import org.elasticsearch.ExceptionsHelper;
@@ -43,7 +44,6 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.AbstractIndexComponent;
 import org.elasticsearch.index.Index;
-import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.object.ObjectMapper;
@@ -61,10 +61,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
+import java.util.concurrent.*;
 
 /**
  * This is a cache for {@link BitDocIdSet} based filters and is unbounded by size or time.
@@ -76,12 +73,21 @@ import java.util.concurrent.Executor;
 public class BitsetFilterCache extends AbstractIndexComponent implements LeafReader.CoreClosedListener, RemovalListener<Object, Cache<Query, BitsetFilterCache.Value>>, Closeable {
 
     public static final String LOAD_RANDOM_ACCESS_FILTERS_EAGERLY = "index.load_fixed_bitset_filters_eagerly";
+    private static final Listener DEFAULT_NOOP_LISTENER =  new Listener() {
+        @Override
+        public void onCache(ShardId shardId, Accountable accountable) {
+        }
+
+        @Override
+        public void onRemoval(ShardId shardId, Accountable accountable) {
+        }
+    };
 
     private final boolean loadRandomAccessFiltersEagerly;
     private final Cache<Object, Cache<Query, Value>> loadedFilters;
+    private volatile Listener listener = DEFAULT_NOOP_LISTENER;
     private final BitSetProducerWarmer warmer;
 
-    private IndexService indexService;
     private IndicesWarmer indicesWarmer;
 
     @Inject
@@ -95,16 +101,24 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
     @Inject(optional = true)
     public void setIndicesWarmer(IndicesWarmer indicesWarmer) {
         this.indicesWarmer = indicesWarmer;
+        indicesWarmer.addListener(warmer);
     }
 
-    public void setIndexService(IndexService indexService) {
-        this.indexService = indexService;
-        // First the indicesWarmer is set and then the indexService is set, because of this there is a small window of
-        // time where indexService is null. This is why the warmer should only registered after indexService has been set.
-        // Otherwise there is a small chance of the warmer running into a NPE, since it uses the indexService
-        indicesWarmer.addListener(warmer);
+    /**
+     * Sets a listener that is invoked for all subsequent cache and removal events.
+     * @throws IllegalStateException if the listener is set more than once
+     */
+    public void setListener(Listener listener) {
+        if (listener == null) {
+            throw new IllegalArgumentException("listener must not be null");
+        }
+        if (this.listener != DEFAULT_NOOP_LISTENER) {
+            throw new IllegalStateException("can't set listener more than once");
+        }
+        this.listener = listener;
     }
 
+
     public BitSetProducer getBitSetProducer(Query query) {
         return new QueryWrapperBitSetProducer(query);
     }
@@ -116,7 +130,9 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
 
     @Override
     public void close() {
-        indicesWarmer.removeListener(warmer);
+        if (indicesWarmer != null) {
+            indicesWarmer.removeListener(warmer);
+        }
         clear("close");
     }
 
@@ -135,31 +151,22 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
                 return CacheBuilder.newBuilder().build();
             }
         });
-        return filterToFbs.get(query, new Callable<Value>() {
-            @Override
-            public Value call() throws Exception {
-                final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context);
-                final IndexSearcher searcher = new IndexSearcher(topLevelContext);
-                searcher.setQueryCache(null);
-                final Weight weight = searcher.createNormalizedWeight(query, false);
-                final DocIdSetIterator it = weight.scorer(context);
-                final BitSet bitSet;
-                if (it == null) {
-                    bitSet = null;
-                } else {
-                    bitSet = BitSet.of(it, context.reader().maxDoc());
-                }
-
-                Value value = new Value(bitSet, shardId);
-                if (shardId != null) {
-                    IndexShard shard = indexService.shard(shardId.id());
-                    if (shard != null) {
-                        long ramBytesUsed = value.bitset != null ? value.bitset.ramBytesUsed() : 0l;
-                        shard.shardBitsetFilterCache().onCached(ramBytesUsed);
-                    }
-                }
-                return value;
+        return filterToFbs.get(query, () -> {
+            final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context);
+            final IndexSearcher searcher = new IndexSearcher(topLevelContext);
+            searcher.setQueryCache(null);
+            final Weight weight = searcher.createNormalizedWeight(query, false);
+            final DocIdSetIterator it = weight.scorer(context);
+            final BitSet bitSet;
+            if (it == null) {
+                bitSet = null;
+            } else {
+                bitSet = BitSet.of(it, context.reader().maxDoc());
             }
+
+            Value value = new Value(bitSet, shardId);
+            listener.onCache(shardId, value.bitset);
+            return value;
         }).bitset;
     }
 
@@ -170,20 +177,13 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
             return;
         }
 
-        Cache<Query, Value> value = notification.getValue();
-        if (value == null) {
+        Cache<Query, Value> valueCache = notification.getValue();
+        if (valueCache == null) {
             return;
         }
 
-        for (Map.Entry<Query, Value> entry : value.asMap().entrySet()) {
-            if (entry.getValue().shardId == null) {
-                continue;
-            }
-            IndexShard shard = indexService.shard(entry.getValue().shardId.id());
-            if (shard != null) {
-                ShardBitsetFilterCache shardBitsetFilterCache = shard.shardBitsetFilterCache();
-                shardBitsetFilterCache.onRemoval(entry.getValue().bitset.ramBytesUsed());
-            }
+        for (Value value : valueCache.asMap().values()) {
+            listener.onRemoval(value.shardId, value.bitset);
             // if null then this means the shard has already been removed and the stats are 0 anyway for the shard this key belongs to
         }
     }
@@ -266,32 +266,22 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
             final CountDownLatch latch = new CountDownLatch(context.searcher().reader().leaves().size() * warmUp.size());
             for (final LeafReaderContext ctx : context.searcher().reader().leaves()) {
                 for (final Query filterToWarm : warmUp) {
-                    executor.execute(new Runnable() {
-
-                        @Override
-                        public void run() {
-                            try {
-                                final long start = System.nanoTime();
-                                getAndLoadIfNotPresent(filterToWarm, ctx);
-                                if (indexShard.warmerService().logger().isTraceEnabled()) {
-                                    indexShard.warmerService().logger().trace("warmed bitset for [{}], took [{}]", filterToWarm, TimeValue.timeValueNanos(System.nanoTime() - start));
-                                }
-                            } catch (Throwable t) {
-                                indexShard.warmerService().logger().warn("failed to load bitset for [{}]", t, filterToWarm);
-                            } finally {
-                                latch.countDown();
+                    executor.execute(() -> {
+                        try {
+                            final long start = System.nanoTime();
+                            getAndLoadIfNotPresent(filterToWarm, ctx);
+                            if (indexShard.warmerService().logger().isTraceEnabled()) {
+                                indexShard.warmerService().logger().trace("warmed bitset for [{}], took [{}]", filterToWarm, TimeValue.timeValueNanos(System.nanoTime() - start));
                             }
+                        } catch (Throwable t) {
+                            indexShard.warmerService().logger().warn("failed to load bitset for [{}]", t, filterToWarm);
+                        } finally {
+                            latch.countDown();
                         }
-
                     });
                 }
             }
-            return new TerminationHandle() {
-                @Override
-                public void awaitTermination() throws InterruptedException {
-                    latch.await();
-                }
-            };
+            return () -> latch.await();
         }
 
         @Override
@@ -304,4 +294,25 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
     Cache<Object, Cache<Query, Value>> getLoadedFilters() {
         return loadedFilters;
     }
+
+
+    /**
+     *  A listener interface that is executed for each onCache / onRemoval event
+     */
+    public interface Listener {
+        /**
+         * Called for each cached bitset on the cache event.
+         * @param shardId the shard id the bitset was cached for. This can be <code>null</code>
+         * @param accountable the bitsets ram representation
+         */
+        void onCache(ShardId shardId, Accountable accountable);
+        /**
+         * Called for each cached bitset on the removal event.
+         * @param shardId the shard id the bitset was cached for. This can be <code>null</code>
+         * @param accountable the bitsets ram representation
+         */
+        void onRemoval(ShardId shardId, Accountable accountable);
+    }
+
+
 }

+ 1 - 1
core/src/main/java/org/elasticsearch/index/cache/bitset/ShardBitsetFilterCache.java

@@ -19,7 +19,6 @@
 
 package org.elasticsearch.index.cache.bitset;
 
-import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.metrics.CounterMetric;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.settings.IndexSettings;
@@ -47,4 +46,5 @@ public class ShardBitsetFilterCache extends AbstractIndexShardComponent {
     public long getMemorySizeInBytes() {
         return totalMetric.count();
     }
+
 }

+ 2 - 2
core/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -502,14 +502,14 @@ public abstract class Engine implements Closeable {
     /**
      * Optimizes to 1 segment
      */
-    public void forceMerge(boolean flush) {
+    public void forceMerge(boolean flush) throws IOException {
         forceMerge(flush, 1, false, false, false);
     }
 
     /**
      * Triggers a forced merge on this engine
      */
-    public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException;
+    public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException, IOException;
 
     /**
      * Snapshots the index and returns a handle to it. If needed will try and "commit" the

+ 0 - 39
core/src/main/java/org/elasticsearch/index/engine/ForceMergeFailedEngineException.java

@@ -1,39 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.index.engine;
-
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.index.shard.ShardId;
-
-import java.io.IOException;
-
-/**
- *
- */
-public class ForceMergeFailedEngineException extends EngineException {
-
-    public ForceMergeFailedEngineException(ShardId shardId, Throwable t) {
-        super(shardId, "force merge failed", t);
-    }
-
-    public ForceMergeFailedEngineException(StreamInput in) throws IOException{
-        super(in);
-    }
-}

+ 4 - 4
core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -45,6 +45,7 @@ import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.InfoStream;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.support.TransportActions;
 import org.elasticsearch.cluster.routing.DjbHashFunction;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.lease.Releasable;
@@ -823,7 +824,7 @@ public class InternalEngine extends Engine {
 
     @Override
     public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
-                           final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException {
+                           final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException, EngineClosedException, IOException {
         /*
          * We do NOT acquire the readlock here since we are waiting on the merges to finish
          * that's fine since the IW.rollback should stop all the threads and trigger an IOException
@@ -865,9 +866,8 @@ public class InternalEngine extends Engine {
                 store.decRef();
             }
         } catch (Throwable t) {
-            ForceMergeFailedEngineException ex = new ForceMergeFailedEngineException(shardId, t);
-            maybeFailEngine("force merge", ex);
-            throw ex;
+            maybeFailEngine("force merge", t);
+            throw t;
         } finally {
             try {
                 mp.setUpgradeInProgress(false, false); // reset it just to make sure we reset it in a case of an error

+ 11 - 5
core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java

@@ -24,6 +24,7 @@ import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.util.Accountable;
 import org.elasticsearch.index.mapper.FieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.shard.ShardId;
 
 /**
  * A simple field data cache abstraction on the *index* level.
@@ -44,13 +45,19 @@ public interface IndexFieldDataCache {
      */
     void clear(String fieldName);
 
-    void clear(Object coreCacheKey);
+    void clear(IndexReader reader);
 
     interface Listener {
 
-        void onLoad(MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage);
+        /**
+         * Called after the fielddata is loaded during the cache phase
+         */
+        void onCache(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage);
 
-        void onUnload(MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes);
+        /**
+         * Called after the fielddata is unloaded
+         */
+        void onRemoval(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes);
     }
 
     class None implements IndexFieldDataCache {
@@ -75,8 +82,7 @@ public interface IndexFieldDataCache {
         }
 
         @Override
-        public void clear(Object coreCacheKey) {
-
+        public void clear(IndexReader reader) {
         }
     }
 }

+ 35 - 11
core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java

@@ -22,6 +22,7 @@ package org.elasticsearch.index.fielddata;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 
+import org.apache.lucene.util.Accountable;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.common.collect.MapBuilder;
@@ -32,11 +33,12 @@ import org.elasticsearch.index.AbstractIndexComponent;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.fielddata.plain.*;
 import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.core.BooleanFieldMapper;
 import org.elasticsearch.index.mapper.internal.IndexFieldMapper;
 import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
-import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.settings.IndexSettings;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
 
@@ -139,8 +141,18 @@ public class IndexFieldDataService extends AbstractIndexComponent {
     private final IndicesFieldDataCache indicesFieldDataCache;
     // the below map needs to be modified under a lock
     private final Map<String, IndexFieldDataCache> fieldDataCaches = Maps.newHashMap();
+    private final MapperService mapperService;
+    private static final IndexFieldDataCache.Listener DEFAULT_NOOP_LISTENER = new IndexFieldDataCache.Listener() {
+        @Override
+        public void onCache(ShardId shardId, Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage) {
+        }
+
+        @Override
+        public void onRemoval(ShardId shardId, Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) {
+        }
+    };
+    private volatile IndexFieldDataCache.Listener listener = DEFAULT_NOOP_LISTENER;
 
-    IndexService indexService;
 
     // We need to cache fielddata on the _parent field because of 1.x indices.
     // When we don't support 1.x anymore (3.0) then remove this caching
@@ -149,15 +161,11 @@ public class IndexFieldDataService extends AbstractIndexComponent {
 
     @Inject
     public IndexFieldDataService(Index index, @IndexSettings Settings indexSettings, IndicesFieldDataCache indicesFieldDataCache,
-                                 CircuitBreakerService circuitBreakerService) {
+                                 CircuitBreakerService circuitBreakerService, MapperService mapperService) {
         super(index, indexSettings);
         this.indicesFieldDataCache = indicesFieldDataCache;
         this.circuitBreakerService = circuitBreakerService;
-    }
-
-    // we need to "inject" the index service to not create cyclic dep
-    public void setIndexService(IndexService indexService) {
-        this.indexService = indexService;
+        this.mapperService = mapperService;
     }
 
     public synchronized void clear() {
@@ -229,7 +237,7 @@ public class IndexFieldDataService extends AbstractIndexComponent {
                 // this means changing the node level settings is simple, just set the bounds there
                 String cacheType = type.getSettings().get("cache", indexSettings.get(FIELDDATA_CACHE_KEY, FIELDDATA_CACHE_VALUE_NODE));
                 if (FIELDDATA_CACHE_VALUE_NODE.equals(cacheType)) {
-                    cache = indicesFieldDataCache.buildIndexFieldDataCache(indexService, index, fieldNames, type);
+                    cache = indicesFieldDataCache.buildIndexFieldDataCache(listener, index, fieldNames, type);
                 } else if ("none".equals(cacheType)){
                     cache = new IndexFieldDataCache.None();
                 } else {
@@ -243,13 +251,29 @@ public class IndexFieldDataService extends AbstractIndexComponent {
                     && Version.indexCreated(indexSettings).before(Version.V_2_0_0_beta1);
             if (isOldParentField) {
                 if (parentIndexFieldData == null) {
-                    parentIndexFieldData = builder.build(index, indexSettings, fieldType, cache, circuitBreakerService, indexService.mapperService());
+                    parentIndexFieldData = builder.build(index, indexSettings, fieldType, cache, circuitBreakerService, mapperService);
                 }
                 return (IFD) parentIndexFieldData;
             }
         }
 
-        return (IFD) builder.build(index, indexSettings, fieldType, cache, circuitBreakerService, indexService.mapperService());
+        return (IFD) builder.build(index, indexSettings, fieldType, cache, circuitBreakerService, mapperService);
+    }
+
+    /**
+     * Sets a {@link org.elasticsearch.index.fielddata.IndexFieldDataCache.Listener} passed to each {@link IndexFieldData}
+     * creation to capture onCache and onRemoval events. Setting a listener on this method will override any previously
+     * set listeners.
+     * @throws IllegalStateException if the listener is set more than once
+     */
+    public void setListener(IndexFieldDataCache.Listener listener) {
+        if (listener == null) {
+            throw new IllegalArgumentException("listener must not be null");
+        }
+        if (this.listener != DEFAULT_NOOP_LISTENER) {
+            throw new IllegalStateException("can't set listener more than once");
+        }
+        this.listener = listener;
     }
 
 }

+ 2 - 2
core/src/main/java/org/elasticsearch/index/fielddata/ShardFieldData.java

@@ -56,7 +56,7 @@ public class ShardFieldData implements IndexFieldDataCache.Listener {
     }
 
     @Override
-    public void onLoad(MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage) {
+    public void onCache(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage) {
         totalMetric.inc(ramUsage.ramBytesUsed());
         String keyFieldName = fieldNames.indexName();
         CounterMetric total = perFieldTotals.get(keyFieldName);
@@ -73,7 +73,7 @@ public class ShardFieldData implements IndexFieldDataCache.Listener {
     }
 
     @Override
-    public void onUnload(MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) {
+    public void onRemoval(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) {
         if (wasEvicted) {
             evictionsMetric.inc();
         }

+ 1 - 1
core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java

@@ -32,6 +32,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.fielddata.IndexFieldDataService;
 import org.elasticsearch.index.indexing.IndexingOperationListener;
@@ -82,7 +83,6 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent imple
     private final RealTimePercolatorOperationListener realTimePercolatorOperationListener = new RealTimePercolatorOperationListener();
     private final PercolateTypeListener percolateTypeListener = new PercolateTypeListener();
     private final AtomicBoolean realTimePercolatorEnabled = new AtomicBoolean(false);
-
     private boolean mapUnmappedFieldsAsString;
 
     public PercolatorQueriesRegistry(ShardId shardId, @IndexSettings Settings indexSettings, IndexQueryParserService queryParserService,

+ 0 - 1
core/src/main/java/org/elasticsearch/index/query/QueryParseContext.java

@@ -395,5 +395,4 @@ public class QueryParseContext {
     public Version indexVersionCreated() {
         return indexVersionCreated;
     }
-
 }

+ 2 - 2
core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -697,7 +697,7 @@ public class IndexShard extends AbstractIndexShardComponent {
 
     }
 
-    public void optimize(OptimizeRequest optimize) {
+    public void optimize(OptimizeRequest optimize) throws IOException {
         verifyStarted();
         if (logger.isTraceEnabled()) {
             logger.trace("optimize with {}", optimize);
@@ -708,7 +708,7 @@ public class IndexShard extends AbstractIndexShardComponent {
     /**
      * Upgrades the shard to the current version of Lucene and returns the minimum segment version
      */
-    public org.apache.lucene.util.Version upgrade(UpgradeRequest upgrade) {
+    public org.apache.lucene.util.Version upgrade(UpgradeRequest upgrade) throws IOException {
         verifyStarted();
         if (logger.isTraceEnabled()) {
             logger.trace("upgrade with {}", upgrade);

+ 44 - 64
core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java

@@ -24,6 +24,7 @@ import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.SegmentReader;
 import org.apache.lucene.util.Accountable;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.logging.ESLogger;
@@ -35,17 +36,13 @@ import org.elasticsearch.index.fielddata.AtomicFieldData;
 import org.elasticsearch.index.fielddata.FieldDataType;
 import org.elasticsearch.index.fielddata.IndexFieldData;
 import org.elasticsearch.index.fielddata.IndexFieldDataCache;
-import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardUtils;
-import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
 
 /**
  */
@@ -95,8 +92,8 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
         this.closed = true;
     }
 
-    public IndexFieldDataCache buildIndexFieldDataCache(IndexService indexService, Index index, MappedFieldType.Names fieldNames, FieldDataType fieldDataType) {
-        return new IndexFieldCache(logger, cache, indicesFieldDataCacheListener, indexService, index, fieldNames, fieldDataType);
+    public IndexFieldDataCache buildIndexFieldDataCache(IndexFieldDataCache.Listener listener, Index index, MappedFieldType.Names fieldNames, FieldDataType fieldDataType) {
+        return new IndexFieldCache(logger, cache, index, fieldNames, fieldDataType, indicesFieldDataCacheListener, listener);
     }
 
     public Cache<Key, Accountable> getCache() {
@@ -111,7 +108,7 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
         final Accountable value = notification.getValue();
         for (IndexFieldDataCache.Listener listener : key.listeners) {
             try {
-                listener.onUnload(indexCache.fieldNames, indexCache.fieldDataType, notification.wasEvicted(), value.ramBytesUsed());
+                listener.onRemoval(key.shardId, indexCache.fieldNames, indexCache.fieldDataType, notification.wasEvicted(), value.ramBytesUsed());
             } catch (Throwable e) {
                 // load anyway since listeners should not throw exceptions
                 logger.error("Failed to call listener on field data cache unloading", e);
@@ -133,96 +130,78 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
      */
     static class IndexFieldCache implements IndexFieldDataCache, SegmentReader.CoreClosedListener, IndexReader.ReaderClosedListener {
         private final ESLogger logger;
-        private final IndexService indexService;
         final Index index;
         final MappedFieldType.Names fieldNames;
         final FieldDataType fieldDataType;
         private final Cache<Key, Accountable> cache;
-        private final IndicesFieldDataCacheListener indicesFieldDataCacheListener;
+        private final Listener[] listeners;
 
-        IndexFieldCache(ESLogger logger,final Cache<Key, Accountable> cache, IndicesFieldDataCacheListener indicesFieldDataCacheListener, IndexService indexService, Index index, MappedFieldType.Names fieldNames, FieldDataType fieldDataType) {
+        IndexFieldCache(ESLogger logger,final Cache<Key, Accountable> cache, Index index, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Listener... listeners) {
             this.logger = logger;
-            this.indexService = indexService;
+            this.listeners = listeners;
             this.index = index;
             this.fieldNames = fieldNames;
             this.fieldDataType = fieldDataType;
             this.cache = cache;
-            this.indicesFieldDataCacheListener = indicesFieldDataCacheListener;
-            assert indexService != null;
         }
 
         @Override
         public <FD extends AtomicFieldData, IFD extends IndexFieldData<FD>> FD load(final LeafReaderContext context, final IFD indexFieldData) throws Exception {
-            final Key key = new Key(this, context.reader().getCoreCacheKey());
+            final ShardId shardId = ShardUtils.extractShardId(context.reader());
+            final Key key = new Key(this, context.reader().getCoreCacheKey(), shardId);
             //noinspection unchecked
-            final Accountable accountable = cache.get(key, new Callable<AtomicFieldData>() {
-                @Override
-                public AtomicFieldData call() throws Exception {
-                    context.reader().addCoreClosedListener(IndexFieldCache.this);
-
-                    key.listeners.add(indicesFieldDataCacheListener);
-                    final ShardId shardId = ShardUtils.extractShardId(context.reader());
-                    if (shardId != null) {
-                        final IndexShard shard = indexService.shard(shardId.id());
-                        if (shard != null) {
-                            key.listeners.add(shard.fieldData());
-                        }
-                    }
-                    final AtomicFieldData fieldData = indexFieldData.loadDirect(context);
-                    for (Listener listener : key.listeners) {
-                        try {
-                            listener.onLoad(fieldNames, fieldDataType, fieldData);
-                        } catch (Throwable e) {
-                            // load anyway since listeners should not throw exceptions
-                            logger.error("Failed to call listener on atomic field data loading", e);
-                        }
+            final Accountable accountable = cache.get(key, () -> {
+                context.reader().addCoreClosedListener(IndexFieldCache.this);
+                for (Listener listener : this.listeners) {
+                    key.listeners.add(listener);
+                }
+                final AtomicFieldData fieldData = indexFieldData.loadDirect(context);
+                for (Listener listener : key.listeners) {
+                    try {
+                        listener.onCache(shardId, fieldNames, fieldDataType, fieldData);
+                    } catch (Throwable e) {
+                        // load anyway since listeners should not throw exceptions
+                        logger.error("Failed to call listener on atomic field data loading", e);
                     }
-                    return fieldData;
                 }
+                return fieldData;
             });
             return (FD) accountable;
         }
 
         @Override
         public <FD extends AtomicFieldData, IFD extends IndexFieldData.Global<FD>> IFD load(final IndexReader indexReader, final IFD indexFieldData) throws Exception {
-            final Key key = new Key(this, indexReader.getCoreCacheKey());
+            final ShardId shardId = ShardUtils.extractShardId(indexReader);
+            final Key key = new Key(this, indexReader.getCoreCacheKey(), shardId);
             //noinspection unchecked
-            final Accountable accountable = cache.get(key, new Callable<Accountable>() {
-                @Override
-                public Accountable call() throws Exception {
-                    indexReader.addReaderClosedListener(IndexFieldCache.this);
-                    key.listeners.add(indicesFieldDataCacheListener);
-                    final ShardId shardId = ShardUtils.extractShardId(indexReader);
-                    if (shardId != null) {
-                        IndexShard shard = indexService.shard(shardId.id());
-                        if (shard != null) {
-                            key.listeners.add(shard.fieldData());
-                        }
-                    }
-                    final Accountable ifd = (Accountable) indexFieldData.localGlobalDirect(indexReader);
-                    for (Listener listener : key.listeners) {
-                        try {
-                            listener.onLoad(fieldNames, fieldDataType, ifd);
-                        } catch (Throwable e) {
-                            // load anyway since listeners should not throw exceptions
-                            logger.error("Failed to call listener on global ordinals loading", e);
-                        }
+            final Accountable accountable = cache.get(key, () -> {
+                indexReader.addReaderClosedListener(IndexFieldCache.this);
+                for (Listener listener : this.listeners) {
+                    key.listeners.add(listener);
+                }
+                final Accountable ifd = (Accountable) indexFieldData.localGlobalDirect(indexReader);
+                for (Listener listener : key.listeners) {
+                    try {
+                        listener.onCache(shardId, fieldNames, fieldDataType, ifd);
+                    } catch (Throwable e) {
+                        // load anyway since listeners should not throw exceptions
+                        logger.error("Failed to call listener on global ordinals loading", e);
                     }
-                    return ifd;
                 }
+                return ifd;
             });
             return (IFD) accountable;
         }
 
         @Override
         public void onClose(Object coreKey) {
-            cache.invalidate(new Key(this, coreKey));
+            cache.invalidate(new Key(this, coreKey, null));
             // don't call cache.cleanUp here as it would have bad performance implications
         }
 
         @Override
         public void onClose(IndexReader reader) {
-            cache.invalidate(new Key(this, reader.getCoreCacheKey()));
+            cache.invalidate(new Key(this, reader.getCoreCacheKey(), null));
             // don't call cache.cleanUp here as it would have bad performance implications
         }
 
@@ -263,8 +242,8 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
         }
 
         @Override
-        public void clear(Object coreCacheKey) {
-            cache.invalidate(new Key(this, coreCacheKey));
+        public void clear(IndexReader indexReader) {
+            cache.invalidate(new Key(this, indexReader.getCoreCacheKey(), null));
             // don't call cache.cleanUp here as it would have bad performance implications
         }
     }
@@ -272,13 +251,14 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
     public static class Key {
         public final IndexFieldCache indexCache;
         public final Object readerKey;
+        public final ShardId shardId;
 
         public final List<IndexFieldDataCache.Listener> listeners = new ArrayList<>();
 
-
-        Key(IndexFieldCache indexCache, Object readerKey) {
+        Key(IndexFieldCache indexCache, Object readerKey, @Nullable ShardId shardId) {
             this.indexCache = indexCache;
             this.readerKey = readerKey;
+            this.shardId = shardId;
         }
 
         @Override

+ 4 - 2
core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCacheListener.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.indices.fielddata.cache;
 
+import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.util.Accountable;
 import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.inject.Inject;
@@ -26,6 +27,7 @@ import org.elasticsearch.index.fielddata.FieldDataType;
 import org.elasticsearch.index.fielddata.IndexFieldDataCache;
 import org.elasticsearch.index.mapper.FieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
 
 /**
@@ -44,11 +46,11 @@ public class IndicesFieldDataCacheListener implements IndexFieldDataCache.Listen
     }
 
     @Override
-    public void onLoad(MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable fieldData) {
+    public void onCache(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable fieldData) {
     }
 
     @Override
-    public void onUnload(MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) {
+    public void onRemoval(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) {
         assert sizeInBytes >= 0 : "When reducing circuit breaker, it should be adjusted with a number higher or equal to 0 and not [" + sizeInBytes + "]";
         circuitBreakerService.getBreaker(CircuitBreaker.FIELDDATA).addWithoutBreaking(-sizeInBytes);
     }

+ 1 - 7
core/src/test/java/org/elasticsearch/bootstrap/BootstrapForTesting.java

@@ -61,13 +61,7 @@ public class BootstrapForTesting {
         try {
             JarHell.checkJarHell();
         } catch (Exception e) {
-            if (Boolean.parseBoolean(System.getProperty("tests.maven"))) {
-                throw new RuntimeException("found jar hell in test classpath", e);
-            } else {
-                Loggers.getLogger(BootstrapForTesting.class)
-                    .warn("Your ide or custom test runner has jar hell issues, " +
-                          "you might want to look into that", e);
-            }
+            throw new RuntimeException("found jar hell in test classpath", e);
         }
 
         // make sure java.io.tmpdir exists always (in case code uses it in a static initializer)

+ 98 - 0
core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationPriorityTests.java

@@ -0,0 +1,98 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster.routing.allocation;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
+import org.elasticsearch.test.ESAllocationTestCase;
+
+import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
+import static org.elasticsearch.common.settings.Settings.settingsBuilder;
+
+public class AllocationPriorityTests extends ESAllocationTestCase {
+
+    /**
+     * Tests that higher prioritized primaries and replicas are allocated first even on the balanced shard allocator
+     * See https://github.com/elastic/elasticsearch/issues/13249 for details
+     */
+    public void testPrioritizedIndicesAllocatedFirst() {
+        AllocationService allocation = createAllocationService(settingsBuilder().
+                put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES, 1)
+                .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, 1)
+                .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, 1).build());
+        final String highPriorityName;
+        final String lowPriorityName;
+        final int priorityFirst;
+        final int prioritySecond;
+        if (randomBoolean()) {
+            highPriorityName = "first";
+            lowPriorityName = "second";
+            prioritySecond = 1;
+            priorityFirst = 100;
+        } else {
+            lowPriorityName = "first";
+            highPriorityName = "second";
+            prioritySecond = 100;
+            priorityFirst = 1;
+        }
+        MetaData metaData = MetaData.builder()
+                .put(IndexMetaData.builder("first").settings(settings(Version.CURRENT).put(IndexMetaData.SETTING_PRIORITY, priorityFirst)).numberOfShards(2).numberOfReplicas(1))
+                .put(IndexMetaData.builder("second").settings(settings(Version.CURRENT).put(IndexMetaData.SETTING_PRIORITY, prioritySecond)).numberOfShards(2).numberOfReplicas(1))
+                .build();
+        RoutingTable routingTable = RoutingTable.builder()
+                .addAsNew(metaData.index("first"))
+                .addAsNew(metaData.index("second"))
+                .build();
+        ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
+
+        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
+        RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState);
+        clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
+
+        routingTable = allocation.reroute(clusterState).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+        assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size());
+        assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).index());
+        assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).index());
+
+        routingTable = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+        assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size());
+        assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).index());
+        assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).index());
+
+        routingTable = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+        assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size());
+        assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).index());
+        assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).index());
+
+        routingTable = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+        assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size());
+        assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).index());
+        assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).index());
+
+    }
+}

+ 94 - 0
core/src/test/java/org/elasticsearch/index/cache/bitset/BitSetFilterCacheTests.java

@@ -34,14 +34,21 @@ import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.QueryWrapperFilter;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.search.join.BitSetProducer;
+import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.util.Accountable;
 import org.apache.lucene.util.BitSet;
+import org.apache.lucene.util.IOUtils;
+import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ESTestCase;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.hamcrest.Matchers.equalTo;
 
@@ -109,4 +116,91 @@ public class BitSetFilterCacheTests extends ESTestCase {
         assertThat(cache.getLoadedFilters().size(), equalTo(0l));
     }
 
+    public void testListener() throws IOException {
+        IndexWriter writer = new IndexWriter(
+                new RAMDirectory(),
+                new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(new LogByteSizeMergePolicy())
+        );
+        Document document = new Document();
+        document.add(new StringField("field", "value", Field.Store.NO));
+        writer.addDocument(document);
+        writer.commit();
+        final DirectoryReader writerReader = DirectoryReader.open(writer, false);
+        final IndexReader reader = randomBoolean() ? writerReader : ElasticsearchDirectoryReader.wrap(writerReader, new ShardId("test", 0));
+
+        final AtomicLong stats = new AtomicLong();
+        final AtomicInteger onCacheCalls = new AtomicInteger();
+        final AtomicInteger onRemoveCalls = new AtomicInteger();
+
+        final BitsetFilterCache cache = new BitsetFilterCache(new Index("test"), Settings.EMPTY);
+        cache.setListener(new BitsetFilterCache.Listener() {
+            @Override
+            public void onCache(ShardId shardId, Accountable accountable) {
+                onCacheCalls.incrementAndGet();
+                stats.addAndGet(accountable.ramBytesUsed());
+                if (writerReader != reader) {
+                    assertNotNull(shardId);
+                    assertEquals("test", shardId.index().name());
+                    assertEquals(0, shardId.id());
+                } else {
+                    assertNull(shardId);
+                }
+            }
+
+            @Override
+            public void onRemoval(ShardId shardId, Accountable accountable) {
+                onRemoveCalls.incrementAndGet();
+                stats.addAndGet(-accountable.ramBytesUsed());
+                if (writerReader != reader) {
+                    assertNotNull(shardId);
+                    assertEquals("test", shardId.index().name());
+                    assertEquals(0, shardId.id());
+                } else {
+                    assertNull(shardId);
+                }
+            }
+        });
+        BitSetProducer filter = cache.getBitSetProducer(new QueryWrapperFilter(new TermQuery(new Term("field", "value"))));
+        assertThat(matchCount(filter, reader), equalTo(1));
+        assertTrue(stats.get() > 0);
+        assertEquals(1, onCacheCalls.get());
+        assertEquals(0, onRemoveCalls.get());
+        IOUtils.close(reader, writer);
+        assertEquals(1, onRemoveCalls.get());
+        assertEquals(0, stats.get());
+    }
+
+    public void testSetListenerTwice() {
+        final BitsetFilterCache cache = new BitsetFilterCache(new Index("test"), Settings.EMPTY);
+        cache.setListener(new BitsetFilterCache.Listener() {
+
+            @Override
+            public void onCache(ShardId shardId, Accountable accountable) {
+
+            }
+
+            @Override
+            public void onRemoval(ShardId shardId, Accountable accountable) {
+
+            }
+        });
+        try {
+            cache.setListener(new BitsetFilterCache.Listener() {
+
+                @Override
+                public void onCache(ShardId shardId, Accountable accountable) {
+
+                }
+
+                @Override
+                public void onRemoval(ShardId shardId, Accountable accountable) {
+
+                }
+            });
+            fail("can't set it twice");
+        } catch (IllegalStateException ex) {
+            // all is well
+        }
+    }
+
 }

+ 41 - 2
core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -41,6 +41,7 @@ import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.TestUtil;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.support.TransportActions;
 import org.elasticsearch.bwcompat.OldIndexBackwardsCompatibilityIT;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.Base64;
@@ -94,6 +95,7 @@ import java.nio.file.Path;
 import java.util.*;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
 import static org.elasticsearch.common.settings.Settings.Builder.EMPTY_SETTINGS;
@@ -1000,8 +1002,7 @@ public class InternalEngineTests extends ESTestCase {
                                 indexed.countDown();
                                 try {
                                     engine.forceMerge(randomBoolean(), 1, false, randomBoolean(), randomBoolean());
-                                } catch (ForceMergeFailedEngineException ex) {
-                                    // ok
+                                } catch (IOException e) {
                                     return;
                                 }
                             }
@@ -2019,4 +2020,42 @@ public class InternalEngineTests extends ESTestCase {
             assertThat(topDocs.totalHits, equalTo(numDocs));
         }
     }
+
+    public void testShardNotAvailableExceptionWhenEngineClosedConcurrently() throws IOException, InterruptedException {
+        AtomicReference<Throwable> throwable = new AtomicReference<>();
+        String operation = randomFrom("optimize", "refresh", "flush");
+        Thread mergeThread = new Thread() {
+            @Override
+            public void run() {
+                boolean stop = false;
+                logger.info("try with {}", operation);
+                while (stop == false) {
+                    try {
+                        switch (operation) {
+                            case "optimize": {
+                                engine.forceMerge(true, 1, false, false, false);
+                                break;
+                            }
+                            case "refresh": {
+                                engine.refresh("test refresh");
+                                break;
+                            }
+                            case "flush": {
+                                engine.flush(true, false);
+                                break;
+                            }
+                        }
+                    } catch (Throwable t) {
+                        throwable.set(t);
+                        stop = true;
+                    }
+                }
+            }
+        };
+        mergeThread.start();
+        engine.close();
+        mergeThread.join();
+        logger.info("exception caught: ", throwable.get());
+        assertTrue("expected an Exception that signals shard is not available", TransportActions.isShardNotAvailableException(throwable.get()));
+    }
 }

+ 1 - 2
core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataImplTestCase.java

@@ -19,8 +19,8 @@
 
 package org.elasticsearch.index.fielddata;
 
-import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.*;
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.Strings;
@@ -273,5 +273,4 @@ public abstract class AbstractFieldDataImplTestCase extends AbstractFieldDataTes
     }
 
     protected abstract void fillExtendedMvSet() throws Exception;
-
 }

+ 3 - 6
core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTestCase.java

@@ -54,7 +54,6 @@ public abstract class AbstractFieldDataTestCase extends ESSingleNodeTestCase {
     protected LeafReaderContext readerContext;
     protected IndexReader topLevelReader;
     protected IndicesFieldDataCache indicesFieldDataCache;
-
     protected abstract FieldDataType getFieldDataType();
 
     protected boolean hasDocValues() {
@@ -109,11 +108,12 @@ public abstract class AbstractFieldDataTestCase extends ESSingleNodeTestCase {
         writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(new LogByteSizeMergePolicy()));
     }
 
-    protected LeafReaderContext refreshReader() throws Exception {
+    protected final LeafReaderContext refreshReader() throws Exception {
         if (readerContext != null) {
             readerContext.reader().close();
         }
-        LeafReader reader = SlowCompositeReaderWrapper.wrap(topLevelReader = DirectoryReader.open(writer, true));
+        topLevelReader = DirectoryReader.open(writer, true);
+        LeafReader reader = SlowCompositeReaderWrapper.wrap(topLevelReader);
         readerContext = reader.getContext();
         return readerContext;
     }
@@ -150,8 +150,5 @@ public abstract class AbstractFieldDataTestCase extends ESSingleNodeTestCase {
             }
             previous = current;
         }
-
-
     }
-
 }

+ 1 - 5
core/src/test/java/org/elasticsearch/index/fielddata/AbstractStringFieldDataTestCase.java

@@ -26,11 +26,7 @@ import org.apache.lucene.document.Field;
 import org.apache.lucene.document.Field.Store;
 import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.document.StringField;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.index.RandomAccessOrds;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.index.*;
 import org.apache.lucene.search.Filter;
 import org.apache.lucene.search.FilteredQuery;
 import org.apache.lucene.search.IndexSearcher;

+ 80 - 0
core/src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java

@@ -25,6 +25,9 @@ import org.apache.lucene.document.Field.Store;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.*;
 import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.util.Accountable;
+import org.elasticsearch.common.lucene.index.ESDirectoryReaderTests;
+import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.fielddata.plain.*;
 import org.elasticsearch.index.mapper.ContentPath;
@@ -33,12 +36,16 @@ import org.elasticsearch.index.mapper.Mapper.BuilderContext;
 import org.elasticsearch.index.mapper.MapperBuilders;
 import org.elasticsearch.index.mapper.core.*;
 import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
 import org.elasticsearch.test.ESSingleNodeTestCase;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.IdentityHashMap;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.Matchers.instanceOf;
 
@@ -158,4 +165,77 @@ public class IndexFieldDataServiceTests extends ESSingleNodeTestCase {
         writer.getDirectory().close();
     }
 
+    public void testFieldDataCacheListener() throws Exception {
+        final IndexService indexService = createIndex("test");
+        IndexFieldDataService shardPrivateService = indexService.fieldData();
+        // copy the ifdService since we can set the listener only once.
+        final IndexFieldDataService ifdService = new IndexFieldDataService(shardPrivateService.index(), shardPrivateService.indexSettings(),
+                getInstanceFromNode(IndicesFieldDataCache.class), getInstanceFromNode(CircuitBreakerService.class), indexService.mapperService());
+
+        final BuilderContext ctx = new BuilderContext(indexService.settingsService().getSettings(), new ContentPath(1));
+        final MappedFieldType mapper1 = MapperBuilders.stringField("s").tokenized(false).docValues(true).fieldDataSettings(Settings.builder().put(FieldDataType.FORMAT_KEY, "paged_bytes").build()).build(ctx).fieldType();
+        final IndexWriter writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(new KeywordAnalyzer()));
+        Document doc = new Document();
+        doc.add(new StringField("s", "thisisastring", Store.NO));
+        writer.addDocument(doc);
+        DirectoryReader open = DirectoryReader.open(writer, true);
+        final boolean wrap = randomBoolean();
+        final IndexReader reader = wrap ? ElasticsearchDirectoryReader.wrap(open, new ShardId("test", 1)) : open;
+        final AtomicInteger onCacheCalled = new AtomicInteger();
+        final AtomicInteger onRemovalCalled = new AtomicInteger();
+        ifdService.setListener(new IndexFieldDataCache.Listener() {
+            @Override
+            public void onCache(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage) {
+                if (wrap) {
+                    assertEquals(new ShardId("test", 1), shardId);
+                } else {
+                    assertNull(shardId);
+                }
+                onCacheCalled.incrementAndGet();
+            }
+
+            @Override
+            public void onRemoval(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) {
+                if (wrap) {
+                    assertEquals(new ShardId("test", 1), shardId);
+                } else {
+                    assertNull(shardId);
+                }
+                onRemovalCalled.incrementAndGet();
+            }
+        });
+        IndexFieldData<?> ifd = ifdService.getForField(mapper1);
+        LeafReaderContext leafReaderContext = reader.getContext().leaves().get(0);
+        AtomicFieldData load = ifd.load(leafReaderContext);
+        assertEquals(1, onCacheCalled.get());
+        assertEquals(0, onRemovalCalled.get());
+        reader.close();
+        load.close();
+        writer.close();
+        assertEquals(1, onCacheCalled.get());
+        assertEquals(1, onRemovalCalled.get());
+        ifdService.clear();
+    }
+
+    public void testSetCacheListenerTwice() {
+        final IndexService indexService = createIndex("test");
+        IndexFieldDataService shardPrivateService = indexService.fieldData();
+        try {
+            shardPrivateService.setListener(new IndexFieldDataCache.Listener() {
+                @Override
+                public void onCache(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage) {
+
+                }
+
+                @Override
+                public void onRemoval(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) {
+
+                }
+            });
+            fail("listener already set");
+        } catch (IllegalStateException ex) {
+            // all well
+        }
+    }
+
 }

+ 1 - 10
core/src/test/java/org/elasticsearch/index/search/child/ChildrenConstantScoreQueryTests.java

@@ -25,16 +25,7 @@ import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.StringField;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.LeafReader;
-import org.apache.lucene.index.PostingsEnum;
-import org.apache.lucene.index.RandomIndexWriter;
-import org.apache.lucene.index.SlowCompositeReaderWrapper;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.index.Terms;
-import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.index.*;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.Filter;
 import org.apache.lucene.search.IndexSearcher;

+ 2 - 1
core/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java

@@ -38,6 +38,7 @@ import org.elasticsearch.common.lucene.search.Queries;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.mapper.internal.TypeFieldMapper;
 import org.elasticsearch.index.mapper.internal.UidFieldMapper;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.BucketCollector;
@@ -113,7 +114,7 @@ public class NestedAggregatorTests extends ESSingleNodeTestCase {
         indexWriter.commit();
         indexWriter.close();
 
-        DirectoryReader directoryReader = DirectoryReader.open(directory);
+        DirectoryReader directoryReader =  DirectoryReader.open(directory);
         IndexSearcher searcher = new IndexSearcher(directoryReader);
 
         IndexService indexService = createIndex("test");

+ 130 - 0
dev-tools/validate-maven-repository.py

@@ -0,0 +1,130 @@
+# Licensed to Elasticsearch under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Elasticsearch licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance  with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on
+# an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+# either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+
+# Helper python script to check if a sonatype staging repo contains
+# all the required files compared to a local repository
+#
+# The script does the following steps
+#
+# 1. Scans the local maven repo for all files in /org/elasticsearch
+# 2. Opens a HTTP connection to the staging repo
+# 3. Executes a HEAD request for each file found in step one
+# 4. Compares the content-length response header with the real file size
+# 5. Return an error if those two numbers differ
+#
+# A pre requirement to run this, is to find out via the oss.sonatype.org web UI, how that repo is named
+# - After logging in you go to 'Staging repositories' and search for the one you just created
+# - Click into the `Content` tab
+# - Open any artifact (not a directory)
+# - Copy the link of `Repository Path` on the right and reuse that part of the URL
+#
+# Alternatively you can just use the name of the repository and reuse the rest (ie. the repository
+# named for the example below would have been named orgelasticsearch-1012)
+#
+#
+# Example call
+#   python dev-tools/validate-maven-repository.py /path/to/repo/org/elasticsearch/ \
+#          https://oss.sonatype.org/service/local/repositories/orgelasticsearch-1012/content/org/elasticsearch
+
+import sys
+import os
+import httplib
+import urlparse
+import re
+
+# Draw a simple progress bar, a couple of hundred HEAD requests might take a while
+# Note, when drawing this, it uses the carriage return character, so you should not
+# write anything in between
+def drawProgressBar(percent, barLen = 40):
+    sys.stdout.write("\r")
+    progress = ""
+    for i in range(barLen):
+        if i < int(barLen * percent):
+            progress += "="
+        else:
+            progress += " "
+    sys.stdout.write("[ %s ] %.2f%%" % (progress, percent * 100))
+    sys.stdout.flush()
+
+if __name__ == "__main__":
+  if len(sys.argv) != 3:
+    print 'Usage: %s <localRep> <stagingRepo> [user:pass]' % (sys.argv[0])
+    print ''
+    print 'Example: %s /tmp/my-maven-repo/org/elasticsearch https://oss.sonatype.org/service/local/repositories/orgelasticsearch-1012/content/org/elasticsearch' % (sys.argv[0])
+  else:
+    sys.argv[1] = re.sub('/$', '', sys.argv[1])
+    sys.argv[2] = re.sub('/$', '', sys.argv[2])
+
+    localMavenRepo = sys.argv[1]
+    endpoint = sys.argv[2]
+
+    filesToCheck = []
+    foundSignedFiles = False
+
+    for root, dirs, files in os.walk(localMavenRepo):
+      for file in files:
+        # no metadata files (they get renamed from maven-metadata-local.xml to maven-metadata.xml while deploying)
+        # no .properties and .repositories files (they dont get uploaded)
+        if not file.startswith('maven-metadata') and not file.endswith('.properties') and not file.endswith('.repositories'):
+          filesToCheck.append(os.path.join(root, file))
+        if file.endswith('.asc'):
+          foundSignedFiles = True
+
+    print "Need to check %i files" % len(filesToCheck)
+    if not foundSignedFiles:
+      print '### Warning: No signed .asc files found'
+
+    # set up http
+    parsed_uri = urlparse.urlparse(endpoint)
+    domain = parsed_uri.netloc
+    if parsed_uri.scheme == 'https':
+      conn = httplib.HTTPSConnection(domain)
+    else:
+      conn = httplib.HTTPConnection(domain)
+    #conn.set_debuglevel(5)
+
+    drawProgressBar(0)
+    errors = []
+    for idx, file in enumerate(filesToCheck):
+      request_uri = parsed_uri.path + file[len(localMavenRepo):]
+      conn.request("HEAD", request_uri)
+      res = conn.getresponse()
+      res.read() # useless call for head, but prevents httplib.ResponseNotReady raise
+
+      absolute_url = parsed_uri.scheme + '://' + parsed_uri.netloc + request_uri
+      if res.status == 200:
+        content_length = res.getheader('content-length')
+        local_file_size = os.path.getsize(file)
+        if int(content_length) != int(local_file_size):
+          errors.append('LENGTH MISMATCH:   %s differs in size. local %s <=> %s remote' % (absolute_url, content_length, local_file_size))
+      elif res.status == 404:
+        errors.append('MISSING:           %s' % absolute_url)
+      elif res.status == 301 or res.status == 302:
+        errors.append('REDIRECT:          %s to %s' % (absolute_url, res.getheader('location')))
+      else:
+        errors.append('ERROR:             %s http response: %s %s' %(absolute_url, res.status, res.reason))
+
+      # update progressbar at the end
+      drawProgressBar((idx+1)/float(len(filesToCheck)))
+
+    print
+
+    if len(errors) != 0:
+      print 'The following errors occured (%s out of %s files)' % (len(errors), len(filesToCheck))
+      print
+      for error in errors:
+        print error
+      sys.exit(-1)