| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- /*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
- package org.elasticsearch.common.util;
- import org.apache.lucene.util.RamUsageEstimator;
- import org.elasticsearch.common.lease.Releasable;
- import org.elasticsearch.common.lease.Releasables;
- import org.elasticsearch.common.recycler.AbstractRecyclerC;
- import org.elasticsearch.common.recycler.Recycler;
- import org.elasticsearch.common.settings.Setting;
- import org.elasticsearch.common.settings.Setting.Property;
- import org.elasticsearch.common.settings.Settings;
- import org.elasticsearch.common.unit.ByteSizeValue;
- import org.elasticsearch.common.util.concurrent.EsExecutors;
- import java.util.Arrays;
- import java.util.Locale;
- import static org.elasticsearch.common.recycler.Recyclers.concurrent;
- import static org.elasticsearch.common.recycler.Recyclers.concurrentDeque;
- import static org.elasticsearch.common.recycler.Recyclers.dequeFactory;
- import static org.elasticsearch.common.recycler.Recyclers.none;
- /** A recycler of fixed-size pages. */
- public class PageCacheRecycler implements Releasable {
- public static final Setting<Type> TYPE_SETTING =
- new Setting<>("cache.recycler.page.type", Type.CONCURRENT.name(), Type::parse, Property.NodeScope);
- public static final Setting<ByteSizeValue> LIMIT_HEAP_SETTING =
- Setting.memorySizeSetting("cache.recycler.page.limit.heap", "10%", Property.NodeScope);
- public static final Setting<Double> WEIGHT_BYTES_SETTING =
- Setting.doubleSetting("cache.recycler.page.weight.bytes", 1d, 0d, Property.NodeScope);
- public static final Setting<Double> WEIGHT_LONG_SETTING =
- Setting.doubleSetting("cache.recycler.page.weight.longs", 1d, 0d, Property.NodeScope);
- public static final Setting<Double> WEIGHT_INT_SETTING =
- Setting.doubleSetting("cache.recycler.page.weight.ints", 1d, 0d, Property.NodeScope);
- // object pages are less useful to us so we give them a lower weight by default
- public static final Setting<Double> WEIGHT_OBJECTS_SETTING =
- Setting.doubleSetting("cache.recycler.page.weight.objects", 0.1d, 0d, Property.NodeScope);
- /** Page size in bytes: 16KB */
- public static final int PAGE_SIZE_IN_BYTES = 1 << 14;
- public static final int OBJECT_PAGE_SIZE = PAGE_SIZE_IN_BYTES / RamUsageEstimator.NUM_BYTES_OBJECT_REF;
- public static final int LONG_PAGE_SIZE = PAGE_SIZE_IN_BYTES / Long.BYTES;
- public static final int INT_PAGE_SIZE = PAGE_SIZE_IN_BYTES / Integer.BYTES;
- public static final int BYTE_PAGE_SIZE = PAGE_SIZE_IN_BYTES;
- private final Recycler<byte[]> bytePage;
- private final Recycler<int[]> intPage;
- private final Recycler<long[]> longPage;
- private final Recycler<Object[]> objectPage;
- public static final PageCacheRecycler NON_RECYCLING_INSTANCE;
- static {
- NON_RECYCLING_INSTANCE = new PageCacheRecycler(Settings.builder().put(LIMIT_HEAP_SETTING.getKey(), "0%").build());
- }
- @Override
- public void close() {
- Releasables.close(true, bytePage, intPage, longPage, objectPage);
- }
- public PageCacheRecycler(Settings settings) {
- final Type type = TYPE_SETTING.get(settings);
- final long limit = LIMIT_HEAP_SETTING.get(settings).getBytes();
- final int availableProcessors = EsExecutors.numberOfProcessors(settings);
- // We have a global amount of memory that we need to divide across data types.
- // Since some types are more useful than other ones we give them different weights.
- // Trying to store all of them in a single stack would be problematic because eg.
- // a work load could fill the recycler with only byte[] pages and then another
- // workload that would work with double[] pages couldn't recycle them because there
- // is no space left in the stack/queue. LRU/LFU policies are not an option either
- // because they would make obtain/release too costly: we really need constant-time
- // operations.
- // Ultimately a better solution would be to only store one kind of data and have the
- // ability to interpret it either as a source of bytes, doubles, longs, etc. eg. thanks
- // to direct ByteBuffers or sun.misc.Unsafe on a byte[] but this would have other issues
- // that would need to be addressed such as garbage collection of native memory or safety
- // of Unsafe writes.
- final double bytesWeight = WEIGHT_BYTES_SETTING .get(settings);
- final double intsWeight = WEIGHT_INT_SETTING .get(settings);
- final double longsWeight = WEIGHT_LONG_SETTING .get(settings);
- final double objectsWeight = WEIGHT_OBJECTS_SETTING .get(settings);
- final double totalWeight = bytesWeight + intsWeight + longsWeight + objectsWeight;
- final int maxPageCount = (int) Math.min(Integer.MAX_VALUE, limit / PAGE_SIZE_IN_BYTES);
- final int maxBytePageCount = (int) (bytesWeight * maxPageCount / totalWeight);
- bytePage = build(type, maxBytePageCount, availableProcessors, new AbstractRecyclerC<byte[]>() {
- @Override
- public byte[] newInstance(int sizing) {
- return new byte[BYTE_PAGE_SIZE];
- }
- @Override
- public void recycle(byte[] value) {
- // nothing to do
- }
- });
- final int maxIntPageCount = (int) (intsWeight * maxPageCount / totalWeight);
- intPage = build(type, maxIntPageCount, availableProcessors, new AbstractRecyclerC<int[]>() {
- @Override
- public int[] newInstance(int sizing) {
- return new int[INT_PAGE_SIZE];
- }
- @Override
- public void recycle(int[] value) {
- // nothing to do
- }
- });
- final int maxLongPageCount = (int) (longsWeight * maxPageCount / totalWeight);
- longPage = build(type, maxLongPageCount, availableProcessors, new AbstractRecyclerC<long[]>() {
- @Override
- public long[] newInstance(int sizing) {
- return new long[LONG_PAGE_SIZE];
- }
- @Override
- public void recycle(long[] value) {
- // nothing to do
- }
- });
- final int maxObjectPageCount = (int) (objectsWeight * maxPageCount / totalWeight);
- objectPage = build(type, maxObjectPageCount, availableProcessors, new AbstractRecyclerC<Object[]>() {
- @Override
- public Object[] newInstance(int sizing) {
- return new Object[OBJECT_PAGE_SIZE];
- }
- @Override
- public void recycle(Object[] value) {
- Arrays.fill(value, null); // we need to remove the strong refs on the objects stored in the array
- }
- });
- assert PAGE_SIZE_IN_BYTES * (maxBytePageCount + maxIntPageCount + maxLongPageCount + maxObjectPageCount) <= limit;
- }
- public Recycler.V<byte[]> bytePage(boolean clear) {
- final Recycler.V<byte[]> v = bytePage.obtain();
- if (v.isRecycled() && clear) {
- Arrays.fill(v.v(), (byte) 0);
- }
- return v;
- }
- public Recycler.V<int[]> intPage(boolean clear) {
- final Recycler.V<int[]> v = intPage.obtain();
- if (v.isRecycled() && clear) {
- Arrays.fill(v.v(), 0);
- }
- return v;
- }
- public Recycler.V<long[]> longPage(boolean clear) {
- final Recycler.V<long[]> v = longPage.obtain();
- if (v.isRecycled() && clear) {
- Arrays.fill(v.v(), 0L);
- }
- return v;
- }
- public Recycler.V<Object[]> objectPage() {
- // object pages are cleared on release anyway
- return objectPage.obtain();
- }
- private static <T> Recycler<T> build(Type type, int limit, int availableProcessors, Recycler.C<T> c) {
- final Recycler<T> recycler;
- if (limit == 0) {
- recycler = none(c);
- } else {
- recycler = type.build(c, limit, availableProcessors);
- }
- return recycler;
- }
- public enum Type {
- QUEUE {
- @Override
- <T> Recycler<T> build(Recycler.C<T> c, int limit, int availableProcessors) {
- return concurrentDeque(c, limit);
- }
- },
- CONCURRENT {
- @Override
- <T> Recycler<T> build(Recycler.C<T> c, int limit, int availableProcessors) {
- return concurrent(dequeFactory(c, limit / availableProcessors), availableProcessors);
- }
- },
- NONE {
- @Override
- <T> Recycler<T> build(Recycler.C<T> c, int limit, int availableProcessors) {
- return none(c);
- }
- };
- public static Type parse(String type) {
- try {
- return Type.valueOf(type.toUpperCase(Locale.ROOT));
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException("no type support [" + type + "]");
- }
- }
- abstract <T> Recycler<T> build(Recycler.C<T> c, int limit, int availableProcessors);
- }
- }
|