Browse Source

Support Milvus import-related APIs (#1110)

Signed-off-by: lentitude2tk <xushuang.hu@zilliz.com>
xushuang.hu 8 months ago
parent
commit
46d1512e81

+ 75 - 85
examples/main/java/io/milvus/v1/BulkWriterExample.java

@@ -26,8 +26,8 @@ import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.reflect.TypeToken;
+import io.milvus.bulkwriter.BulkImport;
 import io.milvus.bulkwriter.BulkWriter;
-import io.milvus.bulkwriter.CloudImport;
 import io.milvus.bulkwriter.LocalBulkWriter;
 import io.milvus.bulkwriter.LocalBulkWriterParam;
 import io.milvus.bulkwriter.RemoteBulkWriter;
@@ -40,26 +40,23 @@ import io.milvus.bulkwriter.common.utils.ParquetReaderUtils;
 import io.milvus.bulkwriter.connect.AzureConnectParam;
 import io.milvus.bulkwriter.connect.S3ConnectParam;
 import io.milvus.bulkwriter.connect.StorageConnectParam;
-import io.milvus.bulkwriter.request.BulkImportRequest;
-import io.milvus.bulkwriter.request.GetImportProgressRequest;
-import io.milvus.bulkwriter.request.ListImportJobsRequest;
+import io.milvus.bulkwriter.request.describe.CloudDescribeImportRequest;
+import io.milvus.bulkwriter.request.describe.MilvusDescribeImportRequest;
+import io.milvus.bulkwriter.request.import_.CloudImportRequest;
+import io.milvus.bulkwriter.request.import_.MilvusImportRequest;
+import io.milvus.bulkwriter.request.list.CloudListImportJobsRequest;
+import io.milvus.bulkwriter.request.list.MilvusListImportJobsRequest;
 import io.milvus.client.MilvusClient;
 import io.milvus.client.MilvusServiceClient;
 import io.milvus.common.utils.ExceptionUtils;
 import io.milvus.grpc.DataType;
 import io.milvus.grpc.GetCollectionStatisticsResponse;
-import io.milvus.grpc.GetImportStateResponse;
-import io.milvus.grpc.ImportResponse;
-import io.milvus.grpc.ImportState;
-import io.milvus.grpc.KeyValuePair;
 import io.milvus.grpc.QueryResults;
 import io.milvus.param.ConnectParam;
 import io.milvus.param.IndexType;
 import io.milvus.param.MetricType;
 import io.milvus.param.R;
 import io.milvus.param.RpcStatus;
-import io.milvus.param.bulkinsert.BulkInsertParam;
-import io.milvus.param.bulkinsert.GetBulkInsertStateParam;
 import io.milvus.param.collection.CollectionSchemaParam;
 import io.milvus.param.collection.CreateCollectionParam;
 import io.milvus.param.collection.DropCollectionParam;
@@ -73,7 +70,6 @@ import io.milvus.param.index.CreateIndexParam;
 import io.milvus.response.GetCollStatResponseWrapper;
 import io.milvus.response.QueryResultsWrapper;
 import org.apache.avro.generic.GenericData;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.http.util.Asserts;
 
 import java.io.File;
@@ -83,7 +79,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 
@@ -517,83 +512,97 @@ public class BulkWriterExample {
     }
 
     private void callBulkInsert(CollectionSchemaParam collectionSchema, List<List<String>> batchFiles) throws InterruptedException {
-        System.out.println("\n===================== call bulkInsert ====================");
         createCollection(ALL_TYPES_COLLECTION_NAME, collectionSchema, true);
 
-        List<Long> taskIds = new ArrayList<>();
-        for (List<String> batch : batchFiles) {
-            Long taskId = bulkInsert(batch);
-            taskIds.add(taskId);
-            System.out.println("Create a bulkInert task, task id: " + taskId);
-        }
+        String url = String.format("http://%s:%s", HOST, PORT);
+        System.out.println("\n===================== import files to milvus ====================");
+        MilvusImportRequest milvusImportRequest = MilvusImportRequest.builder()
+                .collectionName(ALL_TYPES_COLLECTION_NAME)
+                .files(batchFiles)
+                .build();
+        String bulkImportResult = BulkImport.bulkImport(url, milvusImportRequest);
+        System.out.println(bulkImportResult);
 
-        while (!taskIds.isEmpty()) {
-            Iterator<Long> iterator = taskIds.iterator();
-            List<Long> tempTaskIds = new ArrayList<>();
-            while (iterator.hasNext()) {
-                Long taskId = iterator.next();
-                System.out.println("Wait 5 second to check bulkInsert tasks state...");
-                TimeUnit.SECONDS.sleep(5);
-
-                GetImportStateResponse bulkInsertState = getBulkInsertState(taskId);
-                if (bulkInsertState.getState() == ImportState.ImportFailed
-                        || bulkInsertState.getState() == ImportState.ImportFailedAndCleaned) {
-                    List<KeyValuePair> infosList = bulkInsertState.getInfosList();
-                    Optional<String> failedReasonOptional = infosList.stream().filter(e -> e.getKey().equals("failed_reason"))
-                            .map(KeyValuePair::getValue).findFirst();
-                    String failedReson = failedReasonOptional.orElse("");
-
-                    System.out.printf("The task %s failed, reason: %s%n", taskId, failedReson);
-                } else if (bulkInsertState.getState() == ImportState.ImportCompleted) {
-                    System.out.printf("The task %s completed%n", taskId);
-                } else {
-                    System.out.printf("The task %s is running, state:%s%n", taskId, bulkInsertState.getState());
-                    tempTaskIds.add(taskId);
-                }
+        JsonObject bulkImportObject = convertJsonObject(bulkImportResult);
+        String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
+        System.out.println("Create a bulkInert task, job id: " + jobId);
+
+        System.out.println("\n===================== listBulkInsertJobs() ====================");
+        MilvusListImportJobsRequest listImportJobsRequest = MilvusListImportJobsRequest.builder().collectionName(ALL_TYPES_COLLECTION_NAME).build();
+        String listImportJobsResult = BulkImport.listImportJobs(url, listImportJobsRequest);
+        System.out.println(listImportJobsResult);
+        while (true) {
+            System.out.println("Wait 5 second to check bulkInsert job state...");
+            TimeUnit.SECONDS.sleep(5);
+
+            System.out.println("\n===================== getBulkInsertState() ====================");
+            MilvusDescribeImportRequest request = MilvusDescribeImportRequest.builder()
+                    .jobId(jobId)
+                    .build();
+            String getImportProgressResult = BulkImport.getImportProgress(url, request);
+            System.out.println(getImportProgressResult);
+
+            JsonObject getImportProgressObject = convertJsonObject(getImportProgressResult);
+            String state = getImportProgressObject.getAsJsonObject("data").get("state").getAsString();
+            String progress = getImportProgressObject.getAsJsonObject("data").get("progress").getAsString();
+            if ("Failed".equals(state)) {
+                String reason = getImportProgressObject.getAsJsonObject("data").get("reason").getAsString();
+                System.out.printf("The job %s failed, reason: %s%n", jobId, reason);
+                break;
+            } else if ("Completed".equals(state)) {
+                System.out.printf("The job %s completed%n", jobId);
+                break;
+            } else {
+                System.out.printf("The job %s is running, state:%s progress:%s%n", jobId, state, progress);
             }
-            taskIds = tempTaskIds;
         }
 
         System.out.println("Collection row number: " + getCollectionStatistics());
     }
 
     private void callCloudImport(List<List<String>> batchFiles, String collectionName, String partitionName) throws InterruptedException {
-        System.out.println("\n===================== call cloudImport ====================");
-
         String objectUrl = StorageConsts.cloudStorage == CloudStorage.AZURE
                 ? StorageConsts.cloudStorage.getAzureObjectUrl(StorageConsts.AZURE_ACCOUNT_NAME, StorageConsts.AZURE_CONTAINER_NAME, ImportUtils.getCommonPrefix(batchFiles))
                 : StorageConsts.cloudStorage.getS3ObjectUrl(StorageConsts.STORAGE_BUCKET, ImportUtils.getCommonPrefix(batchFiles), StorageConsts.STORAGE_REGION);
         String accessKey = StorageConsts.cloudStorage == CloudStorage.AZURE ? StorageConsts.AZURE_ACCOUNT_NAME : StorageConsts.STORAGE_ACCESS_KEY;
         String secretKey = StorageConsts.cloudStorage == CloudStorage.AZURE ? StorageConsts.AZURE_ACCOUNT_KEY : StorageConsts.STORAGE_SECRET_KEY;
 
-        BulkImportRequest bulkImportRequest = BulkImportRequest.builder()
+        System.out.println("\n===================== call cloudImport ====================");
+        CloudImportRequest bulkImportRequest = CloudImportRequest.builder()
                 .objectUrl(objectUrl).accessKey(accessKey).secretKey(secretKey)
                 .clusterId(CloudImportConsts.CLUSTER_ID).collectionName(collectionName).partitionName(partitionName)
+                .apiKey(CloudImportConsts.API_KEY)
                 .build();
-        String bulkImportResult = CloudImport.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, bulkImportRequest);
-        JsonObject bulkImportObject = convertDataMap(bulkImportResult);
+        String bulkImportResult = BulkImport.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, bulkImportRequest);
+        JsonObject bulkImportObject = convertJsonObject(bulkImportResult);
 
         String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
         System.out.println("Create a cloudImport job, job id: " + jobId);
 
+        System.out.println("\n===================== call cloudListImportJobs ====================");
+        CloudListImportJobsRequest listImportJobsRequest = CloudListImportJobsRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).currentPage(1).pageSize(10).apiKey(CloudImportConsts.API_KEY).build();
+        String listImportJobsResult = BulkImport.listImportJobs(CloudImportConsts.CLOUD_ENDPOINT, listImportJobsRequest);
+        System.out.println(listImportJobsResult);
         while (true) {
             System.out.println("Wait 5 second to check bulkInsert job state...");
             TimeUnit.SECONDS.sleep(5);
 
-            GetImportProgressRequest request = GetImportProgressRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).jobId(jobId).build();
-            String getImportProgressResult = CloudImport.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, request);
-            JsonObject getImportProgressObject = convertDataMap(getImportProgressResult);
+            System.out.println("\n===================== call cloudGetProgress ====================");
+            CloudDescribeImportRequest request = CloudDescribeImportRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).jobId(jobId).apiKey(CloudImportConsts.API_KEY).build();
+            String getImportProgressResult = BulkImport.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, request);
+            JsonObject getImportProgressObject = convertJsonObject(getImportProgressResult);
             String importProgressState = getImportProgressObject.getAsJsonObject("data").get("state").getAsString();
