
[SNAPSHOT] Ensure BWC layer can read chunked blobs

Simon Willnauer, 11 years ago
parent commit 59da079bae

+ 54 - 3
src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java

@@ -48,6 +48,7 @@ import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.repositories.RepositoryName;
 
+import java.io.ByteArrayOutputStream;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -692,20 +693,70 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
      * The new logic for StoreFileMetaData reads the entire <tt>.si</tt> and <tt>segments.n</tt> files to strengthen the
      * comparison of the files on a per-segment / per-commit level.
      */
-    private static final void maybeRecalculateMetadataHash(ImmutableBlobContainer blobContainer, FileInfo fileInfo, Store.MetadataSnapshot snapshot) throws IOException {
+    private static final void maybeRecalculateMetadataHash(final ImmutableBlobContainer blobContainer, final FileInfo fileInfo, Store.MetadataSnapshot snapshot) throws Throwable {
         final StoreFileMetaData metadata;
         if (fileInfo != null && (metadata = snapshot.get(fileInfo.physicalName())) != null) {
             if (metadata.hash().length > 0 && fileInfo.metadata().hash().length == 0) {
                 // we have a hash - check if our repo has a hash too otherwise we have
                 // to calculate it.
-                final byte[] bytes = blobContainer.readBlobFully(fileInfo.name());
+                final ByteArrayOutputStream out = new ByteArrayOutputStream();
+                final CountDownLatch latch = new CountDownLatch(1);
+                final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<>();
+                // we might have multiple parts even though the file is small... make sure we read all of it.
+                // TODO this API should really support a stream!
+                blobContainer.readBlob(fileInfo.partName(0), new BlobContainer.ReadBlobListener() {
+                    final AtomicInteger partIndex = new AtomicInteger();
+                    @Override
+                    public synchronized void onPartial(byte[] data, int offset, int size) throws IOException {
+                        out.write(data, offset, size);
+                    }
+
+                    @Override
+                    public synchronized void onCompleted() {
+                        boolean countDown = true;
+                        try {
+                            final int part = partIndex.incrementAndGet();
+                            if (part < fileInfo.numberOfParts()) {
+                                final String partName = fileInfo.partName(part);
+                                // continue with the new part
+                                blobContainer.readBlob(partName, this);
+                                countDown = false;
+                                return;
+                            }
+                        } finally {
+                            if (countDown) {
+                                latch.countDown();
+                            }
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Throwable t) {
+                        try {
+                            failures.add(t);
+                        } finally {
+                            latch.countDown();
+                        }
+                    }
+                });
+
+                try {
+                    latch.await();
+                } catch (InterruptedException e) {
+                    Thread.interrupted();
+                }
+
+                if (!failures.isEmpty()) {
+                    ExceptionsHelper.rethrowAndSuppress(failures);
+                }
+
+                final byte[] bytes = out.toByteArray();
                 assert bytes != null;
                 assert bytes.length == fileInfo.length() : bytes.length + " != " + fileInfo.length();
                 final BytesRef spare = new BytesRef(bytes);
                 Store.MetadataSnapshot.hashFile(fileInfo.metadata().hash(), spare);
             }
         }
-
     }
 
     /**

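Note on the change above: because the repository may split a blob into several parts (chunking), the BWC hash recalculation has to concatenate every part before hashing, not just the first one. The following is a minimal, self-contained sketch of that assembly step, not the Elasticsearch API; the "<name>.part<i>" naming scheme and the readAllParts helper are assumptions for illustration only.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;

    final class ChunkedBlobReader {
        /** Reads parts 0..numberOfParts-1 in order and concatenates them into one byte[]. */
        static byte[] readAllParts(Path dir, String name, int numberOfParts) throws IOException {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            for (int part = 0; part < numberOfParts; part++) {
                // assumption: each chunk is stored as "<name>.part<i>" next to the blob
                Path partFile = dir.resolve(name + ".part" + part);
                try (InputStream in = Files.newInputStream(partFile)) {
                    byte[] buffer = new byte[8192];
                    int read;
                    while ((read = in.read(buffer)) != -1) {
                        out.write(buffer, 0, read);
                    }
                }
            }
            return out.toByteArray();
        }
    }

The resulting byte[] is what the patched maybeRecalculateMetadataHash hands to Store.MetadataSnapshot.hashFile, which is why the length assertion against fileInfo.length() only holds once all parts have been read.
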
+ 6 - 3
src/test/java/org/elasticsearch/snapshots/SnapshotBackwardsCompatibilityTest.java

@@ -35,7 +35,9 @@ import org.elasticsearch.test.ElasticsearchBackwardsCompatIntegrationTest;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 
@@ -157,11 +159,11 @@ public class SnapshotBackwardsCompatibilityTest extends ElasticsearchBackwardsCo
 
     public void testSnapshotMoreThanOnce() throws ExecutionException, InterruptedException, IOException {
         Client client = client();
-
+        final File tempDir = newTempDir(LifecycleScope.SUITE).getAbsoluteFile();
         logger.info("-->  creating repository");
         assertAcked(client.admin().cluster().preparePutRepository("test-repo")
                 .setType("fs").setSettings(ImmutableSettings.settingsBuilder()
-                        .put("location", newTempDir(LifecycleScope.SUITE).getAbsoluteFile())
+                        .put("location", tempDir)
                         .put("compress", randomBoolean())
                         .put("chunk_size", randomIntBetween(100, 1000))));
 
@@ -213,7 +215,7 @@ public class SnapshotBackwardsCompatibilityTest extends ElasticsearchBackwardsCo
         if (cluster().numDataNodes() > 1 && randomBoolean()) { // only bump the replicas if we have enough nodes
             client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(1,2))).get();
         }
-
+        logger.debug("---> repo exists: " + new File(tempDir, "indices/test/0").exists() + " files: " + Arrays.toString(new File(tempDir, "indices/test/0").list())); // it's only one shard!
         CreateSnapshotResponse createSnapshotResponseSecond = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-1").setWaitForCompletion(true).setIndices("test").get();
         assertThat(createSnapshotResponseSecond.getSnapshotInfo().successfulShards(), greaterThan(0));
         assertThat(createSnapshotResponseSecond.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponseSecond.getSnapshotInfo().totalShards()));
@@ -222,6 +224,7 @@ public class SnapshotBackwardsCompatibilityTest extends ElasticsearchBackwardsCo
             SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-1").get().getSnapshots().get(0);
             List<SnapshotIndexShardStatus> shards = snapshotStatus.getShards();
             for (SnapshotIndexShardStatus status : shards) {
+
                 assertThat(status.getStats().getProcessedFiles(), equalTo(1)); // we flush before the snapshot such that we have to process the segments_N files
             }
         }
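
Closing note: the patched read path above bridges the callback-based readBlob API to blocking code with a CountDownLatch and a CopyOnWriteArrayList of failures that are rethrown on the calling thread. A minimal generic sketch of that pattern follows; the AsyncSource interface and readBlocking helper are hypothetical and only illustrate the technique, they are not part of the Elasticsearch API.

    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.CountDownLatch;
    import java.util.function.Consumer;

    final class BlockingBridge {
        /** Hypothetical callback-style source, standing in for the async blob read API. */
        interface AsyncSource {
            void read(Consumer<byte[]> onSuccess, Consumer<Throwable> onFailure);
        }

        /** Blocks until either callback fires, then returns the data or rethrows the failure. */
        static byte[] readBlocking(AsyncSource source) throws Exception {
            final CountDownLatch latch = new CountDownLatch(1);
            final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<>();
            final byte[][] result = new byte[1][];
            source.read(
                data -> { result[0] = data; latch.countDown(); },
                t -> { failures.add(t); latch.countDown(); }
            );
            latch.await(); // wait for onSuccess or onFailure
            if (!failures.isEmpty()) {
                throw new Exception("read failed", failures.get(0));
            }
            return result[0];
        }
    }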