|
@@ -27,7 +27,7 @@ import com.google.gson.JsonElement;
|
|
|
import com.google.gson.JsonObject;
|
|
|
import com.google.gson.reflect.TypeToken;
|
|
|
import io.milvus.bulkwriter.BulkWriter;
|
|
|
-import io.milvus.bulkwriter.CloudImportV2;
|
|
|
+import io.milvus.bulkwriter.CloudImport;
|
|
|
import io.milvus.bulkwriter.LocalBulkWriter;
|
|
|
import io.milvus.bulkwriter.LocalBulkWriterParam;
|
|
|
import io.milvus.bulkwriter.RemoteBulkWriter;
|
|
@@ -40,12 +40,9 @@ import io.milvus.bulkwriter.common.utils.ParquetReaderUtils;
|
|
|
import io.milvus.bulkwriter.connect.AzureConnectParam;
|
|
|
import io.milvus.bulkwriter.connect.S3ConnectParam;
|
|
|
import io.milvus.bulkwriter.connect.StorageConnectParam;
|
|
|
-import io.milvus.bulkwriter.request.v2.BulkImportV2Request;
|
|
|
-import io.milvus.bulkwriter.request.v2.GetImportProgressV2Request;
|
|
|
-import io.milvus.bulkwriter.request.v2.ListImportJobsV2Request;
|
|
|
-import io.milvus.bulkwriter.response.v2.BulkImportV2Response;
|
|
|
-import io.milvus.bulkwriter.response.v2.GetImportProgressV2Response;
|
|
|
-import io.milvus.bulkwriter.response.v2.ListImportJobsV2Response;
|
|
|
+import io.milvus.bulkwriter.request.BulkImportRequest;
|
|
|
+import io.milvus.bulkwriter.request.GetImportProgressRequest;
|
|
|
+import io.milvus.bulkwriter.request.ListImportJobsRequest;
|
|
|
import io.milvus.client.MilvusClient;
|
|
|
import io.milvus.client.MilvusServiceClient;
|
|
|
import io.milvus.common.utils.ExceptionUtils;
|
|
@@ -81,7 +78,6 @@ import org.apache.http.util.Asserts;
|
|
|
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
-import java.net.MalformedURLException;
|
|
|
import java.net.URL;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.util.ArrayList;
|
|
@@ -561,7 +557,7 @@ public class BulkWriterExample {
|
|
|
System.out.println("Collection row number: " + getCollectionStatistics());
|
|
|
}
|
|
|
|
|
|
- private void callCloudImport(List<List<String>> batchFiles, String collectionName, String partitionName) throws InterruptedException, MalformedURLException {
|
|
|
+ private void callCloudImport(List<List<String>> batchFiles, String collectionName, String partitionName) throws InterruptedException {
|
|
|
System.out.println("\n===================== call cloudImport ====================");
|
|
|
|
|
|
String objectUrl = StorageConsts.cloudStorage == CloudStorage.AZURE
|
|
@@ -570,28 +566,34 @@ public class BulkWriterExample {
|
|
|
String accessKey = StorageConsts.cloudStorage == CloudStorage.AZURE ? StorageConsts.AZURE_ACCOUNT_NAME : StorageConsts.STORAGE_ACCESS_KEY;
|
|
|
String secretKey = StorageConsts.cloudStorage == CloudStorage.AZURE ? StorageConsts.AZURE_ACCOUNT_KEY : StorageConsts.STORAGE_SECRET_KEY;
|
|
|
|
|
|
- BulkImportV2Request bulkImportRequest = BulkImportV2Request.builder()
|
|
|
+ BulkImportRequest bulkImportRequest = BulkImportRequest.builder()
|
|
|
.objectUrl(objectUrl).accessKey(accessKey).secretKey(secretKey)
|
|
|
.clusterId(CloudImportConsts.CLUSTER_ID).collectionName(collectionName).partitionName(partitionName)
|
|
|
.build();
|
|
|
- BulkImportV2Response bulkImportResponse = CloudImportV2.createImportJobs(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, bulkImportRequest);
|
|
|
- String jobId = bulkImportResponse.getJobId();
|
|
|
+ String bulkImportResult = CloudImport.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, bulkImportRequest);
|
|
|
+ JsonObject bulkImportObject = convertDataMap(bulkImportResult);
|
|
|
+
|
|
|
+ String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
|
|
|
System.out.println("Create a cloudImport job, job id: " + jobId);
|
|
|
|
|
|
while (true) {
|
|
|
System.out.println("Wait 5 second to check bulkInsert job state...");
|
|
|
TimeUnit.SECONDS.sleep(5);
|
|
|
|
|
|
- GetImportProgressV2Request request = GetImportProgressV2Request.builder().clusterId(CloudImportConsts.CLUSTER_ID).jobId(jobId).build();
|
|
|
- GetImportProgressV2Response getImportProgressResponse = CloudImportV2.getImportJobProgress(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, request);
|
|
|
- if ("Completed".equals(getImportProgressResponse.getState())) {
|
|
|
+ GetImportProgressRequest request = GetImportProgressRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).jobId(jobId).build();
|
|
|
+ String getImportProgressResult = CloudImport.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, request);
|
|
|
+ JsonObject getImportProgressObject = convertDataMap(getImportProgressResult);
|
|
|
+ String importProgressState = getImportProgressObject.getAsJsonObject("data").get("state").getAsString();
|
|
|
+ String reason = getImportProgressObject.getAsJsonObject("data").get("reason").getAsString();
|
|
|
+ String progress = getImportProgressObject.getAsJsonObject("data").get("progress").getAsString();
|
|
|
+ if ("Completed".equals(importProgressState)) {
|
|
|
System.out.printf("The job %s completed%n", jobId);
|
|
|
break;
|
|
|
- } else if (StringUtils.isNotEmpty(getImportProgressResponse.getReason())) {
|
|
|
- System.out.printf("The job %s failed or canceled, reason: %s%n", jobId, getImportProgressResponse.getReason());
|
|
|
+ } else if (StringUtils.isNotEmpty(reason)) {
|
|
|
+ System.out.printf("The job %s failed or canceled, reason: %s%n", jobId, reason);
|
|
|
break;
|
|
|
} else {
|
|
|
- System.out.printf("The job %s is running, progress:%s%n", jobId, getImportProgressResponse.getProgress());
|
|
|
+ System.out.printf("The job %s is running, progress:%s%n", jobId, progress);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -768,25 +770,27 @@ public class BulkWriterExample {
|
|
|
return wrapper.getRowCount();
|
|
|
}
|
|
|
|
|
|
- private static void exampleCloudImport() throws MalformedURLException {
|
|
|
+ private static void exampleCloudImport() {
|
|
|
System.out.println("\n===================== import files to cloud vectordb ====================");
|
|
|
- BulkImportV2Request request = BulkImportV2Request.builder()
|
|
|
+ BulkImportRequest request = BulkImportRequest.builder()
|
|
|
.objectUrl(CloudImportConsts.OBJECT_URL).accessKey(CloudImportConsts.OBJECT_ACCESS_KEY).secretKey(CloudImportConsts.OBJECT_SECRET_KEY)
|
|
|
.clusterId(CloudImportConsts.CLUSTER_ID).collectionName(CloudImportConsts.COLLECTION_NAME).partitionName(CloudImportConsts.PARTITION_NAME)
|
|
|
.build();
|
|
|
- BulkImportV2Response bulkImportResponse = CloudImportV2.createImportJobs(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, request);
|
|
|
- System.out.println(GSON_INSTANCE.toJson(bulkImportResponse));
|
|
|
+ String bulkImportResult = CloudImport.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, request);
|
|
|
+ System.out.println(bulkImportResult);
|
|
|
|
|
|
System.out.println("\n===================== get import job progress ====================");
|
|
|
- String jobId = bulkImportResponse.getJobId();
|
|
|
- GetImportProgressV2Request getImportProgressRequest = GetImportProgressV2Request.builder().clusterId(CloudImportConsts.CLUSTER_ID).jobId(jobId).build();
|
|
|
- GetImportProgressV2Response getImportProgressResponse = CloudImportV2.getImportJobProgress(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, getImportProgressRequest);
|
|
|
- System.out.println(GSON_INSTANCE.toJson(getImportProgressResponse));
|
|
|
+
|
|
|
+ JsonObject bulkImportObject = convertDataMap(bulkImportResult);
|
|
|
+ String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
|
|
|
+ GetImportProgressRequest getImportProgressRequest = GetImportProgressRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).jobId(jobId).build();
|
|
|
+ String getImportProgressResult = CloudImport.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, getImportProgressRequest);
|
|
|
+ System.out.println(getImportProgressResult);
|
|
|
|
|
|
System.out.println("\n===================== list import jobs ====================");
|
|
|
- ListImportJobsV2Request listImportJobsRequest = ListImportJobsV2Request.builder().clusterId(CloudImportConsts.CLUSTER_ID).currentPage(1).pageSize(10).build();
|
|
|
- ListImportJobsV2Response listImportJobsResponse = CloudImportV2.listImportJobs(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, listImportJobsRequest);
|
|
|
- System.out.println(GSON_INSTANCE.toJson(listImportJobsResponse));
|
|
|
+ ListImportJobsRequest listImportJobsRequest = ListImportJobsRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).currentPage(1).pageSize(10).build();
|
|
|
+ String listImportJobsResult = CloudImport.listImportJobs(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, listImportJobsRequest);
|
|
|
+ System.out.println(listImportJobsResult);
|
|
|
}
|
|
|
|
|
|
private CollectionSchemaParam buildSimpleSchema() {
|
|
@@ -968,4 +972,8 @@ public class BulkWriterExample {
|
|
|
throw new RuntimeException(msg);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private static JsonObject convertDataMap(String result) {
|
|
|
+ return GSON_INSTANCE.fromJson(result, JsonObject.class);
|
|
|
+ }
|
|
|
}
|