-            String reason = getImportProgressObject.getAsJsonObject("data").get("reason").getAsString();
             String progress = getImportProgressObject.getAsJsonObject("data").get("progress").getAsString();
-            if ("Completed".equals(importProgressState)) {
-                System.out.printf("The job %s completed%n", jobId);
+
+            if ("Failed".equals(importProgressState)) {
+                String reason = getImportProgressObject.getAsJsonObject("data").get("reason").getAsString();
+                System.out.printf("The job %s failed, reason: %s%n", jobId, reason);
                 break;
-            } else if (StringUtils.isNotEmpty(reason)) {
-                System.out.printf("The job %s failed or canceled, reason: %s%n", jobId, reason);
+            } else if ("Completed".equals(importProgressState)) {
+                System.out.printf("The job %s completed%n", jobId);
                 break;
             } else {
-                System.out.printf("The job %s is running, progress:%s%n", jobId, progress);
+                System.out.printf("The job %s is running, state:%s progress:%s%n", jobId, importProgressState, progress);
             }
         }
 
@@ -735,26 +744,6 @@ public class BulkWriterExample {
         return wrapper.getRowRecords();
     }
 
-    private Long bulkInsert(List<String> batchFiles) {
-        System.out.println("========== bulkInsert() ==========");
-        checkMilvusClientIfExist();
-        R<ImportResponse> response = milvusClient.bulkInsert(BulkInsertParam.newBuilder()
-                .withCollectionName(ALL_TYPES_COLLECTION_NAME)
-                .withFiles(batchFiles)
-                .build());
-        ExceptionUtils.handleResponseStatus(response);
-        return response.getData().getTasksList().get(0);
-    }
-
-    private GetImportStateResponse getBulkInsertState(Long taskId) {
-        System.out.println("========== getBulkInsertState() ==========");
-        checkMilvusClientIfExist();
-        R<GetImportStateResponse> bulkInsertState = milvusClient.getBulkInsertState(GetBulkInsertStateParam.newBuilder()
-                .withTask(taskId)
-                .build());
-        return bulkInsertState.getData();
-    }
-
     private Long getCollectionStatistics() {
         System.out.println("========== getCollectionStatistics() ==========");
         // call flush() to flush the insert buffer to storage,
@@ -772,24 +761,25 @@ public class BulkWriterExample {
 
     private static void exampleCloudImport() {
         System.out.println("\n===================== import files to cloud vectordb ====================");
-        BulkImportRequest request = BulkImportRequest.builder()
+        CloudImportRequest request = CloudImportRequest.builder()
                 .objectUrl(CloudImportConsts.OBJECT_URL).accessKey(CloudImportConsts.OBJECT_ACCESS_KEY).secretKey(CloudImportConsts.OBJECT_SECRET_KEY)
                 .clusterId(CloudImportConsts.CLUSTER_ID).collectionName(CloudImportConsts.COLLECTION_NAME).partitionName(CloudImportConsts.PARTITION_NAME)
+                .apiKey(CloudImportConsts.API_KEY)
                 .build();
-        String bulkImportResult = CloudImport.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, request);
+        String bulkImportResult = BulkImport.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, request);
         System.out.println(bulkImportResult);
 
         System.out.println("\n===================== get import job progress ====================");
 
-        JsonObject bulkImportObject = convertDataMap(bulkImportResult);
+        JsonObject bulkImportObject = convertJsonObject(bulkImportResult);
         String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
-        GetImportProgressRequest getImportProgressRequest = GetImportProgressRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).jobId(jobId).build();
-        String getImportProgressResult = CloudImport.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, getImportProgressRequest);
+        CloudDescribeImportRequest getImportProgressRequest = CloudDescribeImportRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).jobId(jobId).apiKey(CloudImportConsts.API_KEY).build();
+        String getImportProgressResult = BulkImport.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, getImportProgressRequest);
         System.out.println(getImportProgressResult);
 
         System.out.println("\n===================== list import jobs ====================");
