Przeglądaj źródła

Improve Stability of GCS Mock API (#49592)

Same as #49518 pretty much but for GCS.
Fixing a few more spots where input stream can get closed
without being fully drained and adding assertions to make sure
it's always drained.
Moved the no-close stream wrapper to production code utilities since
there's a number of spots in production code where it's also useful
(will reuse it there in a follow-up).
Armin Braun 5 lat temu
rodzic
commit
5f57c7efc7

+ 1 - 1
plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java

@@ -212,7 +212,7 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
             if ("/token".equals(exchange.getRequestURI().getPath())) {
                 try {
                     // token content is unique per node (not per request)
-                    return Streams.readFully(exchange.getRequestBody()).utf8ToString();
+                    return Streams.readFully(Streams.noCloseStream(exchange.getRequestBody())).utf8ToString();
                 } catch (IOException e) {
                     throw new AssertionError("Unable to read token request body", e);
                 }

+ 16 - 0
server/src/main/java/org/elasticsearch/common/io/Streams.java

@@ -25,6 +25,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 
 import java.io.BufferedReader;
+import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -219,6 +220,21 @@ public abstract class Streams {
         }
     }
 
+    /**
+     * Wraps an {@link InputStream} such that it's {@code close} method becomes a noop
+     *
+     * @param stream {@code InputStream} to wrap
+     * @return wrapped {@code InputStream}
+     */
+    public static InputStream noCloseStream(InputStream stream) {
+        return new FilterInputStream(stream) {
+            @Override
+            public void close() {
+                // noop
+            }
+        };
+    }
+
     /**
      * Wraps the given {@link BytesStream} in a {@link StreamOutput} that simply flushes when
      * close is called.

+ 13 - 5
test/fixtures/gcs-fixture/src/main/java/fixture/gcs/FakeOAuth2HttpHandler.java

@@ -30,12 +30,20 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 @SuppressForbidden(reason = "Uses a HttpServer to emulate a fake OAuth2 authentication service")
 public class FakeOAuth2HttpHandler implements HttpHandler {
 
+    private static final byte[] BUFFER = new byte[1024];
+
     @Override
     public void handle(final HttpExchange exchange) throws IOException {
-        byte[] response = ("{\"access_token\":\"foo\",\"token_type\":\"Bearer\",\"expires_in\":3600}").getBytes(UTF_8);
-        exchange.getResponseHeaders().add("Content-Type", "application/json");
-        exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
-        exchange.getResponseBody().write(response);
-        exchange.close();
+        try {
+            byte[] response = ("{\"access_token\":\"foo\",\"token_type\":\"Bearer\",\"expires_in\":3600}").getBytes(UTF_8);
+            exchange.getResponseHeaders().add("Content-Type", "application/json");
+            exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
+            exchange.getResponseBody().write(response);
+            while (exchange.getRequestBody().read(BUFFER) >= 0) ;
+        } finally {
+            int read = exchange.getRequestBody().read();
+            assert read == -1 : "Request body should have been fully read here but saw [" + read + "]";
+            exchange.close();
+        }
     }
 }

+ 8 - 4
test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java

@@ -81,6 +81,8 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
             assert read == -1 : "Request body should have been empty but saw [" + read + "]";
         }
         try {
+            // Request body is closed in the finally block
+            final InputStream wrappedRequest = Streams.noCloseStream(exchange.getRequestBody());
             if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o*", request)) {
                 // List Objects https://cloud.google.com/storage/docs/json_api/v1/objects/list
                 final Map<String, String> params = new HashMap<>();
@@ -159,7 +161,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
                 // Batch https://cloud.google.com/storage/docs/json_api/v1/how-tos/batch
                 final String uri = "/storage/v1/b/" + bucket + "/o/";
                 final StringBuilder batch = new StringBuilder();
-                for (String line : Streams.readAllLines(new BufferedInputStream(exchange.getRequestBody()))) {
+                for (String line : Streams.readAllLines(new BufferedInputStream(wrappedRequest))) {
                     if (line.length() == 0 || line.startsWith("--") || line.toLowerCase(Locale.ROOT).startsWith("content")) {
                         batch.append(line).append('\n');
                     } else if (line.startsWith("DELETE")) {
@@ -179,7 +181,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
 
             } else if (Regex.simpleMatch("POST /upload/storage/v1/b/" + bucket + "/*uploadType=multipart*", request)) {
                 // Multipart upload
-                Optional<Tuple<String, BytesArray>> content = parseMultipartRequestBody(exchange.getRequestBody());
+                Optional<Tuple<String, BytesArray>> content = parseMultipartRequestBody(wrappedRequest);
                 if (content.isPresent()) {
                     blobs.put(content.get().v1(), content.get().v2());
 
@@ -198,7 +200,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
                 final String blobName = params.get("name");
                 blobs.put(blobName, BytesArray.EMPTY);
 
-                byte[] response = Streams.readFully(exchange.getRequestBody()).utf8ToString().getBytes(UTF_8);
+                byte[] response = Streams.readFully(wrappedRequest).utf8ToString().getBytes(UTF_8);
                 exchange.getResponseHeaders().add("Content-Type", "application/json");
                 exchange.getResponseHeaders().add("Location", httpServerUrl(exchange) + "/upload/storage/v1/b/" + bucket + "/o?"
                     + "uploadType=resumable"
@@ -224,7 +226,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
                 final int end = getContentRangeEnd(range);
 
                 final ByteArrayOutputStream out = new ByteArrayOutputStream();
-                long bytesRead = Streams.copy(exchange.getRequestBody(), out);
+                long bytesRead = Streams.copy(wrappedRequest, out);
                 int length = Math.max(end + 1, limit != null ? limit : 0);
                 if ((int) bytesRead > length) {
                     throw new AssertionError("Requesting more bytes than available for blob");
@@ -249,6 +251,8 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
                 exchange.sendResponseHeaders(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), -1);
             }
         } finally {
+            int read = exchange.getRequestBody().read();
+            assert read == -1 : "Request body should have been fully read here but saw [" + read + "]";
             exchange.close();
         }
     }

+ 20 - 10
test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java

@@ -190,16 +190,26 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
 
         @Override
         public void handle(final HttpExchange exchange) throws IOException {
-            final String requestId = requestUniqueId(exchange);
-            assert Strings.hasText(requestId);
-
-            final boolean canFailRequest = canFailRequest(exchange);
-            final int count = requests.computeIfAbsent(requestId, req -> new AtomicInteger(0)).incrementAndGet();
-            if (count >= maxErrorsPerRequest || canFailRequest == false) {
-                requests.remove(requestId);
-                delegate.handle(exchange);
-            } else {
-                handleAsError(exchange);
+            try {
+                final String requestId = requestUniqueId(exchange);
+                assert Strings.hasText(requestId);
+
+                final boolean canFailRequest = canFailRequest(exchange);
+                final int count = requests.computeIfAbsent(requestId, req -> new AtomicInteger(0)).incrementAndGet();
+                if (count >= maxErrorsPerRequest || canFailRequest == false) {
+                    requests.remove(requestId);
+                    delegate.handle(exchange);
+                } else {
+                    handleAsError(exchange);
+                }
+            } finally {
+                try {
+                    int read = exchange.getRequestBody().read();
+                    assert read == -1 : "Request body should have been fully read here but saw [" + read + "]";
+                } catch (IOException e) {
+                    // ignored, stream is assumed to have been closed by previous handler
+                }
+                exchange.close();
             }
         }