PageCacheRecycler.java 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. /*
  2. * Licensed to Elasticsearch under one or more contributor
  3. * license agreements. See the NOTICE file distributed with
  4. * this work for additional information regarding copyright
  5. * ownership. Elasticsearch licenses this file to you under
  6. * the Apache License, Version 2.0 (the "License"); you may
  7. * not use this file except in compliance with the License.
  8. * You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing,
  13. * software distributed under the License is distributed on an
  14. * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. * KIND, either express or implied. See the License for the
  16. * specific language governing permissions and limitations
  17. * under the License.
  18. */
  19. package org.elasticsearch.common.util;
  20. import org.apache.lucene.util.RamUsageEstimator;
  21. import org.elasticsearch.common.lease.Releasable;
  22. import org.elasticsearch.common.lease.Releasables;
  23. import org.elasticsearch.common.recycler.AbstractRecyclerC;
  24. import org.elasticsearch.common.recycler.Recycler;
  25. import org.elasticsearch.common.settings.Setting;
  26. import org.elasticsearch.common.settings.Setting.Property;
  27. import org.elasticsearch.common.settings.Settings;
  28. import org.elasticsearch.common.unit.ByteSizeValue;
  29. import org.elasticsearch.common.util.concurrent.EsExecutors;
  30. import java.util.Arrays;
  31. import java.util.Locale;
  32. import static org.elasticsearch.common.recycler.Recyclers.concurrent;
  33. import static org.elasticsearch.common.recycler.Recyclers.concurrentDeque;
  34. import static org.elasticsearch.common.recycler.Recyclers.dequeFactory;
  35. import static org.elasticsearch.common.recycler.Recyclers.none;
  36. /** A recycler of fixed-size pages. */
  37. public class PageCacheRecycler implements Releasable {
  38. public static final Setting<Type> TYPE_SETTING =
  39. new Setting<>("cache.recycler.page.type", Type.CONCURRENT.name(), Type::parse, Property.NodeScope);
  40. public static final Setting<ByteSizeValue> LIMIT_HEAP_SETTING =
  41. Setting.memorySizeSetting("cache.recycler.page.limit.heap", "10%", Property.NodeScope);
  42. public static final Setting<Double> WEIGHT_BYTES_SETTING =
  43. Setting.doubleSetting("cache.recycler.page.weight.bytes", 1d, 0d, Property.NodeScope);
  44. public static final Setting<Double> WEIGHT_LONG_SETTING =
  45. Setting.doubleSetting("cache.recycler.page.weight.longs", 1d, 0d, Property.NodeScope);
  46. public static final Setting<Double> WEIGHT_INT_SETTING =
  47. Setting.doubleSetting("cache.recycler.page.weight.ints", 1d, 0d, Property.NodeScope);
  48. // object pages are less useful to us so we give them a lower weight by default
  49. public static final Setting<Double> WEIGHT_OBJECTS_SETTING =
  50. Setting.doubleSetting("cache.recycler.page.weight.objects", 0.1d, 0d, Property.NodeScope);
  51. /** Page size in bytes: 16KB */
  52. public static final int PAGE_SIZE_IN_BYTES = 1 << 14;
  53. public static final int OBJECT_PAGE_SIZE = PAGE_SIZE_IN_BYTES / RamUsageEstimator.NUM_BYTES_OBJECT_REF;
  54. public static final int LONG_PAGE_SIZE = PAGE_SIZE_IN_BYTES / Long.BYTES;
  55. public static final int INT_PAGE_SIZE = PAGE_SIZE_IN_BYTES / Integer.BYTES;
  56. public static final int BYTE_PAGE_SIZE = PAGE_SIZE_IN_BYTES;
  57. private final Recycler<byte[]> bytePage;
  58. private final Recycler<int[]> intPage;
  59. private final Recycler<long[]> longPage;
  60. private final Recycler<Object[]> objectPage;
  61. public static final PageCacheRecycler NON_RECYCLING_INSTANCE;
  62. static {
  63. NON_RECYCLING_INSTANCE = new PageCacheRecycler(Settings.builder().put(LIMIT_HEAP_SETTING.getKey(), "0%").build());
  64. }
  65. @Override
  66. public void close() {
  67. Releasables.close(true, bytePage, intPage, longPage, objectPage);
  68. }
  69. public PageCacheRecycler(Settings settings) {
  70. final Type type = TYPE_SETTING.get(settings);
  71. final long limit = LIMIT_HEAP_SETTING.get(settings).getBytes();
  72. final int availableProcessors = EsExecutors.numberOfProcessors(settings);
  73. // We have a global amount of memory that we need to divide across data types.
  74. // Since some types are more useful than other ones we give them different weights.
  75. // Trying to store all of them in a single stack would be problematic because eg.
  76. // a work load could fill the recycler with only byte[] pages and then another
  77. // workload that would work with double[] pages couldn't recycle them because there
  78. // is no space left in the stack/queue. LRU/LFU policies are not an option either
  79. // because they would make obtain/release too costly: we really need constant-time
  80. // operations.
  81. // Ultimately a better solution would be to only store one kind of data and have the
  82. // ability to interpret it either as a source of bytes, doubles, longs, etc. eg. thanks
  83. // to direct ByteBuffers or sun.misc.Unsafe on a byte[] but this would have other issues
  84. // that would need to be addressed such as garbage collection of native memory or safety
  85. // of Unsafe writes.
  86. final double bytesWeight = WEIGHT_BYTES_SETTING .get(settings);
  87. final double intsWeight = WEIGHT_INT_SETTING .get(settings);
  88. final double longsWeight = WEIGHT_LONG_SETTING .get(settings);
  89. final double objectsWeight = WEIGHT_OBJECTS_SETTING .get(settings);
  90. final double totalWeight = bytesWeight + intsWeight + longsWeight + objectsWeight;
  91. final int maxPageCount = (int) Math.min(Integer.MAX_VALUE, limit / PAGE_SIZE_IN_BYTES);
  92. final int maxBytePageCount = (int) (bytesWeight * maxPageCount / totalWeight);
  93. bytePage = build(type, maxBytePageCount, availableProcessors, new AbstractRecyclerC<byte[]>() {
  94. @Override
  95. public byte[] newInstance(int sizing) {
  96. return new byte[BYTE_PAGE_SIZE];
  97. }
  98. @Override
  99. public void recycle(byte[] value) {
  100. // nothing to do
  101. }
  102. });
  103. final int maxIntPageCount = (int) (intsWeight * maxPageCount / totalWeight);
  104. intPage = build(type, maxIntPageCount, availableProcessors, new AbstractRecyclerC<int[]>() {
  105. @Override
  106. public int[] newInstance(int sizing) {
  107. return new int[INT_PAGE_SIZE];
  108. }
  109. @Override
  110. public void recycle(int[] value) {
  111. // nothing to do
  112. }
  113. });
  114. final int maxLongPageCount = (int) (longsWeight * maxPageCount / totalWeight);
  115. longPage = build(type, maxLongPageCount, availableProcessors, new AbstractRecyclerC<long[]>() {
  116. @Override
  117. public long[] newInstance(int sizing) {
  118. return new long[LONG_PAGE_SIZE];
  119. }
  120. @Override
  121. public void recycle(long[] value) {
  122. // nothing to do
  123. }
  124. });
  125. final int maxObjectPageCount = (int) (objectsWeight * maxPageCount / totalWeight);
  126. objectPage = build(type, maxObjectPageCount, availableProcessors, new AbstractRecyclerC<Object[]>() {
  127. @Override
  128. public Object[] newInstance(int sizing) {
  129. return new Object[OBJECT_PAGE_SIZE];
  130. }
  131. @Override
  132. public void recycle(Object[] value) {
  133. Arrays.fill(value, null); // we need to remove the strong refs on the objects stored in the array
  134. }
  135. });
  136. assert PAGE_SIZE_IN_BYTES * (maxBytePageCount + maxIntPageCount + maxLongPageCount + maxObjectPageCount) <= limit;
  137. }
  138. public Recycler.V<byte[]> bytePage(boolean clear) {
  139. final Recycler.V<byte[]> v = bytePage.obtain();
  140. if (v.isRecycled() && clear) {
  141. Arrays.fill(v.v(), (byte) 0);
  142. }
  143. return v;
  144. }
  145. public Recycler.V<int[]> intPage(boolean clear) {
  146. final Recycler.V<int[]> v = intPage.obtain();
  147. if (v.isRecycled() && clear) {
  148. Arrays.fill(v.v(), 0);
  149. }
  150. return v;
  151. }
  152. public Recycler.V<long[]> longPage(boolean clear) {
  153. final Recycler.V<long[]> v = longPage.obtain();
  154. if (v.isRecycled() && clear) {
  155. Arrays.fill(v.v(), 0L);
  156. }
  157. return v;
  158. }
  159. public Recycler.V<Object[]> objectPage() {
  160. // object pages are cleared on release anyway
  161. return objectPage.obtain();
  162. }
  163. private static <T> Recycler<T> build(Type type, int limit, int availableProcessors, Recycler.C<T> c) {
  164. final Recycler<T> recycler;
  165. if (limit == 0) {
  166. recycler = none(c);
  167. } else {
  168. recycler = type.build(c, limit, availableProcessors);
  169. }
  170. return recycler;
  171. }
  172. public enum Type {
  173. QUEUE {
  174. @Override
  175. <T> Recycler<T> build(Recycler.C<T> c, int limit, int availableProcessors) {
  176. return concurrentDeque(c, limit);
  177. }
  178. },
  179. CONCURRENT {
  180. @Override
  181. <T> Recycler<T> build(Recycler.C<T> c, int limit, int availableProcessors) {
  182. return concurrent(dequeFactory(c, limit / availableProcessors), availableProcessors);
  183. }
  184. },
  185. NONE {
  186. @Override
  187. <T> Recycler<T> build(Recycler.C<T> c, int limit, int availableProcessors) {
  188. return none(c);
  189. }
  190. };
  191. public static Type parse(String type) {
  192. try {
  193. return Type.valueOf(type.toUpperCase(Locale.ROOT));
  194. } catch (IllegalArgumentException e) {
  195. throw new IllegalArgumentException("no type support [" + type + "]");
  196. }
  197. }
  198. abstract <T> Recycler<T> build(Recycler.C<T> c, int limit, int availableProcessors);
  199. }
  200. }