-        ListImportJobsRequest listImportJobsRequest = ListImportJobsRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).currentPage(1).pageSize(10).build();
-        String listImportJobsResult = CloudImport.listImportJobs(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, listImportJobsRequest);
+        CloudListImportJobsRequest listImportJobsRequest = CloudListImportJobsRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).currentPage(1).pageSize(10).apiKey(CloudImportConsts.API_KEY).build();
+        String listImportJobsResult = BulkImport.listImportJobs(CloudImportConsts.CLOUD_ENDPOINT, listImportJobsRequest);
         System.out.println(listImportJobsResult);
     }
 
@@ -973,7 +963,7 @@ public class BulkWriterExample {
         }
     }
 
-    private static JsonObject convertDataMap(String result) {
+    private static JsonObject convertJsonObject(String result) {
         return GSON_INSTANCE.fromJson(result, JsonObject.class);
     }
 }

+ 5 - 2
src/main/java/io/milvus/bulkwriter/BaseCloudImport.java → src/main/java/io/milvus/bulkwriter/BaseBulkImport.java

@@ -22,11 +22,12 @@ package io.milvus.bulkwriter;
 import io.milvus.bulkwriter.response.RestfulResponse;
 import io.milvus.common.utils.ExceptionUtils;
 import kong.unirest.Unirest;
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.HashMap;
 import java.util.Map;
 
