InternalEngine.java
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.index.engine;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.InfoStream;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
import java.util.stream.Stream;

public class InternalEngine extends Engine {

    /**
     * When we last pruned expired tombstones from versionMap.deletes:
     */
    private volatile long lastDeleteVersionPruneTimeMSec;

    private final Translog translog;
    private final ElasticsearchConcurrentMergeScheduler mergeScheduler;

    private final IndexWriter indexWriter;

    private final ExternalSearcherManager externalSearcherManager;
    private final SearcherManager internalSearcherManager;

    private final Lock flushLock = new ReentrantLock();
    private final ReentrantLock optimizeLock = new ReentrantLock();

    // Maps a uid (in the form of a BytesRef) to its version value.
    // We use the hashed variant since we iterate over it and check removals and additions on existing keys.
    private final LiveVersionMap versionMap = new LiveVersionMap();

    private volatile SegmentInfos lastCommittedSegmentInfos;

    private final IndexThrottle throttle;

    private final LocalCheckpointTracker localCheckpointTracker;

    private final CombinedDeletionPolicy combinedDeletionPolicy;

    // How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
    // are falling behind and when writing the indexing buffer to disk is too slow. When this is 0, there is no throttling, else we
    // throttle incoming indexing ops down to a single thread:
    private final AtomicInteger throttleRequestCount = new AtomicInteger();
    private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
    public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
    private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
    private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
    private final CounterMetric numVersionLookups = new CounterMetric();
    private final CounterMetric numIndexVersionsLookups = new CounterMetric();
    // Lucene operations since this engine was opened - not including operations from existing segments.
    private final CounterMetric numDocDeletes = new CounterMetric();
    private final CounterMetric numDocAppends = new CounterMetric();
    private final CounterMetric numDocUpdates = new CounterMetric();

    /**
     * How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
     * across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs. incoming documents
     * being indexed/deleted.
     */
    private final AtomicLong writingBytes = new AtomicLong();
    private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false);

    @Nullable
    private final String historyUUID;

    public InternalEngine(EngineConfig engineConfig) {
        this(engineConfig, LocalCheckpointTracker::new);
    }

    InternalEngine(
            final EngineConfig engineConfig,
            final BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
        super(engineConfig);
        if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
            maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
        }
        final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
                engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
                engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
        );
        store.incRef();
        IndexWriter writer = null;
        Translog translog = null;
        ExternalSearcherManager externalSearcherManager = null;
        SearcherManager internalSearcherManager = null;
        EngineMergeScheduler scheduler = null;
        boolean success = false;
        try {
            this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
            mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
            throttle = new IndexThrottle();
            try {
                translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier());
                assert translog.getGeneration() != null;
                this.translog = translog;
                this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
                this.combinedDeletionPolicy =
                        new CombinedDeletionPolicy(logger, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint);
                writer = createWriter();
                bootstrapAppendOnlyInfoFromWriter(writer);
                historyUUID = loadHistoryUUID(writer);
                indexWriter = writer;
            } catch (IOException | TranslogCorruptedException e) {
                throw new EngineCreationFailureException(shardId, "failed to create engine", e);
            } catch (AssertionError e) {
                // IndexWriter throws AssertionError on init, if asserts are enabled, if any files don't exist, but tests that
                // randomly throw FNFE/NSFE can also hit this:
                if (ExceptionsHelper.stackTrace(e).contains("org.apache.lucene.index.IndexWriter.filesExist")) {
                    throw new EngineCreationFailureException(shardId, "failed to create engine", e);
                } else {
                    throw e;
                }
            }
            externalSearcherManager = createSearcherManager(new SearchFactory(logger, isClosed, engineConfig));
            internalSearcherManager = externalSearcherManager.internalSearcherManager;
            this.internalSearcherManager = internalSearcherManager;
            this.externalSearcherManager = externalSearcherManager;
            internalSearcherManager.addListener(versionMap);
            assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it";
            // don't allow commits until we are done with recovering
            pendingTranslogRecovery.set(true);
            for (ReferenceManager.RefreshListener listener : engineConfig.getExternalRefreshListener()) {
                this.externalSearcherManager.addListener(listener);
            }
            for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) {
                this.internalSearcherManager.addListener(listener);
            }
            success = true;
        } finally {
            if (success == false) {
                IOUtils.closeWhileHandlingException(writer, translog, internalSearcherManager, externalSearcherManager, scheduler);
                if (isClosed.get() == false) {
                    // on failure we need to decrement the store reference
                    store.decRef();
                }
            }
        }
        logger.trace("created new InternalEngine");
    }

    private LocalCheckpointTracker createLocalCheckpointTracker(
            BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) throws IOException {
        final long maxSeqNo;
        final long localCheckpoint;
        final SequenceNumbers.CommitInfo seqNoStats =
                SequenceNumbers.loadSeqNoInfoFromLuceneCommit(store.readLastCommittedSegmentsInfo().userData.entrySet());
        maxSeqNo = seqNoStats.maxSeqNo;
        localCheckpoint = seqNoStats.localCheckpoint;
        logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
        return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
    }

    /**
     * This reference manager delegates all of its refresh calls to another (internal) SearcherManager.
     * The main purpose of this is that, if we have external refreshes happening, we don't issue extra
     * refreshes to clear version map memory etc.; that can cause excessive segment creation if heavy indexing
     * is happening and the refresh interval is low (i.e. 1 sec).
     *
     * This also prevents segment starvation where an internal reader holds on to old segments literally forever,
     * since no indexing is happening and refreshes are only happening on the external reader manager; with
     * this specialized implementation an external refresh will immediately be reflected on the internal reader
     * and old segments can be released in the same way previous versions did this (as a side-effect of _refresh).
     */
    @SuppressForbidden(reason = "reference counting is required here")
    private static final class ExternalSearcherManager extends ReferenceManager<IndexSearcher> {
        private final SearcherFactory searcherFactory;
        private final SearcherManager internalSearcherManager;

        ExternalSearcherManager(SearcherManager internalSearcherManager, SearcherFactory searcherFactory) throws IOException {
            IndexSearcher acquire = internalSearcherManager.acquire();
            try {
                IndexReader indexReader = acquire.getIndexReader();
                assert indexReader instanceof ElasticsearchDirectoryReader :
                        "searcher's IndexReader should be an ElasticsearchDirectoryReader, but got " + indexReader;
                indexReader.incRef(); // steal the reader - getSearcher will decrement if it fails
                current = SearcherManager.getSearcher(searcherFactory, indexReader, null);
            } finally {
                internalSearcherManager.release(acquire);
            }
            this.searcherFactory = searcherFactory;
            this.internalSearcherManager = internalSearcherManager;
        }

        @Override
        protected IndexSearcher refreshIfNeeded(IndexSearcher referenceToRefresh) throws IOException {
            // we simply run a blocking refresh on the internal reference manager and then steal its reader.
            // This is a safe operation since we acquire the reader, which increments its reference count, and then
            // steal it by calling incRef on the "stolen" reader.
            internalSearcherManager.maybeRefreshBlocking();
            IndexSearcher acquire = internalSearcherManager.acquire();
            try {
                final IndexReader previousReader = referenceToRefresh.getIndexReader();
                assert previousReader instanceof ElasticsearchDirectoryReader :
                        "searcher's IndexReader should be an ElasticsearchDirectoryReader, but got " + previousReader;
                final IndexReader newReader = acquire.getIndexReader();
                if (newReader == previousReader) {
                    // nothing has changed - both ref managers share the same instance so we can use reference equality
                    return null;
                } else {
                    newReader.incRef(); // steal the reader - getSearcher will decrement if it fails
                    return SearcherManager.getSearcher(searcherFactory, newReader, previousReader);
                }
            } finally {
                internalSearcherManager.release(acquire);
            }
        }

        @Override
        protected boolean tryIncRef(IndexSearcher reference) {
            return reference.getIndexReader().tryIncRef();
        }

        @Override
        protected int getRefCount(IndexSearcher reference) {
            return reference.getIndexReader().getRefCount();
        }

        @Override
        protected void decRef(IndexSearcher reference) throws IOException {
            reference.getIndexReader().decRef();
        }
    }

    @Override
    public void restoreLocalCheckpointFromTranslog() throws IOException {
        try (ReleasableLock ignored = writeLock.acquire()) {
            ensureOpen();
            final long localCheckpoint = localCheckpointTracker.getCheckpoint();
            try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) {
                Translog.Operation operation;
                while ((operation = snapshot.next()) != null) {
                    if (operation.seqNo() > localCheckpoint) {
                        localCheckpointTracker.markSeqNoAsCompleted(operation.seqNo());
                    }
                }
            }
        }
    }

    @Override
    public int fillSeqNoGaps(long primaryTerm) throws IOException {
        try (ReleasableLock ignored = writeLock.acquire()) {
            ensureOpen();
            final long localCheckpoint = localCheckpointTracker.getCheckpoint();
            final long maxSeqNo = localCheckpointTracker.getMaxSeqNo();
            int numNoOpsAdded = 0;
            for (
                    long seqNo = localCheckpoint + 1;
                    seqNo <= maxSeqNo;
                    seqNo = localCheckpointTracker.getCheckpoint() + 1 /* the local checkpoint might have advanced so we leap-frog */) {
                innerNoOp(new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling gaps"));
                numNoOpsAdded++;
                assert seqNo <= localCheckpointTracker.getCheckpoint()
                        : "local checkpoint did not advance; was [" + seqNo + "], now [" + localCheckpointTracker.getCheckpoint() + "]";
            }
            return numNoOpsAdded;
        }
    }
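
    /**
     * Seeds the append-only bookkeeping ({@code maxUnsafeAutoIdTimestamp} and {@code maxSeqNoOfNonAppendOnlyOperations})
     * from the user data of the writer's last commit, so that the optimization state survives an engine restart.
     */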
    private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) {
        for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
            final String key = entry.getKey();
            if (key.equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) {
                assert maxUnsafeAutoIdTimestamp.get() == -1 :
                        "max unsafe timestamp was assigned already [" + maxUnsafeAutoIdTimestamp.get() + "]";
                maxUnsafeAutoIdTimestamp.set(Long.parseLong(entry.getValue()));
            }
            if (key.equals(SequenceNumbers.MAX_SEQ_NO)) {
                assert maxSeqNoOfNonAppendOnlyOperations.get() == -1 :
                        "max unsafe append-only seq# was assigned already [" + maxSeqNoOfNonAppendOnlyOperations.get() + "]";
                maxSeqNoOfNonAppendOnlyOperations.set(Long.parseLong(entry.getValue()));
            }
        }
    }

    @Override
    public InternalEngine recoverFromTranslog() throws IOException {
        flushLock.lock();
        try (ReleasableLock lock = readLock.acquire()) {
            ensureOpen();
            if (pendingTranslogRecovery.get() == false) {
                throw new IllegalStateException("Engine has already been recovered");
            }
            try {
                recoverFromTranslogInternal();
            } catch (Exception e) {
                try {
                    pendingTranslogRecovery.set(true); // play it safe and never allow commits on this engine; see #ensureCanFlush
                    failEngine("failed to recover from translog", e);
                } catch (Exception inner) {
                    e.addSuppressed(inner);
                }
                throw e;
            }
        } finally {
            flushLock.unlock();
        }
        return this;
    }

    @Override
    public void skipTranslogRecovery() {
        assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
        pendingTranslogRecovery.set(false); // we are good - now we can commit
    }

    private void recoverFromTranslogInternal() throws IOException {
        Translog.TranslogGeneration translogGeneration = translog.getGeneration();
        final int opsRecovered;
        final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
        try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGen)) {
            opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot);
        } catch (Exception e) {
            throw new EngineException(shardId, "failed to recover from translog", e);
        }
        // flush if we recovered something or if we have references to older translogs
        // note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
        assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
        pendingTranslogRecovery.set(false); // we are good - now we can commit
        if (opsRecovered > 0) {
            logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
                    opsRecovered, translogGeneration == null ? null : translogGeneration.translogFileGeneration,
                    translog.currentFileGeneration());
            commitIndexWriter(indexWriter, translog, null);
            refreshLastCommittedSegmentInfos();
            refresh("translog_recovery");
        }
        translog.trimUnreferencedReaders();
    }
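
    /**
     * Opens the existing translog for this shard, using the translog UUID stored in the last Lucene commit so that
     * the translog can be matched up with that commit.
     */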
    private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy translogDeletionPolicy,
                                  LongSupplier globalCheckpointSupplier) throws IOException {
        final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
        final String translogUUID = loadTranslogUUIDFromLastCommit();
        // We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
        return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier,
                engineConfig.getPrimaryTermSupplier());
    }

    @Override
    Translog getTranslog() {
        ensureOpen();
        return translog;
    }

    @Override
    public boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException {
        final boolean synced = translog.ensureSynced(locations);
        if (synced) {
            revisitIndexDeletionPolicyOnTranslogSynced();
        }
        return synced;
    }

    @Override
    public void syncTranslog() throws IOException {
        translog.sync();
        revisitIndexDeletionPolicyOnTranslogSynced();
    }

    private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException {
        if (combinedDeletionPolicy.hasUnreferencedCommits()) {
            indexWriter.deleteUnusedFiles();
        }
    }

    @Override
    public String getHistoryUUID() {
        return historyUUID;
    }

    /** Returns how many bytes we are currently moving from indexing buffer to segments on disk */
    @Override
    public long getWritingBytes() {
        return writingBytes.get();
    }

    /**
     * Reads the current stored translog UUID from the last commit data.
     */
    @Nullable
    private String loadTranslogUUIDFromLastCommit() throws IOException {
        final Map<String, String> commitUserData = store.readLastCommittedSegmentsInfo().getUserData();
        if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
            throw new IllegalStateException("commit doesn't contain translog generation id");
        }
        return commitUserData.get(Translog.TRANSLOG_UUID_KEY);
    }

    /**
     * Reads the current stored history ID from the IW commit data.
     */
    private String loadHistoryUUID(final IndexWriter writer) throws IOException {
        final String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY);
        if (uuid == null) {
            throw new IllegalStateException("commit doesn't contain history uuid");
        }
        return uuid;
    }
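
    /**
     * Opens a reader on the index writer and wraps it in the internal SearcherManager plus the ExternalSearcherManager
     * that delegates to it; on failure the writer is rolled back and everything created here is released again.
     */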
    private ExternalSearcherManager createSearcherManager(SearchFactory externalSearcherFactory) throws EngineException {
        boolean success = false;
        SearcherManager internalSearcherManager = null;
        try {
            try {
                final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId);
                internalSearcherManager = new SearcherManager(directoryReader,
                        new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService()));
                lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
                ExternalSearcherManager externalSearcherManager = new ExternalSearcherManager(internalSearcherManager,
                        externalSearcherFactory);
                success = true;
                return externalSearcherManager;
            } catch (IOException e) {
                maybeFailEngine("start", e);
                try {
                    indexWriter.rollback();
                } catch (IOException inner) { // iw is closed below
                    e.addSuppressed(inner);
                }
                throw new EngineCreationFailureException(shardId, "failed to open reader on writer", e);
            }
        } finally {
            if (success == false) { // release everything we created on a failure
                IOUtils.closeWhileHandlingException(internalSearcherManager, indexWriter);
            }
        }
    }

    @Override
    public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
        assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
        try (ReleasableLock ignored = readLock.acquire()) {
            ensureOpen();
            SearcherScope scope;
            if (get.realtime()) {
                VersionValue versionValue = null;
                try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
                    // we need to lock here to access the version map to do this truly in RT
                    versionValue = getVersionFromMap(get.uid().bytes());
                }
                if (versionValue != null) {
                    if (versionValue.isDelete()) {
                        return GetResult.NOT_EXISTS;
                    }
                    if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
                        throw new VersionConflictEngineException(shardId, get.type(), get.id(),
                                get.versionType().explainConflictForReads(versionValue.version, get.version()));
                    }
                    if (get.isReadFromTranslog()) {
                        // this is only used for updates - API _GET calls will always read from a reader for consistency.
                        // the update call doesn't need the consistency since it's source only + _parent, but parent can go away in 7.0
                        if (versionValue.getLocation() != null) {
                            try {
                                Translog.Operation operation = translog.readOperation(versionValue.getLocation());
                                if (operation != null) {
                                    // in the case of an already pruned translog generation we might get null here - yet very unlikely
                                    TranslogLeafReader reader = new TranslogLeafReader((Translog.Index) operation, engineConfig
                                            .getIndexSettings().getIndexVersionCreated());
                                    return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader)),
                                            new VersionsAndSeqNoResolver.DocIdAndVersion(0, ((Translog.Index) operation).version(),
                                                    reader, 0));
                                }
                            } catch (IOException e) {
                                maybeFailEngine("realtime_get", e); // let's check if the translog has failed with a tragic event
                                throw new EngineException(shardId, "failed to read operation from translog", e);
                            }
                        } else {
                            trackTranslogLocation.set(true);
                        }
                    }
                    refresh("realtime_get", SearcherScope.INTERNAL);
                }
                scope = SearcherScope.INTERNAL;
            } else {
                // we expose only what has already been externally exposed in a point-in-time snapshot via an explicit refresh
                scope = SearcherScope.EXTERNAL;
            }
            // no version, get the version from the index, we know that we refresh on flush
            return getFromSearcher(get, searcherFactory, scope);
        }
    }

    /**
     * the status of the current doc version in lucene, compared to the version in an incoming
     * operation
     */
    enum OpVsLuceneDocStatus {
        /** the op is more recent than the one that last modified the doc found in lucene */
        OP_NEWER,
        /** the op is older or the same as the one that last modified the doc found in lucene */
        OP_STALE_OR_EQUAL,
        /** no doc was found in lucene */
        LUCENE_DOC_NOT_FOUND
    }
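
    /**
     * Compares an incoming operation against the latest indexed copy of the same document, consulting the version map
     * first and falling back to a Lucene lookup, ordering by sequence number and then by primary term as a tie breaker.
     */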
    private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException {
        assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found";
        final OpVsLuceneDocStatus status;
        VersionValue versionValue = getVersionFromMap(op.uid().bytes());
        assert incrementVersionLookup();
        if (versionValue != null) {
            if (op.seqNo() > versionValue.seqNo ||
                    (op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term)) {
                status = OpVsLuceneDocStatus.OP_NEWER;
            } else {
                status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
            }
        } else {
            // load from index
            assert incrementIndexVersionLookup();
            try (Searcher searcher = acquireSearcher("load_seq_no", SearcherScope.INTERNAL)) {
                DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), op.uid());
                if (docAndSeqNo == null) {
                    status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
                } else if (op.seqNo() > docAndSeqNo.seqNo) {
                    status = OpVsLuceneDocStatus.OP_NEWER;
                } else if (op.seqNo() == docAndSeqNo.seqNo) {
                    // load term to tie break
                    final long existingTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docAndSeqNo, op.uid().field());
                    if (op.primaryTerm() > existingTerm) {
                        status = OpVsLuceneDocStatus.OP_NEWER;
                    } else {
                        status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
                    }
                } else {
                    status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
                }
            }
        }
        return status;
    }

    /** resolves the current version of the document, returning null if not found */
    private VersionValue resolveDocVersion(final Operation op) throws IOException {
        assert incrementVersionLookup(); // used for asserting in tests
        VersionValue versionValue = getVersionFromMap(op.uid().bytes());
        if (versionValue == null) {
            assert incrementIndexVersionLookup(); // used for asserting in tests
            final long currentVersion = loadCurrentVersionFromIndex(op.uid());
            if (currentVersion != Versions.NOT_FOUND) {
                versionValue = new IndexVersionValue(null, currentVersion, SequenceNumbers.UNASSIGNED_SEQ_NO, 0L);
            }
        } else if (engineConfig.isEnableGcDeletes() && versionValue.isDelete() &&
                (engineConfig.getThreadPool().relativeTimeInMillis() - ((DeleteVersionValue) versionValue).time) > getGcDeletesInMillis()) {
            versionValue = null;
        }
        return versionValue;
    }
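
    /**
     * Looks up the version for the given id in the live version map. If the map is currently unsafe (append-only docs
     * were indexed without being tracked in it), a one-time internal refresh is forced under the map's monitor before
     * safe access is enforced, so that the lookup below can be trusted.
     */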
    private VersionValue getVersionFromMap(BytesRef id) {
        if (versionMap.isUnsafe()) {
            synchronized (versionMap) {
                // we are switching from an unsafe map to a safe map. This might happen concurrently,
                // but we only need to do this once since the last operation per ID is to add to the version
                // map, so once we pass this point we can safely look up from the version map.
                if (versionMap.isUnsafe()) {
                    refresh("unsafe_version_map", SearcherScope.INTERNAL);
                }
                versionMap.enforceSafeAccess();
            }
        }
        return versionMap.getUnderLock(id);
    }
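
    /**
     * Returns true if this index operation carries an auto-generated id and may therefore use the append-only
     * optimization (addDocument instead of updateDocument), subject to the retry checks performed by the caller.
     */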
    private boolean canOptimizeAddDocument(Index index) {
        if (index.getAutoGeneratedIdTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) {
            assert index.getAutoGeneratedIdTimestamp() >= 0 : "autoGeneratedIdTimestamp must be positive but was: "
                    + index.getAutoGeneratedIdTimestamp();
            switch (index.origin()) {
                case PRIMARY:
                    assert (index.version() == Versions.MATCH_ANY && index.versionType() == VersionType.INTERNAL)
                            : "version: " + index.version() + " type: " + index.versionType();
                    return true;
                case PEER_RECOVERY:
                case REPLICA:
                    assert index.version() == 1 && index.versionType() == VersionType.EXTERNAL
                            : "version: " + index.version() + " type: " + index.versionType();
                    return true;
                case LOCAL_TRANSLOG_RECOVERY:
                    assert index.isRetry();
                    return true; // allow optimizing in order to update the max safe timestamp
                default:
                    throw new IllegalArgumentException("unknown origin " + index.origin());
            }
        }
        return false;
    }

    private boolean assertVersionType(final Engine.Operation operation) {
        if (operation.origin() == Operation.Origin.REPLICA ||
                operation.origin() == Operation.Origin.PEER_RECOVERY ||
                operation.origin() == Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
            // ensure that replica operation has expected version type for replication
            // ensure that versionTypeForReplicationAndRecovery is idempotent
            assert operation.versionType() == operation.versionType().versionTypeForReplicationAndRecovery()
                    : "unexpected version type in request from [" + operation.origin().name() + "] " +
                    "found [" + operation.versionType().name() + "] " +
                    "expected [" + operation.versionType().versionTypeForReplicationAndRecovery().name() + "]";
        }
        return true;
    }

    private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) {
        if (origin == Operation.Origin.PRIMARY) {
            assert assertOriginPrimarySequenceNumber(seqNo);
        } else {
            // sequence number should be set when operation origin is not primary
            assert seqNo >= 0 : "recovery or replica ops should have an assigned seq no.; origin: " + origin;
        }
        return true;
    }

    protected boolean assertOriginPrimarySequenceNumber(final long seqNo) {
        // sequence number should not be set when operation origin is primary
        assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO
                : "primary operations must never have an assigned sequence number but was [" + seqNo + "]";
        return true;
    }

    private long generateSeqNoForOperation(final Operation operation) {
        assert operation.origin() == Operation.Origin.PRIMARY;
        return doGenerateSeqNoForOperation(operation);
    }

    /**
     * Generate the sequence number for the specified operation.
     *
     * @param operation the operation
     * @return the sequence number
     */
    protected long doGenerateSeqNoForOperation(final Operation operation) {
        return localCheckpointTracker.generateSeqNo();
    }

    @Override
    public IndexResult index(Index index) throws IOException {
        assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field();
        final boolean doThrottle = index.origin().isRecovery() == false;
        try (ReleasableLock releasableLock = readLock.acquire()) {
            ensureOpen();
            assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
            assert assertVersionType(index);
            try (Releasable ignored = versionMap.acquireLock(index.uid().bytes());
                 // only non-recovery operations are subject to indexing throttling
                 Releasable indexThrottle = doThrottle ? throttle.acquireThrottle() : () -> {}) {
                lastWriteNanos = index.startTime();
                /* A NOTE ABOUT APPEND ONLY OPTIMIZATIONS:
                 * if we have an autoGeneratedID that comes into the engine we can potentially optimize
                 * and just use addDocument instead of updateDocument and skip the entire version and index lookup across the board.
                 * Yet, we have to deal with multiple document delivery; for this we use a property of the document that is added
                 * to detect if it has potentially been added before. We use the document's timestamp for this since it's something
                 * that:
                 * - doesn't change per document
                 * - is preserved in the transaction log
                 * - and is assigned before we start to index / replicate
                 * NOTE: it's not important for this timestamp to be consistent across nodes etc.; it's just a number that is in the
                 * common case increasing and can be used in the failure case, when we retry and resend documents, to establish a
                 * happens-before relationship. For instance:
                 * - doc A has autoGeneratedIdTimestamp = 10, isRetry = false
                 * - doc B has autoGeneratedIdTimestamp = 9, isRetry = false
                 *
                 * while both docs are in flight, we disconnect on one node, reconnect and send doc A again
                 * - now doc A' has autoGeneratedIdTimestamp = 10, isRetry = true
                 *
                 * if A' arrives on the shard first we update maxUnsafeAutoIdTimestamp to 10 and use updateDocument. All subsequent
                 * documents that arrive (A and B) will also use updateDocument since their timestamps are less than or equal to
                 * maxUnsafeAutoIdTimestamp. While this is not strictly needed for doc B, it is just much simpler to implement since
                 * it will just de-optimize some docs in the worst case.
                 *
                 * if A arrives on the shard first we use addDocument since maxUnsafeAutoIdTimestamp is < 10. A' will then either be
                 * skipped or will call updateDocument.
                 */
                final IndexingStrategy plan;
                if (index.origin() == Operation.Origin.PRIMARY) {
                    plan = planIndexingAsPrimary(index);
                } else {
                    // non-primary mode (i.e., replica or recovery)
                    plan = planIndexingAsNonPrimary(index);
                }
                final IndexResult indexResult;
                if (plan.earlyResultOnPreFlightError.isPresent()) {
                    indexResult = plan.earlyResultOnPreFlightError.get();
                    assert indexResult.hasFailure();
                } else if (plan.indexIntoLucene) {
                    indexResult = indexIntoLucene(index, plan);
                } else {
                    indexResult = new IndexResult(
                            plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
                }
                if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
                    final Translog.Location location;
                    if (indexResult.hasFailure() == false) {
                        location = translog.add(new Translog.Index(index, indexResult));
                    } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                        // if we have a document failure, record it as a no-op in the translog with the generated seq_no
                        location = translog.add(
                                new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().getMessage()));
                    } else {
                        location = null;
                    }
                    indexResult.setTranslogLocation(location);
                }
                if (plan.indexIntoLucene && indexResult.hasFailure() == false) {
                    final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
                    versionMap.maybePutIndexUnderLock(index.uid().bytes(),
                            new IndexVersionValue(translogLocation, plan.versionForIndexing, plan.seqNoForIndexing,
                                    index.primaryTerm()));
                }
                if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                    localCheckpointTracker.markSeqNoAsCompleted(indexResult.getSeqNo());
                }
                indexResult.setTook(System.nanoTime() - index.startTime());
                indexResult.freeze();
                return indexResult;
            }
        } catch (RuntimeException | IOException e) {
            try {
                maybeFailEngine("index", e);
            } catch (Exception inner) {
                e.addSuppressed(inner);
            }
            throw e;
        }
    }
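
    /**
     * Plans how to execute an index operation on a replica or during recovery: take the append-only fast path,
     * skip Lucene entirely for operations that are stale with respect to what is already indexed, or process the
     * operation normally based on a seq#-based comparison with the existing doc.
     */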
    private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException {
        final IndexingStrategy plan;
        final boolean appendOnlyRequest = canOptimizeAddDocument(index);
        if (appendOnlyRequest && mayHaveBeenIndexedBefore(index) == false && index.seqNo() > maxSeqNoOfNonAppendOnlyOperations.get()) {
            /*
             * As soon as an append-only request has been indexed into the primary, it can be exposed to a search and then users can
             * issue a follow-up operation on it. In rare cases, the follow-up operation can arrive and be processed on a replica
             * before the original append-only. In this case we can't simply proceed with the append-only without consulting the
             * version map. If a replica has seen a non-append-only operation with a higher seqno than the seqno of an append-only,
             * it may have seen the document of that append-only request. However if the seqno of an append-only is higher than the
             * seqno of any non-append-only request, we can assert the replica has not seen the document of that append-only request,
             * and thus we can apply the optimization.
             */
            assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]";
            plan = IndexingStrategy.optimizedAppendOnly(index.seqNo());
        } else {
            if (appendOnlyRequest == false) {
                maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr));
                assert maxSeqNoOfNonAppendOnlyOperations.get() >= index.seqNo() : "max_seqno of non-append-only was not updated;" +
                        "max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of index [" + index.seqNo() + "]";
            }
            versionMap.enforceSafeAccess();
            // drop out of order operations
            assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() :
                    "resolving out of order delivery based on versioning but version type isn't fit for it. got [" + index.versionType() + "]";
            // unlike the primary, replicas don't really care about the creation status of documents.
            // this allows us to ignore the case where a document was found in the live version map in
            // a delete state and return false for the created flag in favor of code simplicity
            final OpVsLuceneDocStatus opVsLucene;
            if (index.seqNo() <= localCheckpointTracker.getCheckpoint()) {
                // the operation seq# is lower than the current local checkpoint and thus was already put into lucene.
                // this can happen during recovery where older operations are sent from the translog that are already
                // part of the lucene commit (either from a peer recovery or a local translog)
                // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in
                // question may have been deleted in an out of order op that is not replayed.
                // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
                // See testRecoveryWithOutOfOrderDelete for an example of peer recovery
                opVsLucene = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
            } else {
                opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
            }
            if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
                plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version());
            } else {
                plan = IndexingStrategy.processNormally(
                        opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.seqNo(), index.version()
                );
            }
        }
        return plan;
    }
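
    /**
     * Plans how to execute an index operation on the primary: append-only requests either take the optimized add
     * path or, if possibly retried, update as though the doc might already exist; all other requests resolve the
     * current version and either fail with a version conflict or proceed with a freshly generated seq# and version.
     */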
    private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
        assert index.origin() == Operation.Origin.PRIMARY : "planning as primary but origin isn't. got " + index.origin();
        final IndexingStrategy plan;
        // resolve an external operation into an internal one which is safe to replay
        if (canOptimizeAddDocument(index)) {
            if (mayHaveBeenIndexedBefore(index)) {
                plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L);
                versionMap.enforceSafeAccess();
            } else {
                plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index));
            }
        } else {
            versionMap.enforceSafeAccess();
            // resolves incoming version
            final VersionValue versionValue = resolveDocVersion(index);
            final long currentVersion;
            final boolean currentNotFoundOrDeleted;
            if (versionValue == null) {
                currentVersion = Versions.NOT_FOUND;
                currentNotFoundOrDeleted = true;
            } else {
                currentVersion = versionValue.version;
                currentNotFoundOrDeleted = versionValue.isDelete();
            }
            if (index.versionType().isVersionConflictForWrites(
                    currentVersion, index.version(), currentNotFoundOrDeleted)) {
                final VersionConflictEngineException e =
                        new VersionConflictEngineException(shardId, index, currentVersion, currentNotFoundOrDeleted);
                plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
            } else {
                plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted,
                        generateSeqNoForOperation(index),
                        index.versionType().updateVersion(currentVersion, index.version())
                );
            }
        }
        return plan;
    }
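
    /**
     * Executes the prepared plan against Lucene: stamps the doc with its seq#, primary term and version, then either
     * adds or updates it. A non-tragic exception from the writer is reported as a per-document failure rather than
     * failing the whole engine.
     */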
    private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
            throws IOException {
        assert plan.seqNoForIndexing >= 0 : "ops should have an assigned seq no.; origin: " + index.origin();
        assert plan.versionForIndexing >= 0 : "version must be set. got " + plan.versionForIndexing;
        assert plan.indexIntoLucene;
        /* Update the document's sequence number and primary term; the sequence number is derived here from either the sequence
         * number service if this is on the primary, or the existing document's sequence number if this is on the replica. The
         * primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
         */
        index.parsedDoc().updateSeqID(plan.seqNoForIndexing, index.primaryTerm());
        index.parsedDoc().version().setLongValue(plan.versionForIndexing);
        try {
            if (plan.useLuceneUpdateDocument) {
                updateDocs(index.uid(), index.docs(), indexWriter);
            } else {
                // document does not exist, we can optimize for create, but double check if assertions are running
                assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
                addDocs(index.docs(), indexWriter);
            }
            return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
        } catch (Exception ex) {
            if (indexWriter.getTragicException() == null) {
                /* There is no tragic event recorded so this must be a document failure.
                 *
                 * The handling inside IW doesn't guarantee that a tragic / aborting exception
                 * will be used as THE tragicEventException, since if there are multiple exceptions causing an abort in IW
                 * only one wins. Yet, only the one that wins will also close the IW and in turn fail the engine, such that
                 * we can potentially handle the exception before the engine is failed.
                 * Bottom line is that we can only rely on the fact that if it's a document failure then
                 * `indexWriter.getTragicException()` will be null; otherwise we have to rethrow and treat it as fatal, or
                 * rather as a non-document failure.
                 *
                 * we return a `MATCH_ANY` version to indicate that no document was indexed. The value is
                 * not used anyway
                 */
                return new IndexResult(ex, Versions.MATCH_ANY, plan.seqNoForIndexing);
            } else {
                throw ex;
            }
        }
    }

    /**
     * returns true if the indexing operation may have already been processed by this engine.
     * Note that it is OK to rarely return true even if this is not the case. However a `false`
     * return value must always be correct.
     */
    private boolean mayHaveBeenIndexedBefore(Index index) {
        assert canOptimizeAddDocument(index);
        final boolean mayHaveBeenIndexBefore;
        if (index.isRetry()) {
            mayHaveBeenIndexBefore = true;
            maxUnsafeAutoIdTimestamp.updateAndGet(curr -> Math.max(index.getAutoGeneratedIdTimestamp(), curr));
            assert maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp();
        } else {
            // even a non-retry request is unsafe if an earlier retry raised the max unsafe timestamp to or above its own
            mayHaveBeenIndexBefore = maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp();
        }
        return mayHaveBeenIndexBefore;
    }

    // for testing
    long getMaxSeqNoOfNonAppendOnlyOperations() {
        return maxSeqNoOfNonAppendOnlyOperations.get();
    }
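
    /** Appends the doc(s) to the writer without a uid lookup (append-only fast path) and bumps the append counter. */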
    private void addDocs(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
        if (docs.size() > 1) {
            indexWriter.addDocuments(docs);
        } else {
            indexWriter.addDocument(docs.get(0));
        }
        numDocAppends.inc(docs.size());
    }
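
    /**
     * The precomputed plan for a single index operation: whether to touch Lucene at all, whether to use addDocument
     * or updateDocument, which seq# and version to stamp on the doc, or a preflight failure (e.g. a version conflict)
     * that short-circuits everything else.
     */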
    private static final class IndexingStrategy {
        final boolean currentNotFoundOrDeleted;
        final boolean useLuceneUpdateDocument;
        final long seqNoForIndexing;
        final long versionForIndexing;
        final boolean indexIntoLucene;
        final Optional<IndexResult> earlyResultOnPreFlightError;

        private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpdateDocument,
                                 boolean indexIntoLucene, long seqNoForIndexing,
                                 long versionForIndexing, IndexResult earlyResultOnPreFlightError) {
            assert useLuceneUpdateDocument == false || indexIntoLucene :
                    "use lucene update is set to true, but we're not indexing into lucene";
            assert (indexIntoLucene && earlyResultOnPreFlightError != null) == false :
                    "can only index into lucene or have a preflight result but not both." +
                            "indexIntoLucene: " + indexIntoLucene
                            + " earlyResultOnPreFlightError:" + earlyResultOnPreFlightError;
            this.currentNotFoundOrDeleted = currentNotFoundOrDeleted;
            this.useLuceneUpdateDocument = useLuceneUpdateDocument;
            this.seqNoForIndexing = seqNoForIndexing;
            this.versionForIndexing = versionForIndexing;
            this.indexIntoLucene = indexIntoLucene;
            this.earlyResultOnPreFlightError =
                    earlyResultOnPreFlightError == null ? Optional.empty() :
                            Optional.of(earlyResultOnPreFlightError);
        }

        static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing) {
            return new IndexingStrategy(true, false, true, seqNoForIndexing, 1, null);
        }

        static IndexingStrategy skipDueToVersionConflict(
                VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion) {
            final IndexResult result = new IndexResult(e, currentVersion);
            return new IndexingStrategy(
                    currentNotFoundOrDeleted, false, false, SequenceNumbers.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, result);
        }

        static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted,
                                                long seqNoForIndexing, long versionForIndexing) {
            return new IndexingStrategy(currentNotFoundOrDeleted, currentNotFoundOrDeleted == false,
                    true, seqNoForIndexing, versionForIndexing, null);
        }

        static IndexingStrategy overrideExistingAsIfNotThere(
                long seqNoForIndexing, long versionForIndexing) {
            return new IndexingStrategy(true, true, true, seqNoForIndexing, versionForIndexing, null);
        }

        static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted,
                                                     long seqNoForIndexing, long versionForIndexing) {
            return new IndexingStrategy(currentNotFoundOrDeleted, false,
                    false, seqNoForIndexing, versionForIndexing, null);
        }
    }
    /**
     * Asserts that the doc in the index operation really doesn't exist
     */
    private boolean assertDocDoesNotExist(final Index index, final boolean allowDeleted) throws IOException {
        // NOTE this uses direct access to the version map since we are in the assertion code where we maintain a secondary
        // map in the version map such that we don't need to refresh if we are unsafe
        final VersionValue versionValue = versionMap.getVersionForAssert(index.uid().bytes());
        if (versionValue != null) {
            if (versionValue.isDelete() == false || allowDeleted == false) {
                throw new AssertionError("doc [" + index.type() + "][" + index.id() + "] exists in version map (version " + versionValue + ")");
            }
        } else {
            try (Searcher searcher = acquireSearcher("assert doc doesn't exist", SearcherScope.INTERNAL)) {
                final long docsWithId = searcher.searcher().count(new TermQuery(index.uid()));
                if (docsWithId > 0) {
                    throw new AssertionError("doc [" + index.type() + "][" + index.id() + "] exists [" + docsWithId + "] times in index");
                }
            }
        }
        return true;
    }
    private void updateDocs(final Term uid, final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
        if (docs.size() > 1) {
            indexWriter.updateDocuments(uid, docs);
        } else {
            indexWriter.updateDocument(uid, docs.get(0));
        }
        numDocUpdates.inc(docs.size());
    }
    @Override
    public DeleteResult delete(Delete delete) throws IOException {
        versionMap.enforceSafeAccess();
        assert Objects.equals(delete.uid().field(), IdFieldMapper.NAME) : delete.uid().field();
        assert assertVersionType(delete);
        assert assertIncomingSequenceNumber(delete.origin(), delete.seqNo());
        final DeleteResult deleteResult;
        // NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:
        try (ReleasableLock ignored = readLock.acquire(); Releasable ignored2 = versionMap.acquireLock(delete.uid().bytes())) {
            ensureOpen();
            lastWriteNanos = delete.startTime();
            final DeletionStrategy plan;
            if (delete.origin() == Operation.Origin.PRIMARY) {
                plan = planDeletionAsPrimary(delete);
            } else {
                plan = planDeletionAsNonPrimary(delete);
            }
            if (plan.earlyResultOnPreflightError.isPresent()) {
                deleteResult = plan.earlyResultOnPreflightError.get();
            } else if (plan.deleteFromLucene) {
                deleteResult = deleteInLucene(delete, plan);
            } else {
                deleteResult = new DeleteResult(
                    plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
            }
            if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
                final Translog.Location location;
                if (deleteResult.hasFailure() == false) {
                    location = translog.add(new Translog.Delete(delete, deleteResult));
                } else if (deleteResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                    location = translog.add(new Translog.NoOp(deleteResult.getSeqNo(),
                        delete.primaryTerm(), deleteResult.getFailure().getMessage()));
                } else {
                    location = null;
                }
                deleteResult.setTranslogLocation(location);
            }
            if (deleteResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                localCheckpointTracker.markSeqNoAsCompleted(deleteResult.getSeqNo());
            }
            deleteResult.setTook(System.nanoTime() - delete.startTime());
            deleteResult.freeze();
        } catch (RuntimeException | IOException e) {
            try {
                maybeFailEngine("delete", e);
            } catch (Exception inner) {
                e.addSuppressed(inner);
            }
            throw e;
        }
        maybePruneDeletes();
        return deleteResult;
    }
    private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException {
        assert delete.origin() != Operation.Origin.PRIMARY : "planning as non-primary but got " + delete.origin();
        // drop out of order operations
        assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() :
            "resolving out of order delivery based on versioning but version type isn't fit for it. got ["
                + delete.versionType() + "]";
        maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr));
        assert maxSeqNoOfNonAppendOnlyOperations.get() >= delete.seqNo() : "max_seqno of non-append-only was not updated; " +
            "max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of delete [" + delete.seqNo() + "]";
        // unlike the primary, replicas don't really care about the found status of documents
        // this allows us to ignore the case where a document was found in the live version maps in
        // a delete state and return true for the found flag in favor of code simplicity
        final OpVsLuceneDocStatus opVsLucene;
        if (delete.seqNo() <= localCheckpointTracker.getCheckpoint()) {
            // the operation seq# is at or below the current local checkpoint and thus was already put into lucene
            // this can happen during recovery where older operations are sent from the translog that are already
            // part of the lucene commit (either from a peer recovery or a local translog)
            // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in
            // question may have been deleted in an out of order op that is not replayed.
            // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
            // See testRecoveryWithOutOfOrderDelete for an example of peer recovery
            opVsLucene = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
        } else {
            opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
        }
        final DeletionStrategy plan;
        if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
            plan = DeletionStrategy.processButSkipLucene(false, delete.seqNo(), delete.version());
        } else {
            plan = DeletionStrategy.processNormally(
                opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
                delete.seqNo(), delete.version());
        }
        return plan;
    }
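    // Illustrative sketch (not part of the original engine; the method name is hypothetical) of the
    // staleness rule applied above: an out-of-order replica operation can be skipped outright when its
    // seq# is at or below the local checkpoint, because everything up to the checkpoint has already
    // been applied; only operations above the checkpoint need a per-document comparison.
    private static boolean canSkipWithoutLookup(long opSeqNo, long localCheckpoint) {
        // ops at or below the checkpoint were necessarily processed before (possibly out of order)
        return opSeqNo <= localCheckpoint;
    }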
    private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException {
        assert delete.origin() == Operation.Origin.PRIMARY : "planning as primary but got " + delete.origin();
        // resolve operation from external to internal
        final VersionValue versionValue = resolveDocVersion(delete);
        assert incrementVersionLookup();
        final long currentVersion;
        final boolean currentlyDeleted;
        if (versionValue == null) {
            currentVersion = Versions.NOT_FOUND;
            currentlyDeleted = true;
        } else {
            currentVersion = versionValue.version;
            currentlyDeleted = versionValue.isDelete();
        }
        final DeletionStrategy plan;
        if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) {
            final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted);
            plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
        } else {
            plan = DeletionStrategy.processNormally(
                currentlyDeleted,
                generateSeqNoForOperation(delete),
                delete.versionType().updateVersion(currentVersion, delete.version()));
        }
        return plan;
    }
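    // Illustrative sketch (not part of the original engine) of a conflict check in the spirit of
    // isVersionConflictForWrites above, assuming internal-versioning-style semantics where a
    // specified expected version must match the current version exactly. The sentinel value and
    // method name are hypothetical.
    private static boolean isVersionConflictSketch(long currentVersion, long expectedVersion, boolean currentlyDeleted) {
        final long matchAny = -3L; // hypothetical sentinel for "no expected version specified"
        if (expectedVersion == matchAny) {
            return false; // unconditional write, never conflicts
        }
        if (currentlyDeleted) {
            return true; // nothing (or only a tombstone) to match the expected version against
        }
        return currentVersion != expectedVersion;
    }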
    private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan)
        throws IOException {
        try {
            if (plan.currentlyDeleted == false) {
                // any exception that comes from this is either an ACE or a fatal exception -
                // there can't be any document failures coming from this
                indexWriter.deleteDocuments(delete.uid());
                numDocDeletes.inc();
            }
            versionMap.putDeleteUnderLock(delete.uid().bytes(),
                new DeleteVersionValue(plan.versionOfDeletion, plan.seqNoOfDeletion, delete.primaryTerm(),
                    engineConfig.getThreadPool().relativeTimeInMillis()));
            return new DeleteResult(
                plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
        } catch (Exception ex) {
            if (indexWriter.getTragicException() == null) {
                // there is no tragic event recorded, as such it must be a document level failure
                return new DeleteResult(
                    ex, plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
            } else {
                throw ex;
            }
        }
    }
    private static final class DeletionStrategy {
        // whether the delete should actually be applied to lucene; it is skipped when the doc is
        // already gone, e.g. in the case of a rare double delete
        final boolean deleteFromLucene;
        final boolean currentlyDeleted;
        final long seqNoOfDeletion;
        final long versionOfDeletion;
        final Optional<DeleteResult> earlyResultOnPreflightError;
        private DeletionStrategy(boolean deleteFromLucene, boolean currentlyDeleted,
                                 long seqNoOfDeletion, long versionOfDeletion,
                                 DeleteResult earlyResultOnPreflightError) {
            assert (deleteFromLucene && earlyResultOnPreflightError != null) == false :
                "can only delete from lucene or have a preflight result but not both. " +
                    "deleteFromLucene: " + deleteFromLucene
                    + " earlyResultOnPreFlightError:" + earlyResultOnPreflightError;
            this.deleteFromLucene = deleteFromLucene;
            this.currentlyDeleted = currentlyDeleted;
            this.seqNoOfDeletion = seqNoOfDeletion;
            this.versionOfDeletion = versionOfDeletion;
            this.earlyResultOnPreflightError = earlyResultOnPreflightError == null ?
                Optional.empty() : Optional.of(earlyResultOnPreflightError);
        }
        static DeletionStrategy skipDueToVersionConflict(
                VersionConflictEngineException e, long currentVersion, boolean currentlyDeleted) {
            final long unassignedSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
            final DeleteResult deleteResult = new DeleteResult(e, currentVersion, unassignedSeqNo, currentlyDeleted == false);
            return new DeletionStrategy(false, currentlyDeleted, unassignedSeqNo, Versions.NOT_FOUND, deleteResult);
        }
        static DeletionStrategy processNormally(boolean currentlyDeleted, long seqNoOfDeletion, long versionOfDeletion) {
            return new DeletionStrategy(true, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null);
        }
        public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted, long seqNoOfDeletion, long versionOfDeletion) {
            return new DeletionStrategy(false, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null);
        }
    }
    @Override
    public void maybePruneDeletes() {
        // It's expensive to prune because we walk the deletes map acquiring dirtyLock for each uid so we only do it
        // every 1/4 of gcDeletesInMillis:
        if (engineConfig.isEnableGcDeletes() &&
                engineConfig.getThreadPool().relativeTimeInMillis() - lastDeleteVersionPruneTimeMSec > getGcDeletesInMillis() * 0.25) {
            pruneDeletedTombstones();
        }
    }
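    // A self-contained sketch (an illustration, not part of the original engine) of the rate limit
    // used above - the expensive walk over the tombstone map runs at most once per quarter of the
    // configured GC-deletes interval.
    private static boolean shouldPruneNow(long nowMillis, long lastPruneMillis, long gcDeletesMillis) {
        // e.g. with gcDeletesMillis = 60_000 pruning happens at most every 15 seconds
        return nowMillis - lastPruneMillis > gcDeletesMillis * 0.25;
    }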
    @Override
    public NoOpResult noOp(final NoOp noOp) {
        NoOpResult noOpResult;
        try (ReleasableLock ignored = readLock.acquire()) {
            noOpResult = innerNoOp(noOp);
        } catch (final Exception e) {
            noOpResult = new NoOpResult(noOp.seqNo(), e);
        }
        return noOpResult;
    }
    private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
        assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
        assert noOp.seqNo() > SequenceNumbers.NO_OPS_PERFORMED;
        final long seqNo = noOp.seqNo();
        try {
            final NoOpResult noOpResult = new NoOpResult(noOp.seqNo());
            if (noOp.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
                final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
                noOpResult.setTranslogLocation(location);
            }
            noOpResult.setTook(System.nanoTime() - noOp.startTime());
            noOpResult.freeze();
            return noOpResult;
        } finally {
            if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                localCheckpointTracker.markSeqNoAsCompleted(seqNo);
            }
        }
    }
    @Override
    public void refresh(String source) throws EngineException {
        refresh(source, SearcherScope.EXTERNAL);
    }
    final void refresh(String source, SearcherScope scope) throws EngineException {
        // we obtain a read lock here, since we don't want a flush to happen while we are refreshing
        // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
        // both refresh types will result in an internal refresh but only the external will also
        // pass the new reader reference to the external reader manager.
        // this will also cause version map ram to be freed hence we always account for it.
        final long bytes = indexWriter.ramBytesUsed() + versionMap.ramBytesUsedForRefresh();
        writingBytes.addAndGet(bytes);
        try (ReleasableLock lock = readLock.acquire()) {
            ensureOpen();
            if (store.tryIncRef()) {
                // increment the ref just to ensure nobody closes the store during a refresh
                try {
                    switch (scope) {
                        case EXTERNAL:
                            // even though we maintain 2 managers we really do the heavy-lifting only once.
                            // the second refresh will only do the extra work we have to do for warming caches etc.
                            externalSearcherManager.maybeRefreshBlocking();
                            // the break here is intentional, we never refresh both internal / external together
                            break;
                        case INTERNAL:
                            internalSearcherManager.maybeRefreshBlocking();
                            break;
                        default:
                            throw new IllegalArgumentException("unknown scope: " + scope);
                    }
                } finally {
                    store.decRef();
                }
            }
        } catch (AlreadyClosedException e) {
            failOnTragicEvent(e);
            throw e;
        } catch (Exception e) {
            try {
                failEngine("refresh failed source[" + source + "]", e);
            } catch (Exception inner) {
                e.addSuppressed(inner);
            }
            throw new RefreshFailedEngineException(shardId, e);
        } finally {
            writingBytes.addAndGet(-bytes);
        }
        // TODO: maybe we should just put a scheduled job in threadPool?
        // We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes
        // for a long time:
        maybePruneDeletes();
        mergeScheduler.refreshConfig();
    }
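    // Illustrative sketch (not part of the original engine; the method name is hypothetical) of the
    // two-manager refresh pattern above, expressed directly against Lucene's SearcherManager API:
    // the internal manager is refreshed for housekeeping (freeing indexing buffers and version map
    // entries) without paying the warming cost that only external, user-visible searchers need.
    private static void refreshSketch(org.apache.lucene.search.SearcherManager internal,
                                      org.apache.lucene.search.SearcherManager external,
                                      boolean externalScope) throws IOException {
        if (externalScope) {
            external.maybeRefreshBlocking(); // heavy lifting plus cache warming, visible to users
        } else {
            internal.maybeRefreshBlocking(); // housekeeping only
        }
    }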
    @Override
    public void writeIndexingBuffer() throws EngineException {
        // we obtain a read lock here, since we don't want a flush to happen while we are writing
        // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
        refresh("write indexing buffer", SearcherScope.INTERNAL);
    }
    @Override
    public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) throws EngineException {
        // best effort attempt before we acquire locks
        ensureOpen();
        if (indexWriter.hasUncommittedChanges()) {
            logger.trace("can't sync commit [{}]. have pending changes", syncId);
            return SyncedFlushResult.PENDING_OPERATIONS;
        }
        if (expectedCommitId.idsEqual(lastCommittedSegmentInfos.getId()) == false) {
            logger.trace("can't sync commit [{}]. current commit id is not equal to expected.", syncId);
            return SyncedFlushResult.COMMIT_MISMATCH;
        }
        try (ReleasableLock lock = writeLock.acquire()) {
            ensureOpen();
            ensureCanFlush();
            // let's do a refresh to make sure we shrink the version map. This refresh will be either a no-op (just shrink the version map)
            // or we also have uncommitted changes and that causes this syncFlush to fail.
            refresh("sync_flush", SearcherScope.INTERNAL);
            if (indexWriter.hasUncommittedChanges()) {
                logger.trace("can't sync commit [{}]. have pending changes", syncId);
                return SyncedFlushResult.PENDING_OPERATIONS;
            }
            if (expectedCommitId.idsEqual(lastCommittedSegmentInfos.getId()) == false) {
                logger.trace("can't sync commit [{}]. current commit id is not equal to expected.", syncId);
                return SyncedFlushResult.COMMIT_MISMATCH;
            }
            logger.trace("starting sync commit [{}]", syncId);
            commitIndexWriter(indexWriter, translog, syncId);
            logger.debug("successfully sync committed. sync id [{}].", syncId);
            lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
            return SyncedFlushResult.SUCCESS;
        } catch (IOException ex) {
            maybeFailEngine("sync commit", ex);
            throw new EngineException(shardId, "failed to sync commit", ex);
        }
    }
    final boolean tryRenewSyncCommit() {
        boolean renewed = false;
        try (ReleasableLock lock = writeLock.acquire()) {
            ensureOpen();
            ensureCanFlush();
            String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID);
            long translogGenOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY));
            if (syncId != null && indexWriter.hasUncommittedChanges() && translog.totalOperationsByMinGen(translogGenOfLastCommit) == 0) {
                logger.trace("start renewing sync commit [{}]", syncId);
                commitIndexWriter(indexWriter, translog, syncId);
                logger.debug("successfully sync committed. sync id [{}].", syncId);
                lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
                renewed = true;
            }
        } catch (IOException ex) {
            maybeFailEngine("renew sync commit", ex);
            throw new EngineException(shardId, "failed to renew sync commit", ex);
        }
        if (renewed) {
            // refresh outside of the write lock
            // we have to refresh internal searcher here to ensure we release unreferenced segments.
            refresh("renew sync commit", SearcherScope.INTERNAL);
        }
        return renewed;
    }
    @Override
    public boolean shouldPeriodicallyFlush() {
        ensureOpen();
        final long translogGenerationOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY));
        final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes();
        if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) {
            return false;
        }
        /*
         * We flush to reduce the size of the uncommitted translog, but strictly speaking the uncommitted size won't always be
         * below the flush-threshold after a flush. To avoid getting into an endless loop of flushing, we only enable the
         * periodic flush condition if this condition is disabled after a flush. The condition will change if the new
         * commit points to a later generation than the last commit's (e.g. gen-of-last-commit < gen-of-new-commit) [1].
         *
         * When the local checkpoint equals max_seqno, and the translog-gen of the last commit equals the translog-gen of
         * the new commit, we know that the last generation must contain operations because its size is above the flush
         * threshold and the flush-threshold is guaranteed to be higher than an empty translog by the setting validation.
         * This guarantees that the new commit will point to the newly rolled generation. In fact, this scenario only
         * happens when the generation-threshold is close to or above the flush-threshold; otherwise we have rolled
         * generations as the generation-threshold was reached, and then the first condition (e.g. [1]) is already satisfied.
         *
         * This method is to maintain the translog only, thus the IndexWriter#hasUncommittedChanges condition is not considered.
         */
        final long translogGenerationOfNewCommit =
            translog.getMinGenerationForSeqNo(localCheckpointTracker.getCheckpoint() + 1).translogFileGeneration;
        return translogGenerationOfLastCommit < translogGenerationOfNewCommit
            || localCheckpointTracker.getCheckpoint() == localCheckpointTracker.getMaxSeqNo();
    }
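    // A self-contained sketch (an illustration, not part of the original engine) of the periodic
    // flush decision above, with all inputs made explicit as parameters. Names are hypothetical.
    private static boolean shouldPeriodicallyFlushSketch(long uncommittedTranslogBytes, long flushThresholdBytes,
                                                         long translogGenOfLastCommit, long translogGenOfNewCommit,
                                                         long localCheckpoint, long maxSeqNo) {
        if (uncommittedTranslogBytes < flushThresholdBytes) {
            return false; // not enough uncommitted translog to make a flush worthwhile
        }
        // only flush if it actually frees a translog generation, or if we are fully caught up
        // (checkpoint == max_seqno) so the flush is guaranteed to advance the commit point
        return translogGenOfLastCommit < translogGenOfNewCommit || localCheckpoint == maxSeqNo;
    }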
    @Override
    public CommitId flush() throws EngineException {
        return flush(false, false);
    }
    @Override
    public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
        ensureOpen();
        final byte[] newCommitId;
        /*
         * Unfortunately the lock order is important here. We have to acquire the readlock first otherwise
         * if we are flushing at the end of the recovery while holding the write lock we can deadlock if:
         * Thread 1: flushes via API and gets the flush lock but blocks on the readlock since Thread 2 has the writeLock
         * Thread 2: flushes at the end of the recovery holding the writeLock and blocks on the flushLock owned by Thread 1
         */
        try (ReleasableLock lock = readLock.acquire()) {
            ensureOpen();
            if (flushLock.tryLock() == false) {
                // if we can't get the lock right away we block if needed otherwise barf
                if (waitIfOngoing) {
                    logger.trace("waiting for in-flight flush to finish");
                    flushLock.lock();
                    logger.trace("acquired flush lock after blocking");
                } else {
                    return new CommitId(lastCommittedSegmentInfos.getId());
                }
            } else {
                logger.trace("acquired flush lock immediately");
            }
            try {
                // Only flush if (1) Lucene has uncommitted docs, or (2) forced by caller, or (3) the
                // newly created commit points to a different translog generation (can free translog)
                if (indexWriter.hasUncommittedChanges() || force || shouldPeriodicallyFlush()) {
                    ensureCanFlush();
                    try {
                        translog.rollGeneration();
                        logger.trace("starting commit for flush; commitTranslog=true");
                        commitIndexWriter(indexWriter, translog, null);
                        logger.trace("finished commit for flush");
                        // we need to refresh in order to clear older version values
                        refresh("version_table_flush", SearcherScope.INTERNAL);
                        translog.trimUnreferencedReaders();
                    } catch (AlreadyClosedException e) {
                        throw e;
                    } catch (Exception e) {
                        throw new FlushFailedEngineException(shardId, e);
                    }
                    refreshLastCommittedSegmentInfos();
                }
                newCommitId = lastCommittedSegmentInfos.getId();
            } catch (FlushFailedEngineException ex) {
                maybeFailEngine("flush", ex);
                throw ex;
            } finally {
                flushLock.unlock();
            }
        }
        // We don't have to do this here; we do it defensively to make sure that even if wall clock time is misbehaving
        // (e.g., moves backwards) we will at least still sometimes prune deleted tombstones:
        if (engineConfig.isEnableGcDeletes()) {
            pruneDeletedTombstones();
        }
        return new CommitId(newCommitId);
    }
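    // Illustrative sketch (not part of the original engine; the method name is hypothetical) of the
    // flush locking discipline above: try to take the flush lock without blocking, and only block
    // when the caller explicitly asked to wait for an in-flight flush.
    private static boolean acquireFlushLock(java.util.concurrent.locks.ReentrantLock flushLock, boolean waitIfOngoing) {
        if (flushLock.tryLock()) {
            return true; // no flush in flight, we own the lock
        }
        if (waitIfOngoing == false) {
            return false; // caller prefers returning the last commit id over waiting
        }
        flushLock.lock(); // block until the concurrent flush finishes
        return true;
    }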
    private void refreshLastCommittedSegmentInfos() {
        /*
         * we have to inc-ref the store here since if the engine is closed by a tragic event
         * we don't acquire the write lock and wait until we have exclusive access. This might also
         * dec the store reference which can essentially close the store and unless we can inc the reference
         * we can't use it.
         */
        store.incRef();
        try {
            // reread the last committed segment infos
            lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
        } catch (Exception e) {
            if (isClosed.get() == false) {
                try {
                    logger.warn("failed to read latest segment infos on flush", e);
                } catch (Exception inner) {
                    e.addSuppressed(inner);
                }
                if (Lucene.isCorruptionException(e)) {
                    throw new FlushFailedEngineException(shardId, e);
                }
            }
        } finally {
            store.decRef();
        }
    }
    @Override
    public void rollTranslogGeneration() throws EngineException {
        try (ReleasableLock ignored = readLock.acquire()) {
            ensureOpen();
            translog.rollGeneration();
            translog.trimUnreferencedReaders();
        } catch (AlreadyClosedException e) {
            failOnTragicEvent(e);
            throw e;
        } catch (Exception e) {
            try {
                failEngine("translog trimming failed", e);
            } catch (Exception inner) {
                e.addSuppressed(inner);
            }
            throw new EngineException(shardId, "failed to roll translog", e);
        }
    }
    @Override
    public void trimTranslog() throws EngineException {
        try (ReleasableLock lock = readLock.acquire()) {
            ensureOpen();
            translog.trimUnreferencedReaders();
        } catch (AlreadyClosedException e) {
            failOnTragicEvent(e);
            throw e;
        } catch (Exception e) {
            try {
                failEngine("translog trimming failed", e);
            } catch (Exception inner) {
                e.addSuppressed(inner);
            }
            throw new EngineException(shardId, "failed to trim translog", e);
        }
    }
    private void pruneDeletedTombstones() {
        /*
         * We need to deploy two different trimming strategies for GC deletes on primary and replicas. Delete operations on primary
         * are remembered for at least one GC delete cycle and trimmed periodically. This is, at the moment, the best we can do on
         * primary for user facing APIs but this arbitrary time limit is problematic for replicas. On replicas however we should
         * trim only deletes whose seqno is at most the local checkpoint. This requirement is explained as follows.
         *
         * Suppose o1 and o2 are two operations on the same document with seq#(o1) < seq#(o2), and o2 arrives before o1 on the replica.
         * o2 is processed normally since it arrives first; when o1 arrives it should be discarded:
         * - If seq#(o1) <= LCP, then it will not be added to Lucene, as it was already previously added.
         * - If seq#(o1) > LCP, then it depends on the nature of o2:
         *   *) If o2 is a delete then its seq# is recorded in the VersionMap, since seq#(o2) > seq#(o1) > LCP,
         *      so a lookup can find it and determine that o1 is stale.
         *   *) If o2 is an indexing then its seq# is either in Lucene (if refreshed) or the VersionMap (if not refreshed yet),
         *      so a real-time lookup can find it and determine that o1 is stale.
         *
         * Here we prefer to deploy a single trimming strategy, which satisfies both constraints, on both primary and replicas because:
         * - It's simpler - no need to distinguish if an engine is running in primary mode, in replica mode, or being promoted.
         * - If a replica subsequently is promoted, user experience is maintained as that replica remembers deletes for the last GC cycle.
         *
         * However, the version map may consume less memory if we deploy two different trimming strategies for primary and replicas.
         */
        final long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
        final long maxTimestampToPrune = timeMSec - engineConfig.getIndexSettings().getGcDeletesInMillis();
        versionMap.pruneTombstones(maxTimestampToPrune, localCheckpointTracker.getCheckpoint());
        lastDeleteVersionPruneTimeMSec = timeMSec;
    }
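    // A self-contained sketch (an illustration, not part of the original engine) of the combined
    // pruning rule above: a delete tombstone may only be dropped once it is both old enough for the
    // user-facing GC window and at or below the local checkpoint, so that late out-of-order
    // operations can still be detected as stale.
    private static boolean canPruneTombstone(long tombstoneTimeMillis, long maxTimestampToPrune,
                                             long tombstoneSeqNo, long localCheckpoint) {
        return tombstoneTimeMillis < maxTimestampToPrune && tombstoneSeqNo <= localCheckpoint;
    }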
    // testing
    void clearDeletedTombstones() {
        versionMap.pruneTombstones(Long.MAX_VALUE, localCheckpointTracker.getMaxSeqNo());
    }
    // for testing
    final Collection<DeleteVersionValue> getDeletedTombstones() {
        return versionMap.getAllTombstones().values();
    }
    @Override
    public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
                           final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException, IOException {
        /*
         * We do NOT acquire the readlock here since we are waiting on the merges to finish.
         * That's fine since the IW.rollback should stop all the threads and trigger an IOException
         * causing us to fail the forceMerge.
         *
         * The way we implement upgrades is a bit hackish in the sense that we set an instance
         * variable and that this setting will thus apply to the next forced merge that will be run.
         * This is ok because (1) this is the only place we call forceMerge, (2) we have a single
         * thread for optimize, and the 'optimizeLock' guarding this code, and (3) ConcurrentMergeScheduler
         * syncs calls to findForcedMerges.
         */
        assert indexWriter.getConfig().getMergePolicy() instanceof ElasticsearchMergePolicy :
            "MergePolicy is " + indexWriter.getConfig().getMergePolicy().getClass().getName();
        ElasticsearchMergePolicy mp = (ElasticsearchMergePolicy) indexWriter.getConfig().getMergePolicy();
        optimizeLock.lock();
        try {
            ensureOpen();
            if (upgrade) {
                logger.info("starting segment upgrade upgradeOnlyAncientSegments={}", upgradeOnlyAncientSegments);
                mp.setUpgradeInProgress(true, upgradeOnlyAncientSegments);
            }
            store.incRef(); // increment the ref just to ensure nobody closes the store while we optimize
            try {
                if (onlyExpungeDeletes) {
                    assert upgrade == false;
                    indexWriter.forceMergeDeletes(true /* blocks and waits for merges*/);
                } else if (maxNumSegments <= 0) {
                    assert upgrade == false;
                    indexWriter.maybeMerge();
                } else {
                    indexWriter.forceMerge(maxNumSegments, true /* blocks and waits for merges*/);
                }
                if (flush) {
                    if (tryRenewSyncCommit() == false) {
                        flush(false, true);
                    }
                }
                if (upgrade) {
                    logger.info("finished segment upgrade");
                }
            } finally {
                store.decRef();
            }
        } catch (AlreadyClosedException ex) {
            /* in this case we first check if the engine is still open. If so this exception is just fine
             * and expected. We don't hold any locks while we block on forceMerge otherwise it would block
             * closing the engine as well. If we are not closed we pass it on to failOnTragicEvent which ensures
             * we are handling a tragic event exception here */
            ensureOpen(ex);
            failOnTragicEvent(ex);
            throw ex;
        } catch (Exception e) {
            try {
                maybeFailEngine("force merge", e);
            } catch (Exception inner) {
                e.addSuppressed(inner);
            }
            throw e;
        } finally {
            try {
                mp.setUpgradeInProgress(false, false); // reset it just to make sure we reset it in a case of an error
            } finally {
                optimizeLock.unlock();
            }
        }
    }
    @Override
    public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws EngineException {
        // we have to flush outside of the readlock otherwise we might have a problem upgrading
        // to the write lock when we fail the engine in this operation
        if (flushFirst) {
            logger.trace("start flush for snapshot");
            flush(false, true);
            logger.trace("finish flush for snapshot");
        }
        final IndexCommit lastCommit = combinedDeletionPolicy.acquireIndexCommit(false);
        return new Engine.IndexCommitRef(lastCommit, () -> releaseIndexCommit(lastCommit));
    }
    @Override
    public IndexCommitRef acquireSafeIndexCommit() throws EngineException {
        final IndexCommit safeCommit = combinedDeletionPolicy.acquireIndexCommit(true);
        return new Engine.IndexCommitRef(safeCommit, () -> releaseIndexCommit(safeCommit));
    }
    private void releaseIndexCommit(IndexCommit snapshot) throws IOException {
        // Revisit the deletion policy if we can clean up the snapshotting commit.
        if (combinedDeletionPolicy.releaseCommit(snapshot)) {
            ensureOpen();
            indexWriter.deleteUnusedFiles();
        }
    }
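    // Illustrative usage (an assumption about the calling pattern, not part of the original engine):
    // the acquire/release pair above hands out a closeable reference, so callers keep the commit
    // point alive only for the duration of a try-with-resources block, e.g.:
    //
    //     try (Engine.IndexCommitRef commitRef = engine.acquireLastIndexCommit(true)) {
    //         // copy commitRef.getIndexCommit() files while the deletion policy keeps them alive
    //     } // closing releases the commit so unused files can be deleted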
    private boolean failOnTragicEvent(AlreadyClosedException ex) {
        final boolean engineFailed;
        // if we are already closed due to some tragic exception
        // we need to fail the engine. it might have already been failed before
        // but we are double-checking it's failed and closed
        if (indexWriter.isOpen() == false && indexWriter.getTragicException() != null) {
            final Exception tragicException;
            if (indexWriter.getTragicException() instanceof Exception) {
                tragicException = (Exception) indexWriter.getTragicException();
            } else {
                tragicException = new RuntimeException(indexWriter.getTragicException());
            }
            failEngine("already closed by tragic event on the index writer", tragicException);
            engineFailed = true;
        } else if (translog.isOpen() == false && translog.getTragicException() != null) {
            failEngine("already closed by tragic event on the translog", translog.getTragicException());
            engineFailed = true;
        } else if (failedEngine.get() == null && isClosed.get() == false) { // we are closed but the engine is not failed yet?
            // this smells like a bug - we only expect ACE if we are in a fatal case ie. either translog or IW is closed by
            // a tragic event or has closed itself. if that is not the case we are in a buggy state and raise an assertion error
            throw new AssertionError("Unexpected AlreadyClosedException", ex);
        } else {
            engineFailed = false;
        }
        return engineFailed;
    }
    @Override
    protected boolean maybeFailEngine(String source, Exception e) {
        boolean shouldFail = super.maybeFailEngine(source, e);
        if (shouldFail) {
            return true;
        }
        // Check for AlreadyClosedException -- ACE is a very special
        // exception that should only be thrown in a tragic event. we pass on the checks to failOnTragicEvent which will
        // throw an AssertionError if the tragic event condition is not met.
        if (e instanceof AlreadyClosedException) {
            return failOnTragicEvent((AlreadyClosedException) e);
        } else if (e != null &&
            ((indexWriter.isOpen() == false && indexWriter.getTragicException() == e)
                || (translog.isOpen() == false && translog.getTragicException() == e))) {
            // this is spot on - we are handling the tragic event exception here so we have to fail the engine
            // right away
            failEngine(source, e);
            return true;
        }
        return false;
    }
    @Override
    protected SegmentInfos getLastCommittedSegmentInfos() {
        return lastCommittedSegmentInfos;
    }
    @Override
    protected final void writerSegmentStats(SegmentsStats stats) {
        stats.addVersionMapMemoryInBytes(versionMap.ramBytesUsed());
        stats.addIndexWriterMemoryInBytes(indexWriter.ramBytesUsed());
        stats.updateMaxUnsafeAutoIdTimestamp(maxUnsafeAutoIdTimestamp.get());
    }
    @Override
    public long getIndexBufferRAMBytesUsed() {
        // We don't guard w/ readLock here, so we could throw AlreadyClosedException
        return indexWriter.ramBytesUsed() + versionMap.ramBytesUsedForRefresh();
    }
    @Override
    public List<Segment> segments(boolean verbose) {
        try (ReleasableLock lock = readLock.acquire()) {
            Segment[] segmentsArr = getSegmentInfo(lastCommittedSegmentInfos, verbose);
            // fill in the merges flag
            Set<OnGoingMerge> onGoingMerges = mergeScheduler.onGoingMerges();
            for (OnGoingMerge onGoingMerge : onGoingMerges) {
                for (SegmentCommitInfo segmentInfoPerCommit : onGoingMerge.getMergedSegments()) {
                    for (Segment segment : segmentsArr) {
                        if (segment.getName().equals(segmentInfoPerCommit.info.name)) {
                            segment.mergeId = onGoingMerge.getId();
                            break;
                        }
                    }
                }
            }
            return Arrays.asList(segmentsArr);
        }
    }
    /**
     * Closes the engine without acquiring the write lock. This should only be
     * called while the write lock is held or in a disaster condition ie. if the engine
     * is failed.
     */
    @Override
    protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
        if (isClosed.compareAndSet(false, true)) {
            assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() :
                "Either the write lock must be held or the engine must currently be failing itself";
            try {
                this.versionMap.clear();
                if (internalSearcherManager != null) {
                    internalSearcherManager.removeListener(versionMap);
                }
                try {
                    IOUtils.close(externalSearcherManager, internalSearcherManager);
                } catch (Exception e) {
                    logger.warn("Failed to close SearcherManager", e);
                }
                try {
                    IOUtils.close(translog);
                } catch (Exception e) {
                    logger.warn("Failed to close translog", e);
                }
                // no need to commit in this case! we snapshot before we close the shard, so the translog and everything is sync'ed
                logger.trace("rollback indexWriter");
                try {
                    indexWriter.rollback();
                } catch (AlreadyClosedException ex) {
                    failOnTragicEvent(ex);
                    throw ex;
                }
                logger.trace("rollback indexWriter done");
            } catch (Exception e) {
                logger.warn("failed to rollback writer on close", e);
            } finally {
                try {
                    store.decRef();
                    logger.debug("engine closed [{}]", reason);
                } finally {
                    closedLatch.countDown();
                }
            }
        }
    }
    @Override
    public Searcher acquireSearcher(String source, SearcherScope scope) {
        /* Acquire order here is store -> manager since we need
         * to make sure that the store is not closed before
         * the searcher is acquired. */
        store.incRef();
        Releasable releasable = store::decRef;
        try {
            final ReferenceManager<IndexSearcher> referenceManager;
            switch (scope) {
                case INTERNAL:
                    referenceManager = internalSearcherManager;
                    break;
                case EXTERNAL:
                    referenceManager = externalSearcherManager;
                    break;
                default:
                    throw new IllegalStateException("unknown scope: " + scope);
            }
            EngineSearcher engineSearcher = new EngineSearcher(source, referenceManager, store, logger);
            releasable = null; // success - hand over the reference to the engine searcher
            return engineSearcher;
        } catch (AlreadyClosedException ex) {
            throw ex;
        } catch (Exception ex) {
            ensureOpen(ex); // throw EngineCloseException here if we are already closed
            logger.error(() -> new ParameterizedMessage("failed to acquire searcher, source {}", source), ex);
            throw new EngineException(shardId, "failed to acquire searcher, source " + source, ex);
        } finally {
            Releasables.close(releasable);
        }
    }
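    // A self-contained sketch (an illustration, not part of the original engine) of the ref-count
    // hand-over idiom used above: take a reference, arrange for it to be released on every failure
    // path, and null out the releasable only once ownership has safely passed to the returned object.
    private static <T> T acquireWithHandOver(java.util.concurrent.atomic.AtomicInteger refCount,
                                             java.util.function.Supplier<T> factory) {
        refCount.incrementAndGet();
        boolean handedOver = false;
        try {
            T result = factory.get(); // the result is assumed to release the reference when closed
            handedOver = true;
            return result;
        } finally {
            if (handedOver == false) {
                refCount.decrementAndGet(); // failure: we still own the reference, give it back
            }
        }
    }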
    private long loadCurrentVersionFromIndex(Term uid) throws IOException {
        assert incrementIndexVersionLookup();
        try (Searcher searcher = acquireSearcher("load_version", SearcherScope.INTERNAL)) {
            return VersionsAndSeqNoResolver.loadVersion(searcher.reader(), uid);
        }
    }
    private IndexWriter createWriter() throws IOException {
        try {
            final IndexWriterConfig iwc = getIndexWriterConfig();
            return createWriter(store.directory(), iwc);
        } catch (LockObtainFailedException ex) {
            logger.warn("could not lock IndexWriter", ex);
            throw ex;
        }
    }
    // pkg-private for testing
    IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
        return new IndexWriter(directory, iwc);
    }
    private IndexWriterConfig getIndexWriterConfig() {
        final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
        iwc.setCommitOnClose(false); // we by default don't commit on close
        iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
        iwc.setIndexDeletionPolicy(combinedDeletionPolicy);
        // with tests.verbose, lucene sets this up: plumb to align with filesystem stream
        boolean verbose = false;
        try {
            verbose = Boolean.parseBoolean(System.getProperty("tests.verbose"));
        } catch (Exception ignore) {
        }
        iwc.setInfoStream(verbose ? InfoStream.getDefault() : new LoggerInfoStream(logger));
        iwc.setMergeScheduler(mergeScheduler);
        MergePolicy mergePolicy = config().getMergePolicy();
        // Give us the opportunity to upgrade old segments while performing
        // background merges
        mergePolicy = new ElasticsearchMergePolicy(mergePolicy);
        iwc.setMergePolicy(mergePolicy);
        iwc.setSimilarity(engineConfig.getSimilarity());
        iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
        iwc.setCodec(engineConfig.getCodec());
        iwc.setUseCompoundFile(true); // always use compound on flush - reduces # of file-handles on refresh
        if (config().getIndexSort() != null) {
            iwc.setIndexSort(config().getIndexSort());
        }
        return iwc;
    }
    /** Extended SearcherFactory that warms the segments if needed when acquiring a new searcher */
    static final class SearchFactory extends EngineSearcherFactory {
        private final Engine.Warmer warmer;
        private final Logger logger;
        private final AtomicBoolean isEngineClosed;
        SearchFactory(Logger logger, AtomicBoolean isEngineClosed, EngineConfig engineConfig) {
            super(engineConfig);
            warmer = engineConfig.getWarmer();
            this.logger = logger;
            this.isEngineClosed = isEngineClosed;
        }
        @Override
        public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) throws IOException {
            IndexSearcher searcher = super.newSearcher(reader, previousReader);
            if (reader instanceof LeafReader && isMergedSegment((LeafReader) reader)) {
                // we call newSearcher from the IndexReaderWarmer which warms segments during merging
                // in that case the reader is a LeafReader and all we need to do is to build a new Searcher
                // and return it since it does its own warming for that particular reader.
                return searcher;
            }
            if (warmer != null) {
                try {
                    assert searcher.getIndexReader() instanceof ElasticsearchDirectoryReader :
                        "this class needs an ElasticsearchDirectoryReader but got: " + searcher.getIndexReader().getClass();
                    warmer.warm(new Searcher("top_reader_warming", searcher));
                } catch (Exception e) {
                    if (isEngineClosed.get() == false) {
                        logger.warn("failed to prepare/warm", e);
                    }
                }
            }
            return searcher;
        }
    }
    @Override
    public void activateThrottling() {
        int count = throttleRequestCount.incrementAndGet();
        assert count >= 1 : "invalid post-increment throttleRequestCount=" + count;
        if (count == 1) {
            throttle.activate();
        }
    }
    @Override
    public void deactivateThrottling() {
        int count = throttleRequestCount.decrementAndGet();
        assert count >= 0 : "invalid post-decrement throttleRequestCount=" + count;
        if (count == 0) {
            throttle.deactivate();
        }
    }
    @Override
    public boolean isThrottled() {
        return throttle.isThrottled();
    }
    @Override
    public long getIndexThrottleTimeInMillis() {
        return throttle.getThrottleTimeInMillis();
    }
    long getGcDeletesInMillis() {
        return engineConfig.getIndexSettings().getGcDeletesInMillis();
    }
    LiveIndexWriterConfig getCurrentIndexWriterConfig() {
        return indexWriter.getConfig();
    }
    private final class EngineMergeScheduler extends ElasticsearchConcurrentMergeScheduler {
        private final AtomicInteger numMergesInFlight = new AtomicInteger(0);
        private final AtomicBoolean isThrottling = new AtomicBoolean();
        EngineMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
            super(shardId, indexSettings);
        }
        @Override
        public synchronized void beforeMerge(OnGoingMerge merge) {
            int maxNumMerges = mergeScheduler.getMaxMergeCount();
            if (numMergesInFlight.incrementAndGet() > maxNumMerges) {
                if (isThrottling.getAndSet(true) == false) {
                    logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
                    activateThrottling();
                }
            }
        }
        @Override
        public synchronized void afterMerge(OnGoingMerge merge) {
            int maxNumMerges = mergeScheduler.getMaxMergeCount();
            if (numMergesInFlight.decrementAndGet() < maxNumMerges) {
                if (isThrottling.getAndSet(false)) {
                    logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
                    deactivateThrottling();
                }
            }
            if (indexWriter.hasPendingMerges() == false && System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) {
                // NEVER do this on a merge thread since we acquire some locks blocking here and if we concurrently rollback the writer
                // we deadlock on engine#close for instance.
                engineConfig.getThreadPool().executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() {
                    @Override
                    public void onFailure(Exception e) {
                        if (isClosed.get() == false) {
                            logger.warn("failed to flush after merge has finished", e);
                        }
                    }
                    @Override
                    protected void doRun() throws Exception {
                        // if we have no pending merges and we are supposed to flush once merges have finished
                        // we try to renew a sync commit which is the case when we are having a big merge after we
                        // are inactive. If that didn't work we go and do a real flush which is ok since it only doesn't work
                        // if we either have records in the translog or if we don't have a sync ID at all...
                        // maybe even more important, we flush after all merges finish and we are inactive indexing-wise to
                        // free up transient disk usage of the (presumably biggish) segments that were just merged
                        if (tryRenewSyncCommit() == false) {
                            flush();
                        }
                    }
                });
            }
        }
        @Override
        protected void handleMergeException(final Directory dir, final Throwable exc) {
            engineConfig.getThreadPool().generic().execute(new AbstractRunnable() {
                @Override
                public void onFailure(Exception e) {
                    logger.debug("merge failure action rejected", e);
                }
                @Override
                protected void doRun() throws Exception {
                    /*
                     * We do this on another thread rather than the merge thread that we are initially called on so that we have complete
                     * confidence that the call stack does not contain catch statements that would cause the error that might be thrown
                     * here from being caught and never reaching the uncaught exception handler.
                     */
                    failEngine("merge failed", new MergePolicy.MergeException(exc, dir));
                }
            });
        }
    }
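    // A self-contained sketch (an illustration, not part of the original engine) of the hysteresis in
    // beforeMerge/afterMerge above: throttling turns on when in-flight merges exceed the maximum and
    // off as soon as the count drops below it, with getAndSet guarding against double transitions.
    private static final class MergeThrottleSketch {
        private final java.util.concurrent.atomic.AtomicInteger inFlight =
            new java.util.concurrent.atomic.AtomicInteger();
        private final java.util.concurrent.atomic.AtomicBoolean throttling =
            new java.util.concurrent.atomic.AtomicBoolean();

        boolean onMergeStart(int maxMerges) {
            // returns true only on the call that actually enables throttling
            return inFlight.incrementAndGet() > maxMerges && throttling.getAndSet(true) == false;
        }

        boolean onMergeEnd(int maxMerges) {
            // returns true only on the call that actually disables throttling
            return inFlight.decrementAndGet() < maxMerges && throttling.getAndSet(false);
        }
    }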
    /**
     * Commits the specified index writer.
     *
     * @param writer   the index writer to commit
     * @param translog the translog
     * @param syncId   the sync flush ID ({@code null} if not committing a synced flush)
     * @throws IOException if an I/O exception occurs committing the specified writer
     */
    protected void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
        ensureCanFlush();
        try {
            final long localCheckpoint = localCheckpointTracker.getCheckpoint();
            final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1);
            final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration);
            final String translogUUID = translogGeneration.translogUUID;
            final String localCheckpointValue = Long.toString(localCheckpoint);
            writer.setLiveCommitData(() -> {
                /*
                 * The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes
                 * segments, including the local checkpoint amongst other values. The maximum sequence number is different: we never want
                 * the maximum sequence number to be less than the last sequence number to go into a Lucene commit, otherwise we run the
                 * risk of re-using a sequence number for two different documents when restoring from this commit point and subsequently
                 * writing new documents to the index. Since we only know which Lucene documents made it into the final commit after the
                 * {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
                 * of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
                 */
                final Map<String, String> commitData = new HashMap<>(6);
                commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration);
                commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
                commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue);
                if (syncId != null) {
                    commitData.put(Engine.SYNC_COMMIT_ID, syncId);
                }
                commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo()));
                commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
                commitData.put(HISTORY_UUID_KEY, historyUUID);
                logger.trace("committing writer with commit data [{}]", commitData);
                return commitData.entrySet().iterator();
            });
            writer.commit();
        } catch (final Exception ex) {
            try {
                failEngine("lucene commit failed", ex);
            } catch (final Exception inner) {
                ex.addSuppressed(inner);
            }
            throw ex;
        } catch (final AssertionError e) {
            /*
             * If assertions are enabled, IndexWriter throws AssertionError on commit if any files don't exist, but tests that randomly
             * throw FileNotFoundException or NoSuchFileException can also hit this.
             */
            if (ExceptionsHelper.stackTrace(e).contains("org.apache.lucene.index.IndexWriter.filesExist")) {
                final EngineException engineException = new EngineException(shardId, "failed to commit engine", e);
                try {
                    failEngine("lucene commit failed", engineException);
                } catch (final Exception inner) {
                    engineException.addSuppressed(inner);
                }
                throw engineException;
            } else {
                throw e;
            }
        }
    }
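    // A compilable sketch (an illustration, not part of the original engine; key name and method name
    // are hypothetical) of the deferred commit user-data technique above. setLiveCommitData takes an
    // Iterable whose iterator() is only invoked while IndexWriter#commit() runs, so values supplied
    // through it are computed after all documents have been flushed.
    private static void commitWithDeferredUserData(IndexWriter writer,
                                                   java.util.function.LongSupplier maxSeqNoSupplier) throws IOException {
        writer.setLiveCommitData(() -> {
            final Map<String, String> commitData = new HashMap<>(2);
            // evaluated at commit time, after the final flush - not when this lambda is created
            commitData.put("max_seq_no", Long.toString(maxSeqNoSupplier.getAsLong()));
            return commitData.entrySet().iterator();
        });
        writer.commit();
    }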
    private void ensureCanFlush() {
        // translog recovery happens after the engine is fully constructed.
        // if we are in this stage we have to prevent flushes from this
        // engine otherwise we might lose documents if the flush succeeds
        // and the translog recovery fails, since we "commit" the translog on flush.
        if (pendingTranslogRecovery.get()) {
            throw new IllegalStateException(shardId.toString() + " flushes are disabled - pending translog recovery");
        }
    }
    public void onSettingsChanged() {
        mergeScheduler.refreshConfig();
        // config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed:
        maybePruneDeletes();
        if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
            // this is an anti-viral setting: you can only opt out for the entire index.
            // the setting will only be re-interpreted (if it's set back to true) when a shard
            // starts up again due to relocation or when the index is closed and reopened
            this.maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
        }
        final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy();
        final IndexSettings indexSettings = engineConfig.getIndexSettings();
        translogDeletionPolicy.setRetentionAgeInMillis(indexSettings.getTranslogRetentionAge().getMillis());
        translogDeletionPolicy.setRetentionSizeInBytes(indexSettings.getTranslogRetentionSize().getBytes());
    }
    public MergeStats getMergeStats() {
        return mergeScheduler.stats();
    }
    public final LocalCheckpointTracker getLocalCheckpointTracker() {
        return localCheckpointTracker;
    }
    /**
     * Returns the number of times a version was looked up from the index.
     * Note this is only available if assertions are enabled
     */
    long getNumIndexVersionsLookups() { // for testing
        return numIndexVersionsLookups.count();
    }
    /**
     * Returns the number of times a version was looked up either from memory or from the index.
     * Note this is only available if assertions are enabled
     */
    long getNumVersionLookups() { // for testing
        return numVersionLookups.count();
    }
    private boolean incrementVersionLookup() { // only used by asserts
        numVersionLookups.inc();
        return true;
    }
    private boolean incrementIndexVersionLookup() {
        numIndexVersionsLookups.inc();
        return true;
    }
    int getVersionMapSize() {
        return versionMap.getAllCurrent().size();
    }
    boolean isSafeAccessRequired() {
        return versionMap.isSafeAccessRequired();
    }
    /**
     * Returns the number of documents that have been deleted since this engine was opened.
     * This count does not include the deletions from the existing segments before opening this engine.
     */
    long getNumDocDeletes() {
        return numDocDeletes.count();
    }
    /**
     * Returns the number of documents that have been appended since this engine was opened.
     * This count does not include the appends from the existing segments before opening this engine.
     */
    long getNumDocAppends() {
        return numDocAppends.count();
    }
    /**
     * Returns the number of documents that have been updated since this engine was opened.
     * This count does not include the updates from the existing segments before opening this engine.
     */
    long getNumDocUpdates() {
        return numDocUpdates.count();
    }
    @Override
    public boolean isRecovering() {
        return pendingTranslogRecovery.get();
    }
    /**
     * Gets the commit data from {@link IndexWriter} as a map.
     */
    private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter) {
        Map<String, String> commitData = new HashMap<>(6);
        for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) {
            commitData.put(entry.getKey(), entry.getValue());
        }
        return commitData;
    }
}