Browse Source

GCP supports using stage (#1655)

Signed-off-by: lentitude2tk <xushuang.hu@zilliz.com>
xushuang.hu 1 week ago
parent
commit
7d65f05534

+ 1 - 1
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageFileManager.java

@@ -137,7 +137,7 @@ public class StageFileManager {
     public void shutdownGracefully() {
         executor.shutdown();
         try {
-            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+            if (!executor.awaitTermination(2, TimeUnit.SECONDS)) {
                 logger.warn("Executor didn't terminate in time, forcing shutdown...");
                 executor.shutdownNow();
             }

+ 7 - 0
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/common/clientenum/CloudStorage.java

@@ -71,6 +71,13 @@ public enum CloudStorage {
         return tcCloudStorages.stream().anyMatch(e -> e.getCloudName().equalsIgnoreCase(cloudName));
     }
 
+    public static boolean isGcpCloud(String cloudName) {
+        List<CloudStorage> gcpCloudStorages = Lists.newArrayList(
+                CloudStorage.GCP
+        );
+        return gcpCloudStorages.stream().anyMatch(e -> e.getCloudName().equalsIgnoreCase(cloudName));
+    }
+
     public static boolean isAzCloud(String cloudName) {
         List<CloudStorage> azCloudStorages = Lists.newArrayList(
                 CloudStorage.AZ, CloudStorage.AZURE

+ 39 - 3
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/storage/client/MinioStorageClient.java

@@ -23,7 +23,15 @@ import com.google.common.collect.Multimap;
 import io.milvus.bulkwriter.common.clientenum.CloudStorage;
 import io.milvus.bulkwriter.model.CompleteMultipartUploadOutputModel;
 import io.milvus.bulkwriter.storage.StorageClient;
-import io.minio.*;
+import io.minio.BucketExistsArgs;
+import io.minio.MinioAsyncClient;
+import io.minio.ObjectWriteResponse;
+import io.minio.PutObjectArgs;
+import io.minio.S3Base;
+import io.minio.StatObjectArgs;
+import io.minio.StatObjectResponse;
+import io.minio.Xml;
+import io.minio.credentials.Provider;
 import io.minio.credentials.StaticProvider;
 import io.minio.errors.ErrorResponseException;
 import io.minio.errors.InsufficientDataException;
@@ -33,7 +41,9 @@ import io.minio.http.Method;
 import io.minio.messages.CompleteMultipartUpload;
 import io.minio.messages.ErrorResponse;
 import io.minio.messages.Part;
+import okhttp3.Interceptor;
 import okhttp3.OkHttpClient;
+import okhttp3.Request;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,8 +75,14 @@ public class MinioStorageClient extends MinioAsyncClient implements StorageClien
                                                       String region,
                                                       OkHttpClient httpClient) {
         MinioAsyncClient.Builder minioClientBuilder = MinioAsyncClient.builder()
-                .endpoint(endpoint)
-                .credentialsProvider(new StaticProvider(accessKey, secretKey, sessionToken));
+                .endpoint(endpoint);
+
+        if (CloudStorage.isGcpCloud(cloudName) && StringUtils.isNotEmpty(sessionToken)) {
+            httpClient = buildAuthorizedClient(httpClient, sessionToken);
+        } else {
+            Provider credentialsProvider = new StaticProvider(accessKey, secretKey, sessionToken);
+            minioClientBuilder.credentialsProvider(credentialsProvider);
+        }
 
         if (StringUtils.isNotEmpty(region)) {
             minioClientBuilder.region(region);
@@ -84,6 +100,26 @@ public class MinioStorageClient extends MinioAsyncClient implements StorageClien
         return new MinioStorageClient(minioClient);
     }
 
+    private static OkHttpClient buildAuthorizedClient(OkHttpClient httpClient, String sessionToken) {
+        Interceptor authInterceptor = chain -> {
+            Request original = chain.request();
+            Request requestWithAuth = original.newBuilder()
+                    .header("Authorization", "Bearer " + sessionToken)
+                    .build();
+            return chain.proceed(requestWithAuth);
+        };
+
+        if (httpClient != null) {
+            return httpClient.newBuilder()
+                    .addInterceptor(authInterceptor)
+                    .build();
+        } else {
+            return new OkHttpClient.Builder()
+                    .addInterceptor(authInterceptor)
+                    .build();
+        }
+    }
+
     public Long getObjectEntity(String bucketName, String objectKey) throws Exception {
         StatObjectArgs statObjectArgs = StatObjectArgs.builder()
                 .bucket(bucketName)