-public class BaseCloudImport {
+public class BaseBulkImport {
     protected static String postRequest(String url, String apiKey, Map<String, Object> params, int timeout) {
         try {
             kong.unirest.HttpResponse<String> response = Unirest.post(url)
@@ -70,7 +71,9 @@ public class BaseCloudImport {
         header.put("Content-Type", "application/json");
         header.put("Accept-Encodin", "gzip,deflate,sdch");
         header.put("Accept-Languag", "en-US,en;q=0.5");
-        header.put("Authorization", "Bearer " + apiKey);
+        if (StringUtils.isNotEmpty(apiKey)) {
+            header.put("Authorization", "Bearer " + apiKey);
+        }
 
         return header;
     }

+ 10 - 11
src/main/java/io/milvus/bulkwriter/CloudImport.java → src/main/java/io/milvus/bulkwriter/BulkImport.java

@@ -21,44 +21,43 @@ package io.milvus.bulkwriter;
 
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
-import io.milvus.bulkwriter.request.BulkImportRequest;
-import io.milvus.bulkwriter.request.GetImportProgressRequest;
-import io.milvus.bulkwriter.request.ListImportJobsRequest;
+import io.milvus.bulkwriter.request.describe.BaseDescribeImportRequest;
+import io.milvus.bulkwriter.request.import_.BaseImportRequest;
+import io.milvus.bulkwriter.request.list.BaseListImportJobsRequest;
 import io.milvus.bulkwriter.response.RestfulResponse;
 
 import java.util.Map;
 
-public class CloudImport extends BaseCloudImport {
+public class BulkImport extends BaseBulkImport {
     private static final Gson GSON_INSTANCE = new Gson();
 
-    public static String bulkImport(String url, String apiKey, BulkImportRequest request) {
+    public static String bulkImport(String url, BaseImportRequest request) {
         String requestURL = url + "/v2/vectordb/jobs/import/create";
 
         Map<String, Object> params = GSON_INSTANCE.fromJson(GSON_INSTANCE.toJson(request), new TypeToken<Map<String, Object>>() {}.getType());
-        String body = postRequest(requestURL, apiKey, params, 60 * 1000);
+        String body = postRequest(requestURL, request.getApiKey(), params, 60 * 1000);
         RestfulResponse<Object> response = GSON_INSTANCE.fromJson(body, new TypeToken<RestfulResponse<Object>>(){}.getType());
         handleResponse(requestURL, response);
         return body;
     }
 
-    public static String getImportProgress(String url, String apiKey, GetImportProgressRequest request) {
+    public static String getImportProgress(String url, BaseDescribeImportRequest request) {
         String requestURL = url + "/v2/vectordb/jobs/import/describe";
 
         Map<String, Object> params = GSON_INSTANCE.fromJson(GSON_INSTANCE.toJson(request), new TypeToken<Map<String, Object>>() {}.getType());
-        String body = postRequest(requestURL, apiKey, params, 60 * 1000);
+        String body = postRequest(requestURL, request.getApiKey(), params, 60 * 1000);
         RestfulResponse<Object> response = GSON_INSTANCE.fromJson(body, new TypeToken<RestfulResponse<Object>>(){}.getType());
         handleResponse(requestURL, response);
         return body;
     }
 
-    public static String listImportJobs(String url, String apiKey, ListImportJobsRequest request) {
+    public static String listImportJobs(String url, BaseListImportJobsRequest request) {
         String requestURL = url + "/v2/vectordb/jobs/import/list";
 
         Map<String, Object> params = GSON_INSTANCE.fromJson(GSON_INSTANCE.toJson(request), new TypeToken<Map<String, Object>>() {}.getType());
-        String body = postRequest(requestURL, apiKey, params, 60 * 1000);
+        String body = postRequest(requestURL, request.getApiKey(), params, 60 * 1000);
         RestfulResponse<Object> response = GSON_INSTANCE.fromJson(body, new TypeToken<RestfulResponse<Object>>(){}.getType());
         handleResponse(requestURL, response);
         return body;
     }
-
 }

+ 39 - 0
src/main/java/io/milvus/bulkwriter/request/describe/BaseDescribeImportRequest.java

@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.bulkwriter.request.describe;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
+
+import java.io.Serializable;
+
+@Data
+@SuperBuilder(toBuilder = true)
+@AllArgsConstructor
+@NoArgsConstructor
+public class BaseDescribeImportRequest implements Serializable {
+    private static final long serialVersionUID = -787626534606813089L;
+    /**
+     * If you are calling the cloud API, this parameter needs to be filled in; otherwise, you can ignore it.
+     */
+    private String apiKey;
+}

+ 5 - 7
src/main/java/io/milvus/bulkwriter/request/GetImportProgressRequest.java → src/main/java/io/milvus/bulkwriter/request/describe/CloudDescribeImportRequest.java

@@ -17,21 +17,19 @@
  * under the License.
  */
 
-package io.milvus.bulkwriter.request;
+package io.milvus.bulkwriter.request.describe;
 
 import lombok.AllArgsConstructor;
-import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
-
-import java.io.Serializable;
+import lombok.experimental.SuperBuilder;
 
 @Data
-@Builder
+@SuperBuilder
 @AllArgsConstructor
 @NoArgsConstructor
-public class GetImportProgressRequest implements Serializable {
-    private static final long serialVersionUID = -787626534606813089L;
+public class CloudDescribeImportRequest extends BaseDescribeImportRequest {
+    private static final long serialVersionUID = -6479634844757426430L;
     private String clusterId;
     private String jobId;
 }

+ 34 - 0
src/main/java/io/milvus/bulkwriter/request/describe/MilvusDescribeImportRequest.java

@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.bulkwriter.request.describe;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
+
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@NoArgsConstructor
+public class MilvusDescribeImportRequest extends BaseDescribeImportRequest {
+    private static final long serialVersionUID = 6123645882882199210L;
+    private String jobId;
+}

+ 39 - 0
src/main/java/io/milvus/bulkwriter/request/import_/BaseImportRequest.java

@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.bulkwriter.request.import_;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
+
+import java.io.Serializable;
+
+@Data
+@SuperBuilder(toBuilder = true)
+@AllArgsConstructor
+@NoArgsConstructor
+public class BaseImportRequest implements Serializable {
+    private static final long serialVersionUID = 8192049841043084620L;
+    /**
+     * If you are calling the cloud API, this parameter needs to be filled in; otherwise, you can ignore it.
+     */
+    private String apiKey;
+}

+ 5 - 7
src/main/java/io/milvus/bulkwriter/request/BulkImportRequest.java → src/main/java/io/milvus/bulkwriter/request/import_/CloudImportRequest.java

@@ -17,21 +17,19 @@
  * under the License.
  */
 
-package io.milvus.bulkwriter.request;
+package io.milvus.bulkwriter.request.import_;
 
 import lombok.AllArgsConstructor;
-import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
-
-import java.io.Serializable;
+import lombok.experimental.SuperBuilder;
 
 @Data
-@Builder
+@SuperBuilder
 @AllArgsConstructor
 @NoArgsConstructor
-public class BulkImportRequest implements Serializable {
-    private static final long serialVersionUID = 8192049841043084620L;
+public class CloudImportRequest extends BaseImportRequest {
+    private static final long serialVersionUID = 6487348610099924813L;
     private String objectUrl;
     private String accessKey;
     private String secretKey;

+ 37 - 0
src/main/java/io/milvus/bulkwriter/request/import_/MilvusImportRequest.java

@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.bulkwriter.request.import_;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
+
+import java.util.List;
+
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@NoArgsConstructor
+public class MilvusImportRequest extends BaseImportRequest {
+    private static final long serialVersionUID = -1958858397962018740L;
+    private String collectionName;
+    private List<List<String>> files;
+}

+ 40 - 0
src/main/java/io/milvus/bulkwriter/request/list/BaseListImportJobsRequest.java

@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.bulkwriter.request.list;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
+
+import java.io.Serializable;
+
+
+@Data
+@SuperBuilder(toBuilder = true)
+@AllArgsConstructor
+@NoArgsConstructor
+public class BaseListImportJobsRequest implements Serializable {
+    private static final long serialVersionUID = -1890380396466908530L;
+    /**
+     * If you are calling the cloud API, this parameter needs to be filled in; otherwise, you can ignore it.
+     */
+    private String apiKey;
+}

+ 7 - 10
src/main/java/io/milvus/bulkwriter/request/ListImportJobsRequest.java → src/main/java/io/milvus/bulkwriter/request/list/CloudListImportJobsRequest.java

@@ -17,23 +17,20 @@
  * under the License.
  */
 
-package io.milvus.bulkwriter.request;
+package io.milvus.bulkwriter.request.list;
 
 import lombok.AllArgsConstructor;
-import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
-
-import java.io.Serializable;
+import lombok.experimental.SuperBuilder;
 
 @Data
-@Builder
+@SuperBuilder
 @AllArgsConstructor
 @NoArgsConstructor
-public class ListImportJobsRequest implements Serializable {
-    private static final long serialVersionUID = -1890380396466908530L;
-
+public class CloudListImportJobsRequest extends BaseListImportJobsRequest {
+    private static final long serialVersionUID = -3380786382584854649L;
     private String clusterId;
-    private int pageSize;
-    private int currentPage;
+    private Integer pageSize;
+    private Integer currentPage;
 }

+ 34 - 0
src/main/java/io/milvus/bulkwriter/request/list/MilvusListImportJobsRequest.java

@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.bulkwriter.request.list;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
+
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@NoArgsConstructor
+public class MilvusListImportJobsRequest extends BaseListImportJobsRequest {
+    private static final long serialVersionUID = 8957739122547766268L;
+    private String collectionName;
+}