瀏覽代碼

Upgrade the bulkWriter cloud API call from v1 to v2 (#1056)

Signed-off-by: lentitude2tk <xushuang.hu@zilliz.com>
xushuang.hu 8 月之前
父節點
當前提交
03365f7a66

+ 64 - 26
examples/main/java/io/milvus/v1/BulkWriterExample.java

@@ -26,7 +26,12 @@ import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.reflect.TypeToken;
-import io.milvus.bulkwriter.*;
+import io.milvus.bulkwriter.BulkWriter;
+import io.milvus.bulkwriter.CloudImportV2;
+import io.milvus.bulkwriter.LocalBulkWriter;
+import io.milvus.bulkwriter.LocalBulkWriterParam;
+import io.milvus.bulkwriter.RemoteBulkWriter;
+import io.milvus.bulkwriter.RemoteBulkWriterParam;
 import io.milvus.bulkwriter.common.clientenum.BulkFileType;
 import io.milvus.bulkwriter.common.clientenum.CloudStorage;
 import io.milvus.bulkwriter.common.utils.GeneratorUtils;
@@ -35,17 +40,37 @@ import io.milvus.bulkwriter.common.utils.ParquetReaderUtils;
 import io.milvus.bulkwriter.connect.AzureConnectParam;
 import io.milvus.bulkwriter.connect.S3ConnectParam;
 import io.milvus.bulkwriter.connect.StorageConnectParam;
-import io.milvus.bulkwriter.response.BulkImportResponse;
-import io.milvus.bulkwriter.response.GetImportProgressResponse;
-import io.milvus.bulkwriter.response.ListImportJobsResponse;
+import io.milvus.bulkwriter.request.v2.BulkImportV2Request;
+import io.milvus.bulkwriter.request.v2.GetImportProgressV2Request;
+import io.milvus.bulkwriter.request.v2.ListImportJobsV2Request;
+import io.milvus.bulkwriter.response.v2.BulkImportV2Response;
+import io.milvus.bulkwriter.response.v2.GetImportProgressV2Response;
+import io.milvus.bulkwriter.response.v2.ListImportJobsV2Response;
 import io.milvus.client.MilvusClient;
 import io.milvus.client.MilvusServiceClient;
 import io.milvus.common.utils.ExceptionUtils;
-import io.milvus.grpc.*;
-import io.milvus.param.*;
+import io.milvus.grpc.DataType;
+import io.milvus.grpc.GetCollectionStatisticsResponse;
+import io.milvus.grpc.GetImportStateResponse;
+import io.milvus.grpc.ImportResponse;
+import io.milvus.grpc.ImportState;
+import io.milvus.grpc.KeyValuePair;
+import io.milvus.grpc.QueryResults;
+import io.milvus.param.ConnectParam;
+import io.milvus.param.IndexType;
+import io.milvus.param.MetricType;
+import io.milvus.param.R;
+import io.milvus.param.RpcStatus;
 import io.milvus.param.bulkinsert.BulkInsertParam;
 import io.milvus.param.bulkinsert.GetBulkInsertStateParam;
-import io.milvus.param.collection.*;
+import io.milvus.param.collection.CollectionSchemaParam;
+import io.milvus.param.collection.CreateCollectionParam;
+import io.milvus.param.collection.DropCollectionParam;
+import io.milvus.param.collection.FieldType;
+import io.milvus.param.collection.FlushParam;
+import io.milvus.param.collection.GetCollectionStatisticsParam;
+import io.milvus.param.collection.HasCollectionParam;
+import io.milvus.param.collection.LoadCollectionParam;
 import io.milvus.param.dml.QueryParam;
 import io.milvus.param.index.CreateIndexParam;
 import io.milvus.response.GetCollStatResponseWrapper;
@@ -122,13 +147,16 @@ public class BulkWriterExample {
     public static class CloudImportConsts {
 
         /**
-         * If you are an overseas user, you can use the following endpoint format: https://controller.api.{cloud-region}.zillizcloud.com/v1/vector/collections/import.
-         * If not, you can use the following endpoint format: https://controller.api.${CLOUD_REGION_ID}.cloud.zilliz.com.cn/v1/vector/collections/import.
+         * The value of the URL is fixed.
+         * For overseas regions, it is: https://api.cloud.zilliz.com
+         * For regions in China, it is: https://api.cloud.zilliz.com.cn
          */
-        public static final String CLOUD_ENDPOINT = "https://controller.api.${CLOUD-REGION}.{ENDPOINT-SUFFIX}";
-        public static final String API_KEY = "_api_key_of_the_user";
-        public static final String CLUSTER_ID = "_your_cloud_instance_id_";
-        public static final String COLLECTION_NAME = "_collection_name_on_the_cloud_";
+        public static final String CLOUD_ENDPOINT = "https://api.cloud.zilliz.com";
+        public static final String API_KEY = "_api_key_for_cluster_org_";
+        public static final String CLUSTER_ID = "_your_cloud_cluster_id_";
+        public static final String COLLECTION_NAME = "_collection_name_on_the_cluster_id_";
+        // If partition_name is not specified, use ""
+        public static final String PARTITION_NAME = "_partition_name_on_the_collection_";
 
         /**
          * Please provide the complete URL for the file or folder you want to import, similar to https://bucket-name.s3.region-code.amazonaws.com/object-name.
@@ -146,6 +174,7 @@ public class BulkWriterExample {
     private MilvusClient milvusClient;
 
     public static void main(String[] args) throws Exception {
+
         BulkWriterExample exampleBulkWriter = new BulkWriterExample();
         exampleBulkWriter.createConnection();
 
@@ -202,7 +231,7 @@ public class BulkWriterExample {
 //            CollectionSchemaParam collectionSchema = buildAllTypesSchema();
 //            List<List<String>> batchFiles = exampleBulkWriter.allTypesRemoteWriter(collectionSchema, fileType);
 //            exampleBulkWriter.createCollection(ALL_TYPES_COLLECTION_NAME, collectionSchema, false);
-//            exampleBulkWriter.callCloudImport(batchFiles, ALL_TYPES_COLLECTION_NAME);
+//            exampleBulkWriter.callCloudImport(batchFiles, ALL_TYPES_COLLECTION_NAME, StringUtils.EMPTY);
 //            exampleBulkWriter.retrieveImportData();
 //        }
     }
@@ -533,7 +562,7 @@ public class BulkWriterExample {
         System.out.println("Collection row number: " + getCollectionStatistics());
     }
 
-    private void callCloudImport(List<List<String>> batchFiles, String collectionName) throws InterruptedException, MalformedURLException {
+    private void callCloudImport(List<List<String>> batchFiles, String collectionName, String partitionName) throws InterruptedException, MalformedURLException {
         System.out.println("\n===================== call cloudImport ====================");
 
         String objectUrl = StorageConsts.cloudStorage == CloudStorage.AZURE
@@ -542,7 +571,11 @@ public class BulkWriterExample {
         String accessKey = StorageConsts.cloudStorage == CloudStorage.AZURE ? StorageConsts.AZURE_ACCOUNT_NAME : StorageConsts.STORAGE_ACCESS_KEY;
         String secretKey = StorageConsts.cloudStorage == CloudStorage.AZURE ? StorageConsts.AZURE_ACCOUNT_KEY : StorageConsts.STORAGE_SECRET_KEY;
 
-        BulkImportResponse bulkImportResponse = CloudImport.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, objectUrl, accessKey, secretKey, CloudImportConsts.CLUSTER_ID, collectionName);
+        BulkImportV2Request bulkImportRequest = BulkImportV2Request.builder()
+                .objectUrl(objectUrl).accessKey(accessKey).secretKey(secretKey)
+                .clusterId(CloudImportConsts.CLUSTER_ID).collectionName(collectionName).partitionName(partitionName)
+                .build();
+        BulkImportV2Response bulkImportResponse = CloudImportV2.createImportJobs(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, bulkImportRequest);
         String jobId = bulkImportResponse.getJobId();
         System.out.println("Create a cloudImport job, job id: " + jobId);
 
@@ -550,15 +583,16 @@ public class BulkWriterExample {
             System.out.println("Wait 5 second to check bulkInsert job state...");
             TimeUnit.SECONDS.sleep(5);
 
-            GetImportProgressResponse getImportProgressResponse = CloudImport.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, jobId, CloudImportConsts.CLUSTER_ID);
-            if (getImportProgressResponse.getReadyPercentage().intValue() == 1) {
+            GetImportProgressV2Request request = GetImportProgressV2Request.builder().clusterId(CloudImportConsts.CLUSTER_ID).jobId(jobId).build();
+            GetImportProgressV2Response getImportProgressResponse = CloudImportV2.getImportJobProgress(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, request);
+            if ("Completed".equals(getImportProgressResponse.getState())) {
                 System.out.printf("The job %s completed%n", jobId);
                 break;
-            } else if (StringUtils.isNotEmpty(getImportProgressResponse.getErrorMessage())) {
-                System.out.printf("The job %s failed, reason: %s%n", jobId, getImportProgressResponse.getErrorMessage());
+            } else if (StringUtils.isNotEmpty(getImportProgressResponse.getReason())) {
+                System.out.printf("The job %s failed or canceled, reason: %s%n", jobId, getImportProgressResponse.getReason());
                 break;
             } else {
-                System.out.printf("The job %s is running, progress:%s%n", jobId, getImportProgressResponse.getReadyPercentage());
+                System.out.printf("The job %s is running, progress:%s%n", jobId, getImportProgressResponse.getProgress());
             }
         }
 
@@ -737,18 +771,22 @@ public class BulkWriterExample {
 
     private static void exampleCloudImport() throws MalformedURLException {
         System.out.println("\n===================== import files to cloud vectordb ====================");
-        BulkImportResponse bulkImportResponse = CloudImport.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY,
-                CloudImportConsts.OBJECT_URL, CloudImportConsts.OBJECT_ACCESS_KEY, CloudImportConsts.OBJECT_SECRET_KEY,
-                CloudImportConsts.CLUSTER_ID, CloudImportConsts.COLLECTION_NAME);
+        BulkImportV2Request request = BulkImportV2Request.builder()
+                .objectUrl(CloudImportConsts.OBJECT_URL).accessKey(CloudImportConsts.OBJECT_ACCESS_KEY).secretKey(CloudImportConsts.OBJECT_SECRET_KEY)
+                .clusterId(CloudImportConsts.CLUSTER_ID).collectionName(CloudImportConsts.COLLECTION_NAME).partitionName(CloudImportConsts.PARTITION_NAME)
+                .build();
+        BulkImportV2Response bulkImportResponse = CloudImportV2.createImportJobs(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, request);
         System.out.println(GSON_INSTANCE.toJson(bulkImportResponse));
 
         System.out.println("\n===================== get import job progress ====================");
         String jobId = bulkImportResponse.getJobId();
-        GetImportProgressResponse getImportProgressResponse = CloudImport.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, jobId, CloudImportConsts.CLUSTER_ID);
+        GetImportProgressV2Request getImportProgressRequest = GetImportProgressV2Request.builder().clusterId(CloudImportConsts.CLUSTER_ID).jobId(jobId).build();
+        GetImportProgressV2Response getImportProgressResponse = CloudImportV2.getImportJobProgress(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, getImportProgressRequest);
         System.out.println(GSON_INSTANCE.toJson(getImportProgressResponse));
 
         System.out.println("\n===================== list import jobs ====================");
-        ListImportJobsResponse listImportJobsResponse = CloudImport.listImportJobs(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, CloudImportConsts.CLUSTER_ID, 10, 1);
+        ListImportJobsV2Request listImportJobsRequest = ListImportJobsV2Request.builder().clusterId(CloudImportConsts.CLUSTER_ID).currentPage(1).pageSize(10).build();
+        ListImportJobsV2Response listImportJobsResponse = CloudImportV2.listImportJobs(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, listImportJobsRequest);
         System.out.println(GSON_INSTANCE.toJson(listImportJobsResponse));
     }
 

+ 88 - 0
src/main/java/io/milvus/bulkwriter/BaseCloudImport.java

@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.bulkwriter;
+
+import com.google.gson.Gson;
+import io.milvus.bulkwriter.response.RestfulResponse;
+import io.milvus.common.utils.ExceptionUtils;
+import kong.unirest.Unirest;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class BaseCloudImport {
+    private static final Gson GSON_INSTANCE = new Gson();
+
+    protected static String postRequest(String url, String apiKey, Map<String, Object> params, int timeout) {
+        try {
+            kong.unirest.HttpResponse<String> response = Unirest.post(url)
+                    .connectTimeout(timeout)
+                    .headers(httpHeaders(apiKey))
+                    .body(params).asString();
+            if (response.getStatus() != 200) {
+                ExceptionUtils.throwUnExpectedException(String.format("Failed to post url: %s, status code: %s", url, response.getStatus()));
+            } else {
+                return response.getBody();
+            }
+        } catch (Exception e) {
+            ExceptionUtils.throwUnExpectedException(String.format("Failed to post url: %s, error: %s", url, e));
+        }
+        return null;
+    }
+
+    protected static String getRequest(String url, String apiKey, Map<String, Object> params, int timeout) {
+        try {
+            kong.unirest.HttpResponse<String> response = Unirest.get(url)
+                    .connectTimeout(timeout)
+                    .headers(httpHeaders(apiKey))
+                    .queryString(params).asString();
+            if (response.getStatus() != 200) {
+                ExceptionUtils.throwUnExpectedException(String.format("Failed to get url: %s, status code: %s", url, response.getStatus()));
+            } else {
+                return response.getBody();
+            }
+        } catch (Exception e) {
+            ExceptionUtils.throwUnExpectedException(String.format("Failed to get url: %s, error: %s", url, e));
+        }
+        return null;
+    }
+
+
+    protected static Map<String, String> httpHeaders(String apiKey) {
+        Map<String, String> header = new HashMap<>();
+        header.put("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) " +
+                "Chrome/17.0.963.56 Safari/535.11");
+        header.put("Accept", "application/json");
+        header.put("Content-Type", "application/json");
+        header.put("Accept-Encodin", "gzip,deflate,sdch");
+        header.put("Accept-Languag", "en-US,en;q=0.5");
+        header.put("Authorization", "Bearer " + apiKey);
+
+        return header;
+    }
+
+    protected static void handleResponse(String url, RestfulResponse res) {
+        int innerCode = res.getCode();
+        if (innerCode != 0) {
+            String innerMessage = res.getMessage();
+            ExceptionUtils.throwUnExpectedException(String.format("Failed to request url: %s, code: %s, message: %s", url, innerCode, innerMessage));
+        }
+    }
+}

+ 17 - 73
src/main/java/io/milvus/bulkwriter/CloudImport.java

@@ -23,19 +23,18 @@ import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import io.milvus.bulkwriter.response.BulkImportResponse;
 import io.milvus.bulkwriter.response.GetImportProgressResponse;
+import io.milvus.bulkwriter.response.v2.GetImportProgressV2Response;
 import io.milvus.bulkwriter.response.ListImportJobsResponse;
 import io.milvus.bulkwriter.response.RestfulResponse;
-import io.milvus.common.utils.ExceptionUtils;
-import kong.unirest.Unirest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.HashMap;
 import java.util.Map;
 
-public class CloudImport {
+@Deprecated
+// use CloudImportV2 replace
+public class CloudImport extends BaseCloudImport {
     private static final Gson GSON_INSTANCE = new Gson();
 
     public static BulkImportResponse bulkImport(String url, String apiKey, String objectUrl,
@@ -43,9 +42,9 @@ public class CloudImport {
         String requestURL;
         String protocol = new URL(url).getProtocol();
         if (protocol.startsWith("http")) {
-            requestURL = url + "/v1/vector/collections/import";
+            requestURL = url + "/v2/vectordb/jobs/import/create";
         } else {
-            requestURL = String.format("https://%s/v1/vector/collections/import", url);
+            requestURL = String.format("https://%s/v2/vectordb/jobs/import/create", url);
         }
 
         Map<String, Object> params = new HashMap<>();
@@ -57,7 +56,7 @@ public class CloudImport {
 
         String body = postRequest(requestURL, apiKey, params, 60 * 1000);
         RestfulResponse<BulkImportResponse> response = GSON_INSTANCE.fromJson(body, new TypeToken<RestfulResponse<BulkImportResponse>>(){}.getType());
-        handleResponse(url, response);
+        handleResponse(requestURL, response);
         return response.getData();
     }
 
@@ -65,28 +64,28 @@ public class CloudImport {
         String requestURL;
         String protocol = new URL(url).getProtocol();
         if (protocol.startsWith("http")) {
-            requestURL = url + "/v1/vector/collections/import/get";
+            requestURL = url + "/v2/vectordb/jobs/import/getProgress";
         } else {
-            requestURL = String.format("https://%s/v1/vector/collections/import/get", url);
+            requestURL = String.format("https://%s/v2/vectordb/jobs/import/getProgress", url);
         }
 
         Map<String, Object> params = new HashMap<>();
         params.put("clusterId", clusterId);
         params.put("jobId", jobId);
 
-        String body = getRequest(requestURL, apiKey, params, 60 * 1000);
-        RestfulResponse<GetImportProgressResponse> response = GSON_INSTANCE.fromJson(body, new TypeToken<RestfulResponse<GetImportProgressResponse>>(){}.getType());
-        handleResponse(url, response);
-        return response.getData();
+        String body = postRequest(requestURL, apiKey, params, 60 * 1000);
+        RestfulResponse<GetImportProgressV2Response> response = GSON_INSTANCE.fromJson(body, new TypeToken<RestfulResponse<GetImportProgressV2Response>>(){}.getType());
+        handleResponse(requestURL, response);
+        return response.getData().toGetImportProgressResponse();
     }
 
     public static ListImportJobsResponse listImportJobs(String url, String apiKey, String clusterId, int pageSize, int currentPage) throws MalformedURLException {
         String requestURL;
         String protocol = new URL(url).getProtocol();
         if (protocol.startsWith("http")) {
-            requestURL = url + "/v1/vector/collections/import/list";
+            requestURL = url + "/v2/vectordb/jobs/import/list";
         } else {
-            requestURL = String.format("https://%s/v1/vector/collections/import/list", url);
+            requestURL = String.format("https://%s/v2/vectordb/jobs/import/list", url);
         }
 
         Map<String, Object> params = new HashMap<>();
@@ -94,64 +93,9 @@ public class CloudImport {
         params.put("pageSize", pageSize);
         params.put("currentPage", currentPage);
 
-        String body = getRequest(requestURL, apiKey, params, 60 * 1000);
+        String body = postRequest(requestURL, apiKey, params, 60 * 1000);
         RestfulResponse<ListImportJobsResponse> response = GSON_INSTANCE.fromJson(body, new TypeToken<RestfulResponse<ListImportJobsResponse>>(){}.getType());
-        handleResponse(url, response);
+        handleResponse(requestURL, response);
         return response.getData();
     }
-
-    private static String postRequest(String url, String apiKey, Map<String, Object> params, int timeout) {
-        try {
-            kong.unirest.HttpResponse<String> response = Unirest.post(url)
-                    .connectTimeout(timeout)
-                    .headers(httpHeaders(apiKey))
-                    .body(params).asString();
-            if (response.getStatus() != 200) {
-                ExceptionUtils.throwUnExpectedException(String.format("Failed to post url: %s, status code: %s", url, response.getStatus()));
-            } else {
-                return response.getBody();
-            }
-        } catch (Exception e) {
-            ExceptionUtils.throwUnExpectedException(String.format("Failed to post url: %s, error: %s", url, e));
-        }
-        return null;
-    }
-
-    private static String getRequest(String url, String apiKey, Map<String, Object> params, int timeout) {
-        try {
-            kong.unirest.HttpResponse<String> response = Unirest.get(url)
-                    .connectTimeout(timeout)
-                    .headers(httpHeaders(apiKey))
-                    .queryString(params).asString();
-            if (response.getStatus() != 200) {
-                ExceptionUtils.throwUnExpectedException(String.format("Failed to get url: %s, status code: %s", url, response.getStatus()));
-            } else {
-                return response.getBody();
-            }
-        } catch (Exception e) {
-            ExceptionUtils.throwUnExpectedException(String.format("Failed to get url: %s, error: %s", url, e));
-        }
-        return null;
-    }
-
-
-    private static Map<String, String> httpHeaders(String apiKey) {
-        Map<String, String> header = new HashMap<>();
-        header.put("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) " +
-                "Chrome/17.0.963.56 Safari/535.11");
-        header.put("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8");
-        header.put("Accept-Encodin", "gzip,deflate,sdch");
-        header.put("Accept-Languag", "en-US,en;q=0.5");
-        header.put("Authorization", "Bearer " + apiKey);
-
-        return header;
-    }
-
-    private static void handleResponse(String url, RestfulResponse res) {
-        int innerCode = res.getCode();
-        if (innerCode != 200) {
-            String innerMessage = res.getMessage();
-            ExceptionUtils.throwUnExpectedException(String.format("Failed to request url: %s, code: %s, message: %s", url, innerCode, innerMessage));
-        }
-    }
 }

+ 88 - 0
src/main/java/io/milvus/bulkwriter/CloudImportV2.java

@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.bulkwriter;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import io.milvus.bulkwriter.request.v2.BulkImportV2Request;
+import io.milvus.bulkwriter.request.v2.GetImportProgressV2Request;
+import io.milvus.bulkwriter.request.v2.ListImportJobsV2Request;
+import io.milvus.bulkwriter.response.RestfulResponse;
+import io.milvus.bulkwriter.response.v2.BulkImportV2Response;
+import io.milvus.bulkwriter.response.v2.GetImportProgressV2Response;
+import io.milvus.bulkwriter.response.v2.ListImportJobsV2Response;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Map;
+
+public class CloudImportV2 extends BaseCloudImport {
+    private static final Gson GSON_INSTANCE = new Gson();
+
+    public static BulkImportV2Response createImportJobs(String url, String apiKey, BulkImportV2Request request) throws MalformedURLException {
+        String requestURL;
+        String protocol = new URL(url).getProtocol();
+        if (protocol.startsWith("http")) {
+            requestURL = url + "/v2/vectordb/jobs/import/create";
+        } else {
+            requestURL = String.format("https://%s/v2/vectordb/jobs/import/create", url);
+        }
+
+        Map<String, Object> params = GSON_INSTANCE.fromJson(GSON_INSTANCE.toJson(request), new TypeToken<Map<String, Object>>() {}.getType());
+        String body = postRequest(requestURL, apiKey, params, 60 * 1000);
+        RestfulResponse<BulkImportV2Response> response = GSON_INSTANCE.fromJson(body, new TypeToken<RestfulResponse<BulkImportV2Response>>(){}.getType());
+        handleResponse(requestURL, response);
+        return response.getData();
+    }
+
+    public static GetImportProgressV2Response getImportJobProgress(String url, String apiKey, GetImportProgressV2Request request) throws MalformedURLException {
+        String requestURL;
+        String protocol = new URL(url).getProtocol();
+        if (protocol.startsWith("http")) {
+            requestURL = url + "/v2/vectordb/jobs/import/getProgress";
+        } else {
+            requestURL = String.format("https://%s/v2/vectordb/jobs/import/getProgress", url);
+        }
+
+        Map<String, Object> params = GSON_INSTANCE.fromJson(GSON_INSTANCE.toJson(request), new TypeToken<Map<String, Object>>() {}.getType());
+        String body = postRequest(requestURL, apiKey, params, 60 * 1000);
+        RestfulResponse<GetImportProgressV2Response> response = GSON_INSTANCE.fromJson(body, new TypeToken<RestfulResponse<GetImportProgressV2Response>>(){}.getType());
+        handleResponse(requestURL, response);
+        return response.getData();
+    }
+
+    public static ListImportJobsV2Response listImportJobs(String url, String apiKey, ListImportJobsV2Request request) throws MalformedURLException {
+        String requestURL;
+        String protocol = new URL(url).getProtocol();
+        if (protocol.startsWith("http")) {
+            requestURL = url + "/v2/vectordb/jobs/import/list";
+        } else {
+            requestURL = String.format("https://%s/v2/vectordb/jobs/import/list", url);
+        }
+
+        Map<String, Object> params = GSON_INSTANCE.fromJson(GSON_INSTANCE.toJson(request), new TypeToken<Map<String, Object>>() {}.getType());
+
+
+        String body = postRequest(requestURL, apiKey, params, 60 * 1000);
+        RestfulResponse<ListImportJobsV2Response> response = GSON_INSTANCE.fromJson(body, new TypeToken<RestfulResponse<ListImportJobsV2Response>>(){}.getType());
+        handleResponse(requestURL, response);
+        return response.getData();
+    }
+}

+ 41 - 0
src/main/java/io/milvus/bulkwriter/request/v2/BulkImportV2Request.java

@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.bulkwriter.request.v2;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class BulkImportV2Request implements Serializable {
+    private static final long serialVersionUID = 8192049841043084620L;
+    private String objectUrl;
+    private String accessKey;
+    private String secretKey;
+    private String clusterId;
+    private String collectionName;
+    private String partitionName;
+}

+ 37 - 0
src/main/java/io/milvus/bulkwriter/request/v2/GetImportProgressV2Request.java

@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.bulkwriter.request.v2;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class GetImportProgressV2Request implements Serializable {
+    private static final long serialVersionUID = -787626534606813089L;
+    private String clusterId;
+    private String jobId;
+}

+ 39 - 0
src/main/java/io/milvus/bulkwriter/request/v2/ListImportJobsV2Request.java

@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.bulkwriter.request.v2;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class ListImportJobsV2Request implements Serializable {
+    private static final long serialVersionUID = -1890380396466908530L;
+
+    private String clusterId;
+    private int pageSize;
+    private int currentPage;
+}

+ 1 - 1
src/main/java/io/milvus/bulkwriter/response/GetImportProgressResponse.java

@@ -54,7 +54,7 @@ public class GetImportProgressResponse implements Serializable {
     @Builder
     @AllArgsConstructor
     @NoArgsConstructor
-    private static class Detail {
+    public static class Detail {
         private String fileName;
         private Integer fileSize;
         private Double readyPercentage;

+ 37 - 0
src/main/java/io/milvus/bulkwriter/response/v2/BulkImportV2Response.java

@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.bulkwriter.response.v2;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class BulkImportV2Response implements Serializable {
+
+    private static final long serialVersionUID = 4782067096929198967L;
+    private String jobId;
+}

+ 99 - 0
src/main/java/io/milvus/bulkwriter/response/v2/GetImportProgressV2Response.java

@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.bulkwriter.response.v2;
+
+import io.milvus.bulkwriter.response.GetImportProgressResponse;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class GetImportProgressV2Response implements Serializable {
+
+    private static final long serialVersionUID = -2302203037749197132L;
+
+    private String jobId;
+
+    private String collectionName;
+
+    private String fileName;
+
+    private Integer fileSize;
+
+    private String state;
+
+    private Integer progress;
+
+    private String completeTime;
+
+    private String reason;
+
+    private Integer totalRows;
+
+    private List<DetailV2> details;
+
+    @Data
+    @Builder
+    @AllArgsConstructor
+    @NoArgsConstructor
+    private static class DetailV2 {
+        private String fileName;
+        private Integer fileSize;
+        private String state;
+        private Integer progress;
+        private String completeTime;
+        private String reason;
+
+
+        public GetImportProgressResponse.Detail toDetail() {
+            GetImportProgressResponse.Detail detail = new GetImportProgressResponse.Detail();
+            detail.setFileName(fileName);
+            detail.setFileSize(fileSize);
+            detail.setReadyPercentage(progress == null ? null : Double.valueOf(progress));
+            detail.setErrorMessage(reason);
+            detail.setCompleteTime(completeTime);
+            return detail;
+        }
+    }
+
+    public GetImportProgressResponse toGetImportProgressResponse() {
+        GetImportProgressResponse response = new GetImportProgressResponse();
+        response.setJobId(jobId);
+        response.setCollectionName(collectionName);
+        response.setFileName(fileName);
+        response.setFileSize(fileSize);
+        response.setReadyPercentage(progress == null ? null : Double.valueOf(progress));
+        response.setCompleteTime(completeTime);
+        response.setErrorMessage(reason);
+
+        List<GetImportProgressResponse.Detail> details = this.details.stream().map(DetailV2::toDetail).collect(Collectors.toList());
+        response.setDetails(details);
+        return response;
+    }
+
+}

+ 54 - 0
src/main/java/io/milvus/bulkwriter/response/v2/ListImportJobsV2Response.java

@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.bulkwriter.response.v2;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class ListImportJobsV2Response implements Serializable {
+
+    private static final long serialVersionUID = -8400893490624599225L;
+    private Integer count;
+
+    private Integer currentPage;
+
+    private Integer pageSize;
+
+    private List<Record> records;
+
+    @Data
+    @Builder
+    @AllArgsConstructor
+    @NoArgsConstructor
+    private static class Record {
+        private String collectionName;
+        private String jobId;
+        private String state;
+    }
+}