|
@@ -208,14 +208,15 @@ public class CreateIndexIT extends ESIntegTestCase {
|
|
|
|
|
|
public void testCreateAndDeleteIndexConcurrently() throws InterruptedException {
|
|
public void testCreateAndDeleteIndexConcurrently() throws InterruptedException {
|
|
createIndex("test");
|
|
createIndex("test");
|
|
- final AtomicInteger indexDeleteAction = new AtomicInteger(0);
|
|
|
|
|
|
+ final AtomicInteger indexVersion = new AtomicInteger(0);
|
|
|
|
+ final Object indexVersionLock = new Object();
|
|
final CountDownLatch latch = new CountDownLatch(1);
|
|
final CountDownLatch latch = new CountDownLatch(1);
|
|
int numDocs = randomIntBetween(1, 10);
|
|
int numDocs = randomIntBetween(1, 10);
|
|
for (int i = 0; i < numDocs; i++) {
|
|
for (int i = 0; i < numDocs; i++) {
|
|
- client().prepareIndex("test", "test").setSource("index_version", indexDeleteAction.get()).get();
|
|
|
|
|
|
+ client().prepareIndex("test", "test").setSource("index_version", indexVersion.get()).get();
|
|
}
|
|
}
|
|
- synchronized (indexDeleteAction) { // not necessarily needed here
|
|
|
|
- indexDeleteAction.incrementAndGet();
|
|
|
|
|
|
+ synchronized (indexVersionLock) { // not necessarily needed here but for completeness we lock here too
|
|
|
|
+ indexVersion.incrementAndGet();
|
|
}
|
|
}
|
|
client().admin().indices().prepareDelete("test").execute(new ActionListener<DeleteIndexResponse>() { // this happens async!!!
|
|
client().admin().indices().prepareDelete("test").execute(new ActionListener<DeleteIndexResponse>() { // this happens async!!!
|
|
@Override
|
|
@Override
|
|
@@ -223,11 +224,13 @@ public class CreateIndexIT extends ESIntegTestCase {
|
|
Thread thread = new Thread() {
|
|
Thread thread = new Thread() {
|
|
public void run() {
|
|
public void run() {
|
|
try {
|
|
try {
|
|
- client().prepareIndex("test", "test").setSource("index_version", indexDeleteAction.get()).get(); // recreate that index
|
|
|
|
- synchronized (indexDeleteAction) {
|
|
|
|
- indexDeleteAction.incrementAndGet();
|
|
|
|
|
|
+ client().prepareIndex("test", "test").setSource("index_version", indexVersion.get()).get(); // recreate that index
|
|
|
|
+ synchronized (indexVersionLock) {
|
|
|
|
+ // we sync here since we have to ensure that all indexing operations below for a given ID are done before we increment the
|
|
|
|
+ // index version otherwise a doc that is in-flight could make it into an index that it was supposed to be deleted for and our assertion fail...
|
|
|
|
+ indexVersion.incrementAndGet();
|
|
}
|
|
}
|
|
- client().admin().indices().prepareDelete("test").get(); // from here on all docs with index_version == 0|1 must be gone!!!! only 2 are ok;
|
|
|
|
|
|
+ assertAcked(client().admin().indices().prepareDelete("test").get()); // from here on all docs with index_version == 0|1 must be gone!!!! only 2 are ok;
|
|
} finally {
|
|
} finally {
|
|
latch.countDown();
|
|
latch.countDown();
|
|
}
|
|
}
|
|
@@ -238,15 +241,15 @@ public class CreateIndexIT extends ESIntegTestCase {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void onFailure(Throwable e) {
|
|
public void onFailure(Throwable e) {
|
|
- ExceptionsHelper.reThrowIfNotNull(e);
|
|
|
|
|
|
+ throw new RuntimeException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
);
|
|
);
|
|
numDocs = randomIntBetween(100, 200);
|
|
numDocs = randomIntBetween(100, 200);
|
|
for (int i = 0; i < numDocs; i++) {
|
|
for (int i = 0; i < numDocs; i++) {
|
|
try {
|
|
try {
|
|
- synchronized (indexDeleteAction) {
|
|
|
|
- client().prepareIndex("test", "test").setSource("index_version", indexDeleteAction.get()).get();
|
|
|
|
|
|
+ synchronized (indexVersionLock) {
|
|
|
|
+ client().prepareIndex("test", "test").setSource("index_version", indexVersion.get()).get();
|
|
}
|
|
}
|
|
} catch (IndexNotFoundException inf) {
|
|
} catch (IndexNotFoundException inf) {
|
|
// fine
|
|
// fine
|
|
@@ -254,10 +257,11 @@ public class CreateIndexIT extends ESIntegTestCase {
|
|
}
|
|
}
|
|
latch.await();
|
|
latch.await();
|
|
refresh();
|
|
refresh();
|
|
- SearchResponse expected = client().prepareSearch("test").setIndicesOptions(IndicesOptions.lenientExpandOpen()).setQuery(new RangeQueryBuilder("index_version").from(indexDeleteAction.get(), true)).get();
|
|
|
|
|
|
+
|
|
|
|
+ // we only really assert that we never reuse segments of old indices or anything like this here and that nothing fails with crazy exceptions
|
|
|
|
+ SearchResponse expected = client().prepareSearch("test").setIndicesOptions(IndicesOptions.lenientExpandOpen()).setQuery(new RangeQueryBuilder("index_version").from(indexVersion.get(), true)).get();
|
|
SearchResponse all = client().prepareSearch("test").setIndicesOptions(IndicesOptions.lenientExpandOpen()).get();
|
|
SearchResponse all = client().prepareSearch("test").setIndicesOptions(IndicesOptions.lenientExpandOpen()).get();
|
|
- assertEquals(expected + " vs. " + all, expected.getHits().getTotalHits(), all.getHits().getTotalHits()
|
|
|
|
- );
|
|
|
|
|
|
+ assertEquals(expected + " vs. " + all, expected.getHits().getTotalHits(), all.getHits().getTotalHits());
|
|
logger.info("total: {}", expected.getHits().getTotalHits());
|
|
logger.info("total: {}", expected.getHits().getTotalHits());
|
|
}
|
|
}
|
|
|
|
|