|
@@ -19,6 +19,7 @@
|
|
|
package org.elasticsearch.index.shard;
|
|
|
|
|
|
import org.apache.lucene.index.Term;
|
|
|
+import org.elasticsearch.index.Index;
|
|
|
import org.elasticsearch.index.engine.Engine;
|
|
|
import org.elasticsearch.index.seqno.SequenceNumbersService;
|
|
|
import org.elasticsearch.test.ESTestCase;
|
|
@@ -29,6 +30,8 @@ import java.util.Collections;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
+import static org.hamcrest.Matchers.is;
|
|
|
+
|
|
|
public class IndexingOperationListenerTests extends ESTestCase{
|
|
|
|
|
|
// this test also tests if calls are correct if one or more listeners throw exceptions
|
|
@@ -39,76 +42,83 @@ public class IndexingOperationListenerTests extends ESTestCase{
|
|
|
AtomicInteger preDelete = new AtomicInteger();
|
|
|
AtomicInteger postDelete = new AtomicInteger();
|
|
|
AtomicInteger postDeleteException = new AtomicInteger();
|
|
|
+ ShardId randomShardId = new ShardId(new Index(randomAsciiOfLength(10), randomAsciiOfLength(10)), randomIntBetween(1, 10));
|
|
|
IndexingOperationListener listener = new IndexingOperationListener() {
|
|
|
@Override
|
|
|
- public Engine.Index preIndex(Engine.Index operation) {
|
|
|
+ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
|
|
|
+ assertThat(shardId, is(randomShardId));
|
|
|
preIndex.incrementAndGet();
|
|
|
return operation;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void postIndex(Engine.Index index, Engine.IndexResult result) {
|
|
|
+ public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) {
|
|
|
+ assertThat(shardId, is(randomShardId));
|
|
|
if (result.hasFailure() == false) {
|
|
|
postIndex.incrementAndGet();
|
|
|
} else {
|
|
|
- postIndex(index, result.getFailure());
|
|
|
+ postIndex(shardId, index, result.getFailure());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void postIndex(Engine.Index index, Exception ex) {
|
|
|
+ public void postIndex(ShardId shardId, Engine.Index index, Exception ex) {
|
|
|
+ assertThat(shardId, is(randomShardId));
|
|
|
postIndexException.incrementAndGet();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public Engine.Delete preDelete(Engine.Delete delete) {
|
|
|
+ public Engine.Delete preDelete(ShardId shardId, Engine.Delete delete) {
|
|
|
+ assertThat(shardId, is(randomShardId));
|
|
|
preDelete.incrementAndGet();
|
|
|
return delete;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void postDelete(Engine.Delete delete, Engine.DeleteResult result) {
|
|
|
+ public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResult result) {
|
|
|
+ assertThat(shardId, is(randomShardId));
|
|
|
if (result.hasFailure() == false) {
|
|
|
postDelete.incrementAndGet();
|
|
|
} else {
|
|
|
- postDelete(delete, result.getFailure());
|
|
|
+ postDelete(shardId, delete, result.getFailure());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void postDelete(Engine.Delete delete, Exception ex) {
|
|
|
+ public void postDelete(ShardId shardId, Engine.Delete delete, Exception ex) {
|
|
|
+ assertThat(shardId, is(randomShardId));
|
|
|
postDeleteException.incrementAndGet();
|
|
|
}
|
|
|
};
|
|
|
|
|
|
IndexingOperationListener throwingListener = new IndexingOperationListener() {
|
|
|
@Override
|
|
|
- public Engine.Index preIndex(Engine.Index operation) {
|
|
|
+ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
|
|
|
throw new RuntimeException();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void postIndex(Engine.Index index, Engine.IndexResult result) {
|
|
|
+ public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) {
|
|
|
throw new RuntimeException();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void postIndex(Engine.Index index, Exception ex) {
|
|
|
+ public void postIndex(ShardId shardId, Engine.Index index, Exception ex) {
|
|
|
throw new RuntimeException();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public Engine.Delete preDelete(Engine.Delete delete) {
|
|
|
+ public Engine.Delete preDelete(ShardId shardId, Engine.Delete delete) {
|
|
|
throw new RuntimeException();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void postDelete(Engine.Delete delete, Engine.DeleteResult result) {
|
|
|
+ public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResult result) {
|
|
|
throw new RuntimeException();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void postDelete(Engine.Delete delete, Exception ex) {
|
|
|
+ public void postDelete(ShardId shardId, Engine.Delete delete, Exception ex) {
|
|
|
throw new RuntimeException();
|
|
|
}
|
|
|
};
|
|
@@ -120,10 +130,11 @@ public class IndexingOperationListenerTests extends ESTestCase{
|
|
|
}
|
|
|
}
|
|
|
Collections.shuffle(indexingOperationListeners, random());
|
|
|
- IndexingOperationListener.CompositeListener compositeListener = new IndexingOperationListener.CompositeListener(indexingOperationListeners, logger);
|
|
|
+ IndexingOperationListener.CompositeListener compositeListener =
|
|
|
+ new IndexingOperationListener.CompositeListener(indexingOperationListeners, logger);
|
|
|
Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1"));
|
|
|
Engine.Index index = new Engine.Index(new Term("_uid", "1"), null);
|
|
|
- compositeListener.postDelete(delete, new Engine.DeleteResult(1, SequenceNumbersService.UNASSIGNED_SEQ_NO, true));
|
|
|
+ compositeListener.postDelete(randomShardId, delete, new Engine.DeleteResult(1, SequenceNumbersService.UNASSIGNED_SEQ_NO, true));
|
|
|
assertEquals(0, preIndex.get());
|
|
|
assertEquals(0, postIndex.get());
|
|
|
assertEquals(0, postIndexException.get());
|
|
@@ -131,7 +142,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
|
|
|
assertEquals(2, postDelete.get());
|
|
|
assertEquals(0, postDeleteException.get());
|
|
|
|
|
|
- compositeListener.postDelete(delete, new RuntimeException());
|
|
|
+ compositeListener.postDelete(randomShardId, delete, new RuntimeException());
|
|
|
assertEquals(0, preIndex.get());
|
|
|
assertEquals(0, postIndex.get());
|
|
|
assertEquals(0, postIndexException.get());
|
|
@@ -139,7 +150,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
|
|
|
assertEquals(2, postDelete.get());
|
|
|
assertEquals(2, postDeleteException.get());
|
|
|
|
|
|
- compositeListener.preDelete(delete);
|
|
|
+ compositeListener.preDelete(randomShardId, delete);
|
|
|
assertEquals(0, preIndex.get());
|
|
|
assertEquals(0, postIndex.get());
|
|
|
assertEquals(0, postIndexException.get());
|
|
@@ -147,7 +158,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
|
|
|
assertEquals(2, postDelete.get());
|
|
|
assertEquals(2, postDeleteException.get());
|
|
|
|
|
|
- compositeListener.postIndex(index, new Engine.IndexResult(0, SequenceNumbersService.UNASSIGNED_SEQ_NO, false));
|
|
|
+ compositeListener.postIndex(randomShardId, index, new Engine.IndexResult(0, SequenceNumbersService.UNASSIGNED_SEQ_NO, false));
|
|
|
assertEquals(0, preIndex.get());
|
|
|
assertEquals(2, postIndex.get());
|
|
|
assertEquals(0, postIndexException.get());
|
|
@@ -155,7 +166,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
|
|
|
assertEquals(2, postDelete.get());
|
|
|
assertEquals(2, postDeleteException.get());
|
|
|
|
|
|
- compositeListener.postIndex(index, new RuntimeException());
|
|
|
+ compositeListener.postIndex(randomShardId, index, new RuntimeException());
|
|
|
assertEquals(0, preIndex.get());
|
|
|
assertEquals(2, postIndex.get());
|
|
|
assertEquals(2, postIndexException.get());
|
|
@@ -163,7 +174,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
|
|
|
assertEquals(2, postDelete.get());
|
|
|
assertEquals(2, postDeleteException.get());
|
|
|
|
|
|
- compositeListener.preIndex(index);
|
|
|
+ compositeListener.preIndex(randomShardId, index);
|
|
|
assertEquals(2, preIndex.get());
|
|
|
assertEquals(2, postIndex.get());
|
|
|
assertEquals(2, postIndexException.get());
|