Răsfoiți Sursa

Faster and Simpler GCS REST Mock (#50706)

* Faster and Simpler GCS REST Mock

I reworked the GCS mock a little to use less copying+allocation,
log the full request body on failure to read a multi-part request
and generally be a little simpler and easy to follow to track down
the remaining issues that are causing almost daily failures from this
class's multi-part request parsing that can't be reproduced locally.
Armin Braun 5 ani în urmă
părinte
comite
331ce5b911

+ 55 - 54
test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java

@@ -20,6 +20,9 @@ package fixture.gcs;
 
 
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpHandler;
 import com.sun.net.httpserver.HttpHandler;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.ArrayUtil;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.SuppressForbidden;
@@ -33,14 +36,14 @@ import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.rest.RestUtils;
 import org.elasticsearch.rest.RestUtils;
 
 
-import java.io.BufferedInputStream;
+import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
 import java.net.URLDecoder;
 import java.net.URLDecoder;
 import java.util.ArrayList;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.List;
@@ -54,6 +57,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.function.BiFunction;
 import java.util.function.BiFunction;
 import java.util.regex.Matcher;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPInputStream;
 
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -64,6 +68,8 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 @SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint")
 @SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint")
 public class GoogleCloudStorageHttpHandler implements HttpHandler {
 public class GoogleCloudStorageHttpHandler implements HttpHandler {
 
 
+    private static final Logger logger = LogManager.getLogger(GoogleCloudStorageHttpHandler.class);
+
     private final ConcurrentMap<String, BytesReference> blobs;
     private final ConcurrentMap<String, BytesReference> blobs;
     private final String bucket;
     private final String bucket;
 
 
@@ -262,63 +268,58 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
 
 
     public static Optional<Tuple<String, BytesArray>> parseMultipartRequestBody(final InputStream requestBody) throws IOException {
     public static Optional<Tuple<String, BytesArray>> parseMultipartRequestBody(final InputStream requestBody) throws IOException {
         Tuple<String, BytesArray> content = null;
         Tuple<String, BytesArray> content = null;
-        try (BufferedInputStream in = new BufferedInputStream(new GZIPInputStream(requestBody))) {
-            String name = null;
-            int read;
-            ByteArrayOutputStream out = new ByteArrayOutputStream() {
-                @Override
-                public byte[] toByteArray() {
-                    return buf;
-                }
-            };
-            boolean skippedEmptyLine = false;
-            while ((read = in.read()) != -1) {
-                out.reset();
-                boolean markAndContinue = false;
-                do { // search next consecutive {carriage return, new line} chars and stop
-                    if ((char) read == '\r') {
-                        int next = in.read();
-                        if (next != -1) {
-                            if (next == '\n') {
-                                break;
-                            }
-                            out.write(read);
-                            out.write(next);
-                            continue;
-                        }
-                    }
-                    out.write(read);
-                } while ((read = in.read()) != -1);
-                final String bucketPrefix = "{\"bucket\":";
-                final String start = new String(out.toByteArray(), 0, Math.min(out.size(), bucketPrefix.length()), UTF_8);
-                if ((skippedEmptyLine == false && start.length() == 0) || start.startsWith("--")
-                    || start.toLowerCase(Locale.ROOT).startsWith("content")) {
-                    markAndContinue = true;
-                } else if (start.startsWith(bucketPrefix)) {
+        final BytesReference fullRequestBody;
+        try (InputStream in = new GZIPInputStream(requestBody)) {
+            fullRequestBody = Streams.readFully(in);
+        }
+        String name = null;
+        boolean skippedEmptyLine = false;
+        int startPos = 0;
+        int endPos = 0;
+        while (startPos < fullRequestBody.length()) {
+            do {
+                endPos = fullRequestBody.indexOf((byte) '\r', endPos + 1);
+            } while (endPos >= 0 && fullRequestBody.get(endPos + 1) != '\n');
+            boolean markAndContinue = false;
+            final String bucketPrefix = "{\"bucket\":";
+            if (startPos > 0) {
+                startPos += 2;
+            }
+            if (name == null || skippedEmptyLine == false) {
+                if ((skippedEmptyLine == false && endPos == startPos)
+                    || (fullRequestBody.get(startPos) == '-' && fullRequestBody.get(startPos + 1) == '-')) {
                     markAndContinue = true;
                     markAndContinue = true;
-                    final String line = new String(out.toByteArray(), bucketPrefix.length(), out.size() - bucketPrefix.length(), UTF_8);
-                    Matcher matcher = NAME_PATTERN.matcher(line);
-                    if (matcher.find()) {
-                        name = matcher.group(1);
-                    }
-                }
-                if (markAndContinue) {
-                    skippedEmptyLine = start.length() == 0;
-                    in.mark(Integer.MAX_VALUE);
-                    continue;
-                }
-                if (name != null) {
-                    in.reset();
-                    out.reset();
-                    while ((read = in.read()) != -1) {
-                        out.write(read);
+                } else {
+                    final String start = fullRequestBody.slice(startPos, Math.min(endPos - startPos, bucketPrefix.length())).utf8ToString();
+                    if (start.toLowerCase(Locale.ROOT).startsWith("content")) {
+                        markAndContinue = true;
+                    } else if (start.startsWith(bucketPrefix)) {
+                        markAndContinue = true;
+                        final String line = fullRequestBody.slice(
+                            startPos + bucketPrefix.length(), endPos - startPos - bucketPrefix.length()).utf8ToString();
+                        Matcher matcher = NAME_PATTERN.matcher(line);
+                        if (matcher.find()) {
+                            name = matcher.group(1);
+                        }
                     }
                     }
-                    // removes the trailing end "\r\n--__END_OF_PART__--\r\n" which is 23 bytes long
-                    content = Tuple.tuple(name, new BytesArray(Arrays.copyOf(out.toByteArray(), out.size() - 23)));
-                    break;
                 }
                 }
+                skippedEmptyLine = markAndContinue && endPos == startPos;
+                startPos = endPos;
+            } else {
+                // removes the trailing end "\r\n--__END_OF_PART__--\r\n" which is 23 bytes long
+                int len = fullRequestBody.length() - startPos - 23;
+                final InputStream stream = fullRequestBody.slice(startPos, len).streamInput();
+                final byte[] buffer = new byte[len];
+                Streams.readFully(stream, buffer);
+                content = Tuple.tuple(name, new BytesArray(buffer));
+                break;
             }
             }
         }
         }
+        if (content == null) {
+            final InputStream stream = fullRequestBody.streamInput();
+            logger.warn(() -> new ParameterizedMessage("Failed to find multi-part upload in [{}]", new BufferedReader(
+                new InputStreamReader(stream)).lines().collect(Collectors.joining("\n"))));
+        }
         return Optional.ofNullable(content);
         return Optional.ofNullable(content);
     }
     }