Browse Source

optim bulkWriter for support enableVirtualStyleEndpoint (#976)

Signed-off-by: lentitude2tk <xushuang.hu@zilliz.com>
xushuang.hu 1 year ago
parent
commit
9c074deed3

+ 3 - 2
examples/main/java/io/milvus/v1/BulkWriterExample.java

@@ -95,8 +95,8 @@ public class BulkWriterExample {
         public static final String STORAGE_ACCESS_KEY = "minioadmin"; // default ak of MinIO/Milvus standalone
         public static final String STORAGE_SECRET_KEY = "minioadmin"; // default sk of MinIO/Milvus standalone
         /**
-         * if using local storage such as Minio
-         * Please set this parameter to empty.
+         * if using remote storage, please configure the parameter
+         * if using local storage such as Local Minio, please set this parameter to empty.
          */
         public static final String STORAGE_REGION = "";
 
@@ -437,6 +437,7 @@ public class BulkWriterExample {
         } else {
             connectParam = S3ConnectParam.newBuilder()
                     .withEndpoint(StorageConsts.STORAGE_ENDPOINT)
+                    .withCloudName(StorageConsts.cloudStorage.getCloudName())
                     .withBucketName(StorageConsts.STORAGE_BUCKET)
                     .withAccessKey(StorageConsts.STORAGE_ACCESS_KEY)
                     .withSecretKey(StorageConsts.STORAGE_SECRET_KEY)

+ 3 - 1
src/main/java/io/milvus/bulkwriter/RemoteBulkWriter.java

@@ -120,7 +120,9 @@ public class RemoteBulkWriter extends LocalBulkWriter {
 
         if (connectParam instanceof S3ConnectParam) {
             S3ConnectParam s3ConnectParam = (S3ConnectParam) connectParam;
-            storageClient = MinioStorageClient.getStorageClient(s3ConnectParam.getEndpoint(),
+            storageClient = MinioStorageClient.getStorageClient(
+                    s3ConnectParam.getCloudName(),
+                    s3ConnectParam.getEndpoint(),
                     s3ConnectParam.getAccessKey(),
                     s3ConnectParam.getSecretKey(),
                     s3ConnectParam.getSessionToken(),

+ 12 - 7
src/main/java/io/milvus/bulkwriter/common/clientenum/CloudStorage.java

@@ -20,22 +20,27 @@
 package io.milvus.bulkwriter.common.clientenum;
 
 import io.milvus.exception.ParamException;
+import lombok.Getter;
 import org.apache.commons.lang3.StringUtils;
 
 public enum CloudStorage {
-    MINIO("%s", "minioAddress"),
-    AWS("s3.amazonaws.com", null),
-    GCP("storage.googleapis.com", null),
-    AZURE("%s.blob.core.windows.net", "accountName"),
-    ALI("oss-%s.aliyuncs.com", "region"),
-    TC("cos.%s.myqcloud.com", "region")
+    MINIO("minio","%s", "minioAddress"),
+    AWS("aws","s3.amazonaws.com", null),
+    GCP("gcp" ,"storage.googleapis.com", null),
+    AZURE("azure" ,"%s.blob.core.windows.net", "accountName"),
+    ALI("ali","oss-%s.aliyuncs.com", "region"),
+    TC("tc","cos.%s.myqcloud.com", "region")
     ;
 
+    @Getter
+    private String cloudName;
+
     private String endpoint;
 
     private String replace;
 
-    CloudStorage(String endpoint, String replace) {
+    CloudStorage(String cloudName, String endpoint, String replace) {
+        this.cloudName = cloudName;
         this.endpoint = endpoint;
         this.replace = replace;
     }

+ 15 - 0
src/main/java/io/milvus/bulkwriter/connect/S3ConnectParam.java

@@ -25,6 +25,7 @@ import lombok.Getter;
 import lombok.NonNull;
 import lombok.ToString;
 import okhttp3.OkHttpClient;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Parameters for <code>RemoteBulkWriter</code> interface.
@@ -39,6 +40,7 @@ public class S3ConnectParam extends StorageConnectParam {
     private final String sessionToken;
     private final String region;
     private final OkHttpClient httpClient;
+    private final String cloudName;
 
     private S3ConnectParam(@NonNull Builder builder) {
         this.bucketName = builder.bucketName;
@@ -48,6 +50,7 @@ public class S3ConnectParam extends StorageConnectParam {
         this.sessionToken = builder.sessionToken;
         this.region = builder.region;
         this.httpClient = builder.httpClient;
+        this.cloudName = builder.cloudName;
     }
 
     public static Builder newBuilder() {
@@ -65,10 +68,22 @@ public class S3ConnectParam extends StorageConnectParam {
         private String sessionToken;
         private String region;
         private OkHttpClient httpClient;
+        private String cloudName;
 
         private Builder() {
         }
 
+        /**
+         * Sets the cloudName.
+         *
+         * @param cloudName cloud name
+         * @return <code>Builder</code>
+         */
+        public Builder withCloudName(@NotNull String cloudName) {
+            this.cloudName = cloudName;
+            return this;
+        }
+
         /**
          * Sets the bucketName info.
          *

+ 10 - 6
src/main/java/io/milvus/bulkwriter/storage/client/MinioStorageClient.java

@@ -19,8 +19,7 @@
 
 package io.milvus.bulkwriter.storage.client;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
+import io.milvus.bulkwriter.common.clientenum.CloudStorage;
 import io.milvus.bulkwriter.storage.StorageClient;
 import io.minio.BucketExistsArgs;
 import io.minio.MinioClient;
@@ -40,17 +39,17 @@ import static com.amazonaws.services.s3.internal.Constants.MB;
 public class MinioStorageClient extends MinioClient implements StorageClient {
     private static final Logger logger = LoggerFactory.getLogger(MinioStorageClient.class);
 
-    protected MinioStorageClient(MinioClient client, Multimap<String, String> extraHeader) {
+    protected MinioStorageClient(MinioClient client) {
         super(client);
     }
 
-    public static MinioStorageClient getStorageClient(String endpoint,
+    public static MinioStorageClient getStorageClient(String cloudName,
+                                                      String endpoint,
                                                       String accessKey,
                                                       String secretKey,
                                                       String sessionToken,
                                                       String region,
                                                       OkHttpClient httpClient) {
-        Multimap<String, String> extraHeader = HashMultimap.create();
         MinioClient.Builder minioClientBuilder = MinioClient.builder()
                 .endpoint(endpoint)
                 .credentialsProvider(new StaticProvider(accessKey, secretKey, sessionToken));
@@ -63,7 +62,12 @@ public class MinioStorageClient extends MinioClient implements StorageClient {
             minioClientBuilder.httpClient(httpClient);
         }
 
-        return new MinioStorageClient(minioClientBuilder.build(), extraHeader);
+        MinioClient minioClient = minioClientBuilder.build();
+        if (CloudStorage.TC.getCloudName().equals(cloudName)) {
+            minioClient.enableVirtualStyleEndpoint();
+        }
+
+        return new MinioStorageClient(minioClient);
     }
 
     public Long getObjectEntity(String bucketName, String objectKey) throws Exception {