Browse Source

Set created flag in index operation

Now document created flag is set in the index operation instead of
being returned from engine operation. This change makes the engine
index and delete operations have the same signature.
Areek Zillur 9 years ago
parent
commit
fe5cdd30d5

+ 2 - 2
core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java

@@ -188,7 +188,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
                     "Dynamic mappings are not available on the node that holds the primary yet");
             }
         }
-        final boolean created = indexShard.index(operation);
+        indexShard.index(operation);
 
         // update the version on request so it will happen on the replicas
         final long version = operation.version();
@@ -197,7 +197,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
 
         assert request.versionType().validateVersionForWrites(request.version());
 
-        IndexResponse response = new IndexResponse(shardId, request.type(), request.id(), request.version(), created);
+        IndexResponse response = new IndexResponse(shardId, request.type(), request.id(), request.version(), operation.isCreated());
         return new WriteResult<>(response, operation.getTranslogLocation());
     }
 }

+ 10 - 1
core/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -277,7 +277,7 @@ public abstract class Engine implements Closeable {
         }
     }
 
-    public abstract boolean index(Index operation) throws EngineException;
+    public abstract void index(Index operation) throws EngineException;
 
     public abstract void delete(Delete delete) throws EngineException;
 
@@ -847,6 +847,7 @@ public abstract class Engine implements Closeable {
     public static class Index extends Operation {
 
         private final ParsedDocument doc;
+        private boolean created;
 
         public Index(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) {
             super(uid, version, versionType, origin, startTime);
@@ -905,6 +906,14 @@ public abstract class Engine implements Closeable {
             return this.doc.source();
         }
 
+        public boolean isCreated() {
+            return created;
+        }
+
+        public void setCreated(boolean created) {
+            this.created = created;
+        }
+
         @Override
         protected int estimatedSizeInBytes() {
             return (id().length() + type().length()) * 2 + source().length() + 12;

+ 15 - 20
core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -394,16 +394,15 @@ public class InternalEngine extends Engine {
     }
 
     @Override
-    public boolean index(Index index) {
-        final boolean created;
+    public void index(Index index) {
         try (ReleasableLock lock = readLock.acquire()) {
             ensureOpen();
             if (index.origin().isRecovery()) {
                 // Don't throttle recovery operations
-                created = innerIndex(index);
+                innerIndex(index);
             } else {
                 try (Releasable r = throttle.acquireThrottle()) {
-                    created = innerIndex(index);
+                    innerIndex(index);
                 }
             }
         } catch (IllegalStateException | IOException e) {
@@ -414,10 +413,9 @@ public class InternalEngine extends Engine {
             }
             throw new IndexFailedEngineException(shardId, index.type(), index.id(), e);
         }
-        return created;
     }
 
-    private boolean innerIndex(Index index) throws IOException {
+    private void innerIndex(Index index) throws IOException {
         try (Releasable ignored = acquireLock(index.uid())) {
             lastWriteNanos = index.startTime();
             final long currentVersion;
@@ -432,15 +430,16 @@ public class InternalEngine extends Engine {
             }
 
             final long expectedVersion = index.version();
-            if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) return false;
+            if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) {
+                index.setCreated(false);
+                return;
+            }
 
             final long updatedVersion = updateVersion(index, currentVersion, expectedVersion);
 
-            final boolean created = indexOrUpdate(index, currentVersion, versionValue);
+            indexOrUpdate(index, currentVersion, versionValue);
 
             maybeAddToTranslog(index, updatedVersion, Translog.Index::new, NEW_VERSION_VALUE);
-
-            return created;
         }
     }
 
@@ -450,16 +449,14 @@ public class InternalEngine extends Engine {
         return updatedVersion;
     }
 
-    private boolean indexOrUpdate(final Index index, final long currentVersion, final VersionValue versionValue) throws IOException {
-        final boolean created;
+    private void indexOrUpdate(final Index index, final long currentVersion, final VersionValue versionValue) throws IOException {
         if (currentVersion == Versions.NOT_FOUND) {
             // document does not exists, we can optimize for create
-            created = true;
+            index.setCreated(true);
             index(index, indexWriter);
         } else {
-            created = update(index, versionValue, indexWriter);
+            update(index, versionValue, indexWriter);
         }
-        return created;
     }
 
     private static void index(final Index index, final IndexWriter indexWriter) throws IOException {
@@ -470,19 +467,17 @@ public class InternalEngine extends Engine {
         }
     }
 
-    private static boolean update(final Index index, final VersionValue versionValue, final IndexWriter indexWriter) throws IOException {
-        final boolean created;
+    private static void update(final Index index, final VersionValue versionValue, final IndexWriter indexWriter) throws IOException {
         if (versionValue != null) {
-            created = versionValue.delete(); // we have a delete which is not GC'ed...
+            index.setCreated(versionValue.delete()); // we have a delete which is not GC'ed...
         } else {
-            created = false;
+            index.setCreated(false);
         }
         if (index.docs().size() > 1) {
             indexWriter.updateDocuments(index.uid(), index.docs());
         } else {
             indexWriter.updateDocument(index.uid(), index.docs().get(0));
         }
-        return created;
     }
 
     @Override

+ 1 - 1
core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java

@@ -106,7 +106,7 @@ public class ShadowEngine extends Engine {
 
 
     @Override
-    public boolean index(Index index) throws EngineException {
+    public void index(Index index) throws EngineException {
         throw new UnsupportedOperationException(shardId + " index operation not allowed on shadow engine");
     }
 

+ 5 - 13
core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -518,34 +518,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         return new Engine.Index(uid, doc, version, versionType, origin, startTime);
     }
 
-    /**
-     * Index a document and return whether it was created, as opposed to just
-     * updated.
-     */
-    public boolean index(Engine.Index index) {
+    public void index(Engine.Index index) {
         ensureWriteAllowed(index);
         Engine engine = getEngine();
-        return index(engine, index);
+        index(engine, index);
     }
 
-    private boolean index(Engine engine, Engine.Index index) {
+    private void index(Engine engine, Engine.Index index) {
         active.set(true);
         index = indexingOperationListeners.preIndex(index);
-        final boolean created;
         try {
             if (logger.isTraceEnabled()) {
                 logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs());
             }
-            created = engine.index(index);
+            engine.index(index);
             index.endTime(System.nanoTime());
         } catch (Exception e) {
             indexingOperationListeners.postIndex(index, e);
             throw e;
         }
-
-        indexingOperationListeners.postIndex(index, created);
-
-        return created;
+        indexingOperationListeners.postIndex(index, index.isCreated());
     }
 
     public Engine.Delete prepareDeleteOnPrimary(String type, String id, long version, VersionType versionType) {

+ 10 - 5
core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -1480,28 +1480,33 @@ public class InternalEngineTests extends ESTestCase {
     public void testBasicCreatedFlag() {
         ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
         Engine.Index index = new Engine.Index(newUid("1"), doc);
-        assertTrue(engine.index(index));
+        engine.index(index);
+        assertTrue(index.isCreated());
 
         index = new Engine.Index(newUid("1"), doc);
-        assertFalse(engine.index(index));
+        engine.index(index);
+        assertFalse(index.isCreated());
 
         engine.delete(new Engine.Delete(null, "1", newUid("1")));
 
         index = new Engine.Index(newUid("1"), doc);
-        assertTrue(engine.index(index));
+        engine.index(index);
+        assertTrue(index.isCreated());
     }
 
     public void testCreatedFlagAfterFlush() {
         ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
         Engine.Index index = new Engine.Index(newUid("1"), doc);
-        assertTrue(engine.index(index));
+        engine.index(index);
+        assertTrue(index.isCreated());
 
         engine.delete(new Engine.Delete(null, "1", newUid("1")));
 
         engine.flush();
 
         index = new Engine.Index(newUid("1"), doc);
-        assertTrue(engine.index(index));
+        engine.index(index);
+        assertTrue(index.isCreated());
     }
 
     private static class MockAppender extends AppenderSkeleton {