1
0
Эх сурвалжийг харах

[TEST] ensure files are synced otherwise MDW will corrupt them afterwards

Simon Willnauer 10 жил өмнө
parent
commit
edac9c17fa

+ 24 - 5
core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

@@ -46,6 +46,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -57,7 +58,9 @@ public class RecoverySourceHandlerTests extends ESTestCase {
     private final NodeSettingsService service = new NodeSettingsService(Settings.EMPTY);
 
     public void testSendFiles() throws Throwable {
-        final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
+        Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1).
+                put("indices.recovery.concurrent_small_file_streams", 1).build();
+        final RecoverySettings recoverySettings = new RecoverySettings(settings, service);
         StartRecoveryRequest request = new StartRecoveryRequest(shardId,
                 new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
                 new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
@@ -82,7 +85,13 @@ public class RecoverySourceHandlerTests extends ESTestCase {
         Store targetStore = newStore(createTempDir());
         handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), (md) -> {
             try {
-                return new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT));
+                return new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT)) {
+                    @Override
+                    public void close() throws IOException {
+                        super.close();
+                        store.directory().sync(Collections.singleton(md.name())); // sync otherwise MDW will mess with it
+                    }
+                };
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }
@@ -98,7 +107,9 @@ public class RecoverySourceHandlerTests extends ESTestCase {
     }
 
     public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable {
-        final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
+        Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1).
+                put("indices.recovery.concurrent_small_file_streams", 1).build();
+        final RecoverySettings recoverySettings = new RecoverySettings(settings, service);
         StartRecoveryRequest request = new StartRecoveryRequest(shardId,
                 new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
                 new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
@@ -138,7 +149,13 @@ public class RecoverySourceHandlerTests extends ESTestCase {
         try {
             handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), (md) -> {
                 try {
-                    return new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT));
+                    return new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT)) {
+                        @Override
+                        public void close() throws IOException {
+                            super.close();
+                            store.directory().sync(Collections.singleton(md.name())); // sync otherwise MDW will mess with it
+                        }
+                    };
                 } catch (IOException e) {
                     throw new RuntimeException(e);
                 }
@@ -153,7 +170,9 @@ public class RecoverySourceHandlerTests extends ESTestCase {
 
 
     public void testHandleExceptinoOnSendSendFiles() throws Throwable {
-        final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
+        Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1).
+                put("indices.recovery.concurrent_small_file_streams", 1).build();
+        final RecoverySettings recoverySettings = new RecoverySettings(settings, service);
         StartRecoveryRequest request = new StartRecoveryRequest(shardId,
                 new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
                 new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),