
Bulkwriter supports V2 schema (#1119)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
groot committed 8 months ago
commit 14b9065db1

+ 918 - 0
examples/main/java/io/milvus/v2/BulkWriterExample.java

@@ -0,0 +1,918 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.milvus.v2;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.reflect.TypeToken;
+import io.milvus.bulkwriter.BulkImport;
+import io.milvus.bulkwriter.BulkWriter;
+import io.milvus.bulkwriter.LocalBulkWriter;
+import io.milvus.bulkwriter.LocalBulkWriterParam;
+import io.milvus.bulkwriter.RemoteBulkWriter;
+import io.milvus.bulkwriter.RemoteBulkWriterParam;
+import io.milvus.bulkwriter.common.clientenum.BulkFileType;
+import io.milvus.bulkwriter.common.clientenum.CloudStorage;
+import io.milvus.bulkwriter.common.utils.GeneratorUtils;
+import io.milvus.bulkwriter.common.utils.ImportUtils;
+import io.milvus.bulkwriter.common.utils.ParquetReaderUtils;
+import io.milvus.bulkwriter.connect.AzureConnectParam;
+import io.milvus.bulkwriter.connect.S3ConnectParam;
+import io.milvus.bulkwriter.connect.StorageConnectParam;
+import io.milvus.bulkwriter.request.describe.CloudDescribeImportRequest;
+import io.milvus.bulkwriter.request.describe.MilvusDescribeImportRequest;
+import io.milvus.bulkwriter.request.import_.CloudImportRequest;
+import io.milvus.bulkwriter.request.import_.MilvusImportRequest;
+import io.milvus.bulkwriter.request.list.CloudListImportJobsRequest;
+import io.milvus.bulkwriter.request.list.MilvusListImportJobsRequest;
+import io.milvus.v1.CommonUtils;
+import io.milvus.v2.client.ConnectConfig;
+import io.milvus.v2.client.MilvusClientV2;
+import io.milvus.v2.common.ConsistencyLevel;
+import io.milvus.v2.common.IndexParam;
+import io.milvus.v2.service.collection.request.*;
+import io.milvus.v2.service.index.request.CreateIndexReq;
+import io.milvus.v2.service.vector.request.QueryReq;
+import io.milvus.v2.service.vector.response.QueryResp;
+import org.apache.avro.generic.GenericData;
+import org.apache.http.util.Asserts;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+
+public class BulkWriterExample {
+    // milvus
+    public static final String HOST = "127.0.0.1";
+    public static final Integer PORT = 19530;
+    public static final String USER_NAME = "user.name";
+    public static final String PASSWORD = "password";
+
+    private static final Gson GSON_INSTANCE = new Gson();
+
+    private static final List<Integer> QUERY_IDS = Lists.newArrayList(100, 5000);
+
+
+    /**
+     * If you need to transfer the files generated by BulkWriter to remote storage (AWS S3, GCP GCS, Azure Blob, Aliyun OSS, Tencent Cloud TOS),
+     * configure the following parameters accordingly; otherwise, you can ignore this class.
+     */
+    public static class StorageConsts {
+        public static final CloudStorage cloudStorage = CloudStorage.MINIO;
+
+        /**
+         * If using remote storage such as AWS S3, GCP GCS, Aliyun OSS, Tencent Cloud TOS, or MinIO,
+         * please configure the following parameters.
+         */
+        public static final String STORAGE_ENDPOINT = cloudStorage.getEndpoint("http://127.0.0.1:9000");
+        public static final String STORAGE_BUCKET = "a-bucket"; // default bucket name of MinIO/Milvus standalone
+        public static final String STORAGE_ACCESS_KEY = "minioadmin"; // default access key of MinIO/Milvus standalone
+        public static final String STORAGE_SECRET_KEY = "minioadmin"; // default secret key of MinIO/Milvus standalone
+        /**
+         * If using remote storage, please configure this parameter.
+         * If using local storage such as a local MinIO, set this parameter to an empty string.
+         */
+        public static final String STORAGE_REGION = "";
+
+        /**
+         * If using remote storage such as Azure Blob Storage,
+         * please configure the following parameters.
+         */
+        public static final String AZURE_CONTAINER_NAME = "azure.container.name";
+        public static final String AZURE_ACCOUNT_NAME = "azure.account.name";
+        public static final String AZURE_ACCOUNT_KEY = "azure.account.key";
+    }
+
+
+    /**
+     * If you have used RemoteBulkWriter to generate data in remote storage and want to import it through the Import interface on Zilliz Cloud,
+     * you do not need to configure the object-related parameters below (OBJECT_URL, OBJECT_ACCESS_KEY, OBJECT_SECRET_KEY); just call the callCloudImport method, which encapsulates that logic for you.
+     * <p>
+     * If you already have data in remote storage (not generated by RemoteBulkWriter) and want to import it through the Import interface on Zilliz Cloud,
+     * configure the following parameters and then follow the exampleCloudImport method.
+     * <p>
+     * If you do not need to import data through the Import interface on Zilliz Cloud, you can ignore this class.
+     */
+    public static class CloudImportConsts {
+
+        /**
+         * The value of the URL is fixed.
+         * For overseas regions, it is: https://api.cloud.zilliz.com
+         * For regions in China, it is: https://api.cloud.zilliz.com.cn
+         */
+        public static final String CLOUD_ENDPOINT = "https://api.cloud.zilliz.com";
+        public static final String API_KEY = "_api_key_for_cluster_org_";
+        public static final String CLUSTER_ID = "_your_cloud_cluster_id_";
+        public static final String COLLECTION_NAME = "_collection_name_on_the_cluster_id_";
+        // If partition_name is not specified, use ""
+        public static final String PARTITION_NAME = "_partition_name_on_the_collection_";
+
+        /**
+         * Please provide the complete URL for the file or folder you want to import, similar to https://bucket-name.s3.region-code.amazonaws.com/object-name.
+         * For more details, you can refer to https://docs.zilliz.com/docs/import-data-on-web-ui.
+         */
+        public static final String OBJECT_URL = "_your_storage_object_url_";
+        public static final String OBJECT_ACCESS_KEY = "_your_storage_access_key_";
+        public static final String OBJECT_SECRET_KEY = "_your_storage_secret_key_";
+    }
+
+    private static final String SIMPLE_COLLECTION_NAME = "java_sdk_bulkwriter_simple_v2";
+    private static final String ALL_TYPES_COLLECTION_NAME = "java_sdk_bulkwriter_all_v2";
+    private static final Integer DIM = 512;
+    private static final Integer ARRAY_CAPACITY = 10;
+    private MilvusClientV2 milvusClient;
+
+    public static void main(String[] args) throws Exception {
+
+        BulkWriterExample exampleBulkWriter = new BulkWriterExample();
+        exampleBulkWriter.createConnection();
+
+        List<BulkFileType> fileTypes = Lists.newArrayList(
+                BulkFileType.PARQUET
+        );
+
+        exampleSimpleCollection(exampleBulkWriter, fileTypes);
+        exampleAllTypesCollectionRemote(exampleBulkWriter, fileTypes);
+
+        // To call the cloud import API, you need a cloud service from Zilliz Cloud (https://zilliz.com/cloud)
+        // exampleCloudImport();
+    }
+
+    private void createConnection() {
+        System.out.println("\nCreate connection...");
+        String url = String.format("http://%s:%s", HOST, PORT);
+        milvusClient = new MilvusClientV2(ConnectConfig.builder()
+                .uri(url)
+                .username(USER_NAME)
+                .password(PASSWORD)
+                .build());
+        System.out.println("\nConnected");
+    }
+
+    private static void exampleSimpleCollection(BulkWriterExample exampleBulkWriter, List<BulkFileType> fileTypes) throws Exception {
+        CreateCollectionReq.CollectionSchema collectionSchema = exampleBulkWriter.buildSimpleSchema();
+        exampleBulkWriter.createCollection(SIMPLE_COLLECTION_NAME, collectionSchema, false);
+
+        for (BulkFileType fileType : fileTypes) {
+            localWriter(collectionSchema, fileType);
+        }
+
+        for (BulkFileType fileType : fileTypes) {
+            remoteWriter(collectionSchema, fileType);
+        }
+
+        // parallel append
+        parallelAppend(collectionSchema);
+    }
+
+    private static void exampleAllTypesCollectionRemote(BulkWriterExample exampleBulkWriter, List<BulkFileType> fileTypes) throws Exception {
+        // 4 vector types + all scalar types + dynamic field enabled; use the bulkInsert interface
+        for (BulkFileType fileType : fileTypes) {
+            CreateCollectionReq.CollectionSchema collectionSchema = buildAllTypesSchema();
+            List<List<String>> batchFiles = exampleBulkWriter.allTypesRemoteWriter(collectionSchema, fileType);
+            exampleBulkWriter.callBulkInsert(collectionSchema, batchFiles);
+            exampleBulkWriter.retrieveImportData();
+        }
+
+//        // 4 vector types + all scalar types + dynamic field enabled; use the cloud import API.
+//        // You need a cloud service from Zilliz Cloud (https://zilliz.com/cloud)
+//        for (BulkFileType fileType : fileTypes) {
+//            CreateCollectionReq.CollectionSchema collectionSchema = buildAllTypesSchema();
+//            List<List<String>> batchFiles = exampleBulkWriter.allTypesRemoteWriter(collectionSchema, fileType);
+//            exampleBulkWriter.createCollection(ALL_TYPES_COLLECTION_NAME, collectionSchema, false);
+//            exampleBulkWriter.callCloudImport(batchFiles, ALL_TYPES_COLLECTION_NAME, "");
+//            exampleBulkWriter.retrieveImportData();
+//        }
+    }
+
+    private static void localWriter(CreateCollectionReq.CollectionSchema collectionSchema, BulkFileType fileType) throws Exception {
+        System.out.printf("\n===================== local writer (%s) ====================%n", fileType.name());
+        LocalBulkWriterParam bulkWriterParam = LocalBulkWriterParam.newBuilder()
+                .withCollectionSchema(collectionSchema)
+                .withLocalPath("/tmp/bulk_writer")
+                .withFileType(fileType)
+                .withChunkSize(128 * 1024 * 1024)
+                .build();
+
+        try (LocalBulkWriter localBulkWriter = new LocalBulkWriter(bulkWriterParam)) {
+            // read data from csv
+            readCsvSampleData("data/train_embeddings.csv", localBulkWriter);
+
+            // append rows
+            for (int i = 0; i < 100000; i++) {
+                JsonObject row = new JsonObject();
+                row.addProperty("path", "path_" + i);
+                row.add("vector", GSON_INSTANCE.toJsonTree(GeneratorUtils.genFloatVector(DIM)));
+                row.addProperty("label", "label_" + i);
+
+                localBulkWriter.appendRow(row);
+            }
+
+            System.out.printf("%s rows appended%n", localBulkWriter.getTotalRowCount());
+            System.out.printf("%s rows in buffer not flushed%n", localBulkWriter.getBufferRowCount());
+
+            localBulkWriter.commit(false);
+            List<List<String>> batchFiles = localBulkWriter.getBatchFiles();
+            System.out.printf("Local writer done! output local files: %s%n", batchFiles);
+        } catch (Exception e) {
+            System.out.println("Local writer caught an exception: " + e);
+            throw e;
+        }
+    }
+
+    private static void remoteWriter(CreateCollectionReq.CollectionSchema collectionSchema, BulkFileType fileType) throws Exception {
+        System.out.printf("\n===================== remote writer (%s) ====================%n", fileType.name());
+
+        try (RemoteBulkWriter remoteBulkWriter = buildRemoteBulkWriter(collectionSchema, fileType)) {
+            // read data from csv
+            readCsvSampleData("data/train_embeddings.csv", remoteBulkWriter);
+
+            // append rows
+            for (int i = 0; i < 100000; i++) {
+                JsonObject row = new JsonObject();
+                row.addProperty("path", "path_" + i);
+                row.add("vector", GSON_INSTANCE.toJsonTree(GeneratorUtils.genFloatVector(DIM)));
+                row.addProperty("label", "label_" + i);
+
+                remoteBulkWriter.appendRow(row);
+            }
+
+            System.out.printf("%s rows appended%n", remoteBulkWriter.getTotalRowCount());
+            System.out.printf("%s rows in buffer not flushed%n", remoteBulkWriter.getBufferRowCount());
+
+            remoteBulkWriter.commit(false);
+            List<List<String>> batchFiles = remoteBulkWriter.getBatchFiles();
+
+            System.out.printf("Remote writer done! output remote files: %s%n", batchFiles);
+        } catch (Exception e) {
+            System.out.println("Remote writer caught an exception: " + e);
+            throw e;
+        }
+    }
+
+    private static void parallelAppend(CreateCollectionReq.CollectionSchema collectionSchema) throws Exception {
+        System.out.print("\n===================== parallel append ====================");
+        LocalBulkWriterParam bulkWriterParam = LocalBulkWriterParam.newBuilder()
+                .withCollectionSchema(collectionSchema)
+                .withLocalPath("/tmp/bulk_writer")
+                .withFileType(BulkFileType.PARQUET)
+                .withChunkSize(128 * 1024 * 1024)  // 128MB
+                .build();
+
+        try (LocalBulkWriter localBulkWriter = new LocalBulkWriter(bulkWriterParam)) {
+            List<Thread> threads = new ArrayList<>();
+            int threadCount = 10;
+            int rowsPerThread = 1000;
+            for (int i = 0; i < threadCount; ++i) {
+                int current = i;
+                Thread thread = new Thread(() -> appendRow(localBulkWriter, current * rowsPerThread, (current + 1) * rowsPerThread));
+                threads.add(thread);
+                thread.start();
+                System.out.printf("Thread %s started%n", thread.getName());
+            }
+
+            for (Thread thread : threads) {
+                thread.join();
+                System.out.printf("Thread %s finished%n", thread.getName());
+            }
+
+            System.out.println(localBulkWriter.getTotalRowCount() + " rows appended");
+            System.out.println(localBulkWriter.getBufferRowCount() + " rows in buffer not flushed");
+            localBulkWriter.commit(false);
+            System.out.printf("Append finished, %s rows%n", threadCount * rowsPerThread);
+
+            long rowCount = 0;
+            List<List<String>> batchFiles = localBulkWriter.getBatchFiles();
+            for (List<String> batch : batchFiles) {
+                for (String filePath : batch) {
+                    rowCount += readParquet(filePath);
+                }
+            }
+
+            Asserts.check(rowCount == threadCount * rowsPerThread, String.format("rowCount %s does not equal the expected %s", rowCount, threadCount * rowsPerThread));
+            System.out.println("Data is correct");
+        } catch (Exception e) {
+            System.out.println("parallelAppend caught an exception: " + e);
+            throw e;
+        }
+    }
+
+    private static long readParquet(String localFilePath) throws Exception {
+        final long[] rowCount = {0};
+        new ParquetReaderUtils() {
+            @Override
+            public void readRecord(GenericData.Record record) {
+                rowCount[0]++;
+                String pathValue = record.get("path").toString();
+                String labelValue = record.get("label").toString();
+                Asserts.check(pathValue.replace("path_", "").equals(labelValue.replace("label_", "")), String.format("the suffix of %s does not equal the suffix of %s", pathValue, labelValue));
+            }
+        }.readParquet(localFilePath);
+        System.out.printf("The file %s contains %s rows. Verify the content...%n", localFilePath, rowCount[0]);
+        return rowCount[0];
+    }
+
+    private static void appendRow(LocalBulkWriter writer, int begin, int end) {
+        try {
+            for (int i = begin; i < end; ++i) {
+                JsonObject row = new JsonObject();
+                row.addProperty("path", "path_" + i);
+                row.add("vector", GSON_INSTANCE.toJsonTree(GeneratorUtils.genFloatVector(DIM)));
+                row.addProperty("label", "label_" + i);
+
+                writer.appendRow(row);
+                if (i % 100 == 0) {
+                    System.out.printf("%s inserted %s items%n", Thread.currentThread().getName(), i - begin);
+                }
+            }
+        } catch (Exception e) {
+            System.out.println("Failed to append row: " + e);
+        }
+    }
+
+    private List<List<String>> allTypesRemoteWriter(CreateCollectionReq.CollectionSchema collectionSchema, BulkFileType fileType) throws Exception {
+        System.out.printf("\n===================== all field types (%s) ====================%n", fileType.name());
+
+        try (RemoteBulkWriter remoteBulkWriter = buildRemoteBulkWriter(collectionSchema, fileType)) {
+            System.out.println("Append rows");
+            int batchCount = 10000;
+
+            for (int i = 0; i < batchCount; ++i) {
+                JsonObject rowObject = new JsonObject();
+
+                // scalar field
+                rowObject.addProperty("id", i);
+                rowObject.addProperty("bool", i % 5 == 0);
+                rowObject.addProperty("int8", i % 128);
+                rowObject.addProperty("int16", i % 1000);
+                rowObject.addProperty("int32", i % 100000);
+                rowObject.addProperty("float", (float) i / 3);
+                rowObject.addProperty("double", (double) i / 7);
+                rowObject.addProperty("varchar", "varchar_" + i);
+                rowObject.addProperty("json", String.format("{\"dummy\": %s, \"ok\": \"name_%s\"}", i, i));
+
+                // vector field
+                rowObject.add("float_vector", GSON_INSTANCE.toJsonTree(CommonUtils.generateFloatVector(DIM)));
+                rowObject.add("binary_vector", GSON_INSTANCE.toJsonTree(CommonUtils.generateBinaryVector(DIM).array()));
+                rowObject.add("float16_vector", GSON_INSTANCE.toJsonTree(CommonUtils.generateFloat16Vector(DIM, false).array()));
+                rowObject.add("sparse_vector", GSON_INSTANCE.toJsonTree(CommonUtils.generateSparseVector()));
+
+                // array field
+                rowObject.add("array_bool", GSON_INSTANCE.toJsonTree(GeneratorUtils.generatorBoolValue(10)));
+                rowObject.add("array_int8", GSON_INSTANCE.toJsonTree(GeneratorUtils.generatorInt8Value(10)));
+                rowObject.add("array_int16", GSON_INSTANCE.toJsonTree(GeneratorUtils.generatorInt16Value(10)));
+                rowObject.add("array_int32", GSON_INSTANCE.toJsonTree(GeneratorUtils.generatorInt32Value(10)));
+                rowObject.add("array_int64", GSON_INSTANCE.toJsonTree(GeneratorUtils.generatorLongValue(10)));
+                rowObject.add("array_varchar", GSON_INSTANCE.toJsonTree(GeneratorUtils.generatorVarcharValue(10, 10)));
+                rowObject.add("array_float", GSON_INSTANCE.toJsonTree(GeneratorUtils.generatorFloatValue(10)));
+                rowObject.add("array_double", GSON_INSTANCE.toJsonTree(GeneratorUtils.generatorDoubleValue(10)));
+
+                // dynamic fields
+                if (collectionSchema.isEnableDynamicField()) {
+                    rowObject.addProperty("dynamic", "dynamic_" + i);
+                }
+
+                if (QUERY_IDS.contains(i)) {
+                    System.out.println(rowObject);
+                }
+
+                remoteBulkWriter.appendRow(rowObject);
+            }
+            System.out.printf("%s rows appended%n", remoteBulkWriter.getTotalRowCount());
+            System.out.printf("%s rows in buffer not flushed%n", remoteBulkWriter.getBufferRowCount());
+            System.out.println("Generate data files...");
+            remoteBulkWriter.commit(false);
+
+            System.out.printf("Data files have been uploaded: %s%n", remoteBulkWriter.getBatchFiles());
+            return remoteBulkWriter.getBatchFiles();
+        } catch (Exception e) {
+            System.out.println("allTypesRemoteWriter caught an exception: " + e);
+            throw e;
+        }
+    }
+
+    private static RemoteBulkWriter buildRemoteBulkWriter(CreateCollectionReq.CollectionSchema collectionSchema, BulkFileType fileType) throws IOException {
+        StorageConnectParam connectParam = buildStorageConnectParam();
+        RemoteBulkWriterParam bulkWriterParam = RemoteBulkWriterParam.newBuilder()
+                .withCollectionSchema(collectionSchema)
+                .withRemotePath("bulk_data")
+                .withFileType(fileType)
+                .withChunkSize(512 * 1024 * 1024)
+                .withConnectParam(connectParam)
+                .build();
+        return new RemoteBulkWriter(bulkWriterParam);
+    }
+
+    private static StorageConnectParam buildStorageConnectParam() {
+        StorageConnectParam connectParam;
+        if (StorageConsts.cloudStorage == CloudStorage.AZURE) {
+            String connectionStr = "DefaultEndpointsProtocol=https;AccountName=" + StorageConsts.AZURE_ACCOUNT_NAME +
+                    ";AccountKey=" + StorageConsts.AZURE_ACCOUNT_KEY + ";EndpointSuffix=core.windows.net";
+            connectParam = AzureConnectParam.newBuilder()
+                    .withConnStr(connectionStr)
+                    .withContainerName(StorageConsts.AZURE_CONTAINER_NAME)
+                    .build();
+        } else {
+            connectParam = S3ConnectParam.newBuilder()
+                    .withEndpoint(StorageConsts.STORAGE_ENDPOINT)
+                    .withCloudName(StorageConsts.cloudStorage.getCloudName())
+                    .withBucketName(StorageConsts.STORAGE_BUCKET)
+                    .withAccessKey(StorageConsts.STORAGE_ACCESS_KEY)
+                    .withSecretKey(StorageConsts.STORAGE_SECRET_KEY)
+                    .withRegion(StorageConsts.STORAGE_REGION)
+                    .build();
+        }
+        return connectParam;
+    }
+
+    private static void readCsvSampleData(String filePath, BulkWriter writer) throws IOException, InterruptedException {
+        ClassLoader classLoader = BulkWriterExample.class.getClassLoader();
+        URL resourceUrl = classLoader.getResource(filePath);
+        filePath = new File(resourceUrl.getFile()).getAbsolutePath();
+
+        CsvMapper csvMapper = new CsvMapper();
+
+        File csvFile = new File(filePath);
+        CsvSchema csvSchema = CsvSchema.builder().setUseHeader(true).build();
+        Iterator<CsvDataObject> iterator = csvMapper.readerFor(CsvDataObject.class).with(csvSchema).readValues(csvFile);
+        while (iterator.hasNext()) {
+            CsvDataObject dataObject = iterator.next();
+            JsonObject row = new JsonObject();
+
+            row.add("vector", GSON_INSTANCE.toJsonTree(dataObject.toFloatArray()));
+            row.addProperty("label", dataObject.getLabel());
+            row.addProperty("path", dataObject.getPath());
+
+            writer.appendRow(row);
+        }
+    }
+
+    private static class CsvDataObject {
+        @JsonProperty
+        private String vector;
+        @JsonProperty
+        private String path;
+        @JsonProperty
+        private String label;
+
+        public String getVector() {
+            return vector;
+        }
+        public String getPath() {
+            return path;
+        }
+        public String getLabel() {
+            return label;
+        }
+        public List<Float> toFloatArray() {
+            return GSON_INSTANCE.fromJson(vector, new TypeToken<List<Float>>() {
+            }.getType());
+        }
+    }
+
+    private void callBulkInsert(CreateCollectionReq.CollectionSchema collectionSchema, List<List<String>> batchFiles) throws InterruptedException {
+        createCollection(ALL_TYPES_COLLECTION_NAME, collectionSchema, true);
+
+        String url = String.format("http://%s:%s", HOST, PORT);
+        System.out.println("\n===================== import files to milvus ====================");
+        MilvusImportRequest milvusImportRequest = MilvusImportRequest.builder()
+                .collectionName(ALL_TYPES_COLLECTION_NAME)
+                .files(batchFiles)
+                .build();
+        String bulkImportResult = BulkImport.bulkImport(url, milvusImportRequest);
+        System.out.println(bulkImportResult);
+
+        JsonObject bulkImportObject = convertJsonObject(bulkImportResult);
+        String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
+        System.out.println("Created a bulkInsert task, job id: " + jobId);
+
+        System.out.println("\n===================== listBulkInsertJobs() ====================");
+        MilvusListImportJobsRequest listImportJobsRequest = MilvusListImportJobsRequest.builder().collectionName(ALL_TYPES_COLLECTION_NAME).build();
+        String listImportJobsResult = BulkImport.listImportJobs(url, listImportJobsRequest);
+        System.out.println(listImportJobsResult);
+        while (true) {
+            System.out.println("Wait 5 seconds to check the bulkInsert job state...");
+            TimeUnit.SECONDS.sleep(5);
+
+            System.out.println("\n===================== getBulkInsertState() ====================");
+            MilvusDescribeImportRequest request = MilvusDescribeImportRequest.builder()
+                    .jobId(jobId)
+                    .build();
+            String getImportProgressResult = BulkImport.getImportProgress(url, request);
+            System.out.println(getImportProgressResult);
+
+            JsonObject getImportProgressObject = convertJsonObject(getImportProgressResult);
+            String state = getImportProgressObject.getAsJsonObject("data").get("state").getAsString();
+            String progress = getImportProgressObject.getAsJsonObject("data").get("progress").getAsString();
+            if ("Failed".equals(state)) {
+                String reason = getImportProgressObject.getAsJsonObject("data").get("reason").getAsString();
+                System.out.printf("The job %s failed, reason: %s%n", jobId, reason);
+                break;
+            } else if ("Completed".equals(state)) {
+                System.out.printf("The job %s completed%n", jobId);
+                break;
+            } else {
+                System.out.printf("The job %s is running, state:%s progress:%s%n", jobId, state, progress);
+            }
+        }
+
+        System.out.println("Collection row number: " + getCollectionStatistics());
+    }
+
+    private void callCloudImport(List<List<String>> batchFiles, String collectionName, String partitionName) throws InterruptedException {
+        String objectUrl = StorageConsts.cloudStorage == CloudStorage.AZURE
+                ? StorageConsts.cloudStorage.getAzureObjectUrl(StorageConsts.AZURE_ACCOUNT_NAME, StorageConsts.AZURE_CONTAINER_NAME, ImportUtils.getCommonPrefix(batchFiles))
+                : StorageConsts.cloudStorage.getS3ObjectUrl(StorageConsts.STORAGE_BUCKET, ImportUtils.getCommonPrefix(batchFiles), StorageConsts.STORAGE_REGION);
+        String accessKey = StorageConsts.cloudStorage == CloudStorage.AZURE ? StorageConsts.AZURE_ACCOUNT_NAME : StorageConsts.STORAGE_ACCESS_KEY;
+        String secretKey = StorageConsts.cloudStorage == CloudStorage.AZURE ? StorageConsts.AZURE_ACCOUNT_KEY : StorageConsts.STORAGE_SECRET_KEY;
+
+        System.out.println("\n===================== call cloudImport ====================");
+        CloudImportRequest bulkImportRequest = CloudImportRequest.builder()
+                .objectUrl(objectUrl).accessKey(accessKey).secretKey(secretKey)
+                .clusterId(CloudImportConsts.CLUSTER_ID).collectionName(collectionName).partitionName(partitionName)
+                .apiKey(CloudImportConsts.API_KEY)
+                .build();
+        String bulkImportResult = BulkImport.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, bulkImportRequest);
+        JsonObject bulkImportObject = convertJsonObject(bulkImportResult);
+
+        String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
+        System.out.println("Created a cloudImport job, job id: " + jobId);
+
+        System.out.println("\n===================== call cloudListImportJobs ====================");
+        CloudListImportJobsRequest listImportJobsRequest = CloudListImportJobsRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).currentPage(1).pageSize(10).apiKey(CloudImportConsts.API_KEY).build();
+        String listImportJobsResult = BulkImport.listImportJobs(CloudImportConsts.CLOUD_ENDPOINT, listImportJobsRequest);
+        System.out.println(listImportJobsResult);
+        while (true) {
+            System.out.println("Wait 5 seconds to check the bulkInsert job state...");
+            TimeUnit.SECONDS.sleep(5);
+
+            System.out.println("\n===================== call cloudGetProgress ====================");
+            CloudDescribeImportRequest request = CloudDescribeImportRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).jobId(jobId).apiKey(CloudImportConsts.API_KEY).build();
+            String getImportProgressResult = BulkImport.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, request);
+            JsonObject getImportProgressObject = convertJsonObject(getImportProgressResult);
+            String importProgressState = getImportProgressObject.getAsJsonObject("data").get("state").getAsString();
+            String progress = getImportProgressObject.getAsJsonObject("data").get("progress").getAsString();
+
+            if ("Failed".equals(importProgressState)) {
+                String reason = getImportProgressObject.getAsJsonObject("data").get("reason").getAsString();
+                System.out.printf("The job %s failed, reason: %s%n", jobId, reason);
+                break;
+            } else if ("Completed".equals(importProgressState)) {
+                System.out.printf("The job %s completed%n", jobId);
+                break;
+            } else {
+                System.out.printf("The job %s is running, state:%s progress:%s%n", jobId, importProgressState, progress);
+            }
+        }
+
+        System.out.println("Collection row number: " + getCollectionStatistics());
+    }
+
+    /**
+     * @param collectionName   name of the collection to create
+     * @param collectionSchema collection schema
+     * @param dropIfExist      if the collection already exists, drop it first and then create it again
+     */
+    private void createCollection(String collectionName, CreateCollectionReq.CollectionSchema collectionSchema, boolean dropIfExist) {
+        System.out.println("\n===================== create collection ====================");
+        checkMilvusClientIfExist();
+
+        CreateCollectionReq requestCreate = CreateCollectionReq.builder()
+                .collectionName(collectionName)
+                .collectionSchema(collectionSchema)
+                .consistencyLevel(ConsistencyLevel.BOUNDED)
+                .build();
+
+        Boolean has = milvusClient.hasCollection(HasCollectionReq.builder().collectionName(collectionName).build());
+        if (has) {
+            if (dropIfExist) {
+                milvusClient.dropCollection(DropCollectionReq.builder().collectionName(collectionName).build());
+                milvusClient.createCollection(requestCreate);
+            }
+        } else {
+            milvusClient.createCollection(requestCreate);
+        }
+
+        System.out.printf("Collection %s created%n", collectionName);
+    }
+
+    private void retrieveImportData() {
+        createIndex();
+
+        System.out.printf("Load collection and query items %s%n", QUERY_IDS);
+        loadCollection();
+
+        String expr = String.format("id in %s", QUERY_IDS);
+        System.out.println(expr);
+
+        List<QueryResp.QueryResult> results = query(expr, Lists.newArrayList("*"));
+        System.out.println("Query results:");
+        for (QueryResp.QueryResult result : results) {
+            Map<String, Object> entity = result.getEntity();
+            JsonObject rowObject = new JsonObject();
+            // scalar field
+            rowObject.addProperty("id", (Long)entity.get("id"));
+            rowObject.addProperty("bool", (Boolean) entity.get("bool"));
+            rowObject.addProperty("int8", (Integer) entity.get("int8"));
+            rowObject.addProperty("int16", (Integer) entity.get("int16"));
+            rowObject.addProperty("int32", (Integer) entity.get("int32"));
+            rowObject.addProperty("float", (Float) entity.get("float"));
+            rowObject.addProperty("double", (Double) entity.get("double"));
+            rowObject.addProperty("varchar", (String) entity.get("varchar"));
+            rowObject.add("json", (JsonElement) entity.get("json"));
+
+            // vector field
+            rowObject.add("float_vector", GSON_INSTANCE.toJsonTree(entity.get("float_vector")));
+            rowObject.add("binary_vector", GSON_INSTANCE.toJsonTree(((ByteBuffer)entity.get("binary_vector")).array()));
+            rowObject.add("float16_vector", GSON_INSTANCE.toJsonTree(((ByteBuffer)entity.get("float16_vector")).array()));
+            rowObject.add("sparse_vector", GSON_INSTANCE.toJsonTree(entity.get("sparse_vector")));
+
+            // array field
+            rowObject.add("array_bool", GSON_INSTANCE.toJsonTree(entity.get("array_bool")));
+            rowObject.add("array_int8", GSON_INSTANCE.toJsonTree(entity.get("array_int8")));
+            rowObject.add("array_int16", GSON_INSTANCE.toJsonTree(entity.get("array_int16")));
+            rowObject.add("array_int32", GSON_INSTANCE.toJsonTree(entity.get("array_int32")));
+            rowObject.add("array_int64", GSON_INSTANCE.toJsonTree(entity.get("array_int64")));
+            rowObject.add("array_varchar", GSON_INSTANCE.toJsonTree(entity.get("array_varchar")));
+            rowObject.add("array_float", GSON_INSTANCE.toJsonTree(entity.get("array_float")));
+            rowObject.add("array_double", GSON_INSTANCE.toJsonTree(entity.get("array_double")));
+
+            // dynamic field
+            rowObject.addProperty("dynamic", (String) entity.get("dynamic"));
+
+            System.out.println(rowObject);
+        }
+    }
+
+    private void createIndex() {
+        System.out.println("Create index...");
+        checkMilvusClientIfExist();
+
+        List<IndexParam> indexes = new ArrayList<>();
+        indexes.add(IndexParam.builder()
+                .fieldName("float_vector")
+                .indexType(IndexParam.IndexType.FLAT)
+                .metricType(IndexParam.MetricType.L2)
+                .build());
+        indexes.add(IndexParam.builder()
+                .fieldName("binary_vector")
+                .indexType(IndexParam.IndexType.BIN_FLAT)
+                .metricType(IndexParam.MetricType.HAMMING)
+                .build());
+        indexes.add(IndexParam.builder()
+                .fieldName("float16_vector")
+                .indexType(IndexParam.IndexType.FLAT)
+                .metricType(IndexParam.MetricType.IP)
+                .build());
+        indexes.add(IndexParam.builder()
+                .fieldName("sparse_vector")
+                .indexType(IndexParam.IndexType.SPARSE_WAND)
+                .metricType(IndexParam.MetricType.IP)
+                .build());
+
+        milvusClient.createIndex(CreateIndexReq.builder()
+                .indexParams(indexes)
+                .build());
+    }
+
+    private void loadCollection() {
+        System.out.println("Loading Collection...");
+        checkMilvusClientIfExist();
+        milvusClient.loadCollection(LoadCollectionReq.builder()
+                .collectionName(ALL_TYPES_COLLECTION_NAME)
+                .build());
+    }
+
+    private List<QueryResp.QueryResult> query(String expr, List<String> outputFields) {
+        System.out.println("========== query() ==========");
+        checkMilvusClientIfExist();
+        QueryReq test = QueryReq.builder()
+                .collectionName(ALL_TYPES_COLLECTION_NAME)
+                .filter(expr)
+                .outputFields(outputFields)
+                .build();
+        QueryResp response = milvusClient.query(test);
+        return response.getQueryResults();
+    }
+
+    private Long getCollectionStatistics() {
+        System.out.println("========== getCollectionStatistics() ==========");
+        checkMilvusClientIfExist();
+
+        // Get the row count. ConsistencyLevel.STRONG syncs data to the query node so that the imported data is visible.
+        QueryResp countR = milvusClient.query(QueryReq.builder()
+                .collectionName(ALL_TYPES_COLLECTION_NAME)
+                .filter("")
+                .outputFields(Collections.singletonList("count(*)"))
+                .consistencyLevel(ConsistencyLevel.STRONG)
+                .build());
+        return (long)countR.getQueryResults().get(0).getEntity().get("count(*)");
+    }
+
+    private static void exampleCloudImport() {
+        System.out.println("\n===================== import files to cloud vectordb ====================");
+        CloudImportRequest request = CloudImportRequest.builder()
+                .objectUrl(CloudImportConsts.OBJECT_URL).accessKey(CloudImportConsts.OBJECT_ACCESS_KEY).secretKey(CloudImportConsts.OBJECT_SECRET_KEY)
+                .clusterId(CloudImportConsts.CLUSTER_ID).collectionName(CloudImportConsts.COLLECTION_NAME).partitionName(CloudImportConsts.PARTITION_NAME)
+                .apiKey(CloudImportConsts.API_KEY)
+                .build();
+        String bulkImportResult = BulkImport.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, request);
+        System.out.println(bulkImportResult);
+
+        System.out.println("\n===================== get import job progress ====================");
+
+        JsonObject bulkImportObject = convertJsonObject(bulkImportResult);
+        String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
+        CloudDescribeImportRequest getImportProgressRequest = CloudDescribeImportRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).jobId(jobId).apiKey(CloudImportConsts.API_KEY).build();
+        String getImportProgressResult = BulkImport.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, getImportProgressRequest);
+        System.out.println(getImportProgressResult);
+
+        System.out.println("\n===================== list import jobs ====================");
+        CloudListImportJobsRequest listImportJobsRequest = CloudListImportJobsRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).currentPage(1).pageSize(10).apiKey(CloudImportConsts.API_KEY).build();
+        String listImportJobsResult = BulkImport.listImportJobs(CloudImportConsts.CLOUD_ENDPOINT, listImportJobsRequest);
+        System.out.println(listImportJobsResult);
+    }
+
+    private CreateCollectionReq.CollectionSchema buildSimpleSchema() {
+        CreateCollectionReq.CollectionSchema schemaV2 = CreateCollectionReq.CollectionSchema.builder()
+                .build();
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("id")
+                .dataType(io.milvus.v2.common.DataType.Int64)
+                .isPrimaryKey(Boolean.TRUE)
+                .autoID(true)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("path")
+                .dataType(io.milvus.v2.common.DataType.VarChar)
+                .maxLength(512)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("label")
+                .dataType(io.milvus.v2.common.DataType.VarChar)
+                .maxLength(512)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("vector")
+                .dataType(io.milvus.v2.common.DataType.FloatVector)
+                .dimension(DIM)
+                .build());
+
+        return schemaV2;
+    }
+
+    private static CreateCollectionReq.CollectionSchema buildAllTypesSchema() {
+        CreateCollectionReq.CollectionSchema schemaV2 = CreateCollectionReq.CollectionSchema.builder()
+                .enableDynamicField(true)
+                .build();
+        // scalar field
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("id")
+                .dataType(io.milvus.v2.common.DataType.Int64)
+                .isPrimaryKey(Boolean.TRUE)
+                .autoID(false)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("bool")
+                .dataType(io.milvus.v2.common.DataType.Bool)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("int8")
+                .dataType(io.milvus.v2.common.DataType.Int8)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("int16")
+                .dataType(io.milvus.v2.common.DataType.Int16)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("int32")
+                .dataType(io.milvus.v2.common.DataType.Int32)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("float")
+                .dataType(io.milvus.v2.common.DataType.Float)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("double")
+                .dataType(io.milvus.v2.common.DataType.Double)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("varchar")
+                .dataType(io.milvus.v2.common.DataType.VarChar)
+                .maxLength(512)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("json")
+                .dataType(io.milvus.v2.common.DataType.JSON)
+                .build());
+
+        // vector fields
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("float_vector")
+                .dataType(io.milvus.v2.common.DataType.FloatVector)
+                .dimension(DIM)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("binary_vector")
+                .dataType(io.milvus.v2.common.DataType.BinaryVector)
+                .dimension(DIM)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("float16_vector")
+                .dataType(io.milvus.v2.common.DataType.Float16Vector)
+                .dimension(DIM)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("sparse_vector")
+                .dataType(io.milvus.v2.common.DataType.SparseFloatVector)
+                .build());
+
+        // array fields
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("array_bool")
+                .dataType(io.milvus.v2.common.DataType.Array)
+                .maxCapacity(ARRAY_CAPACITY)
+                .elementType(io.milvus.v2.common.DataType.Bool)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("array_int8")
+                .dataType(io.milvus.v2.common.DataType.Array)
+                .maxCapacity(ARRAY_CAPACITY)
+                .elementType(io.milvus.v2.common.DataType.Int8)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("array_int16")
+                .dataType(io.milvus.v2.common.DataType.Array)
+                .maxCapacity(ARRAY_CAPACITY)
+                .elementType(io.milvus.v2.common.DataType.Int16)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("array_int32")
+                .dataType(io.milvus.v2.common.DataType.Array)
+                .maxCapacity(ARRAY_CAPACITY)
+                .elementType(io.milvus.v2.common.DataType.Int32)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("array_int64")
+                .dataType(io.milvus.v2.common.DataType.Array)
+                .maxCapacity(ARRAY_CAPACITY)
+                .elementType(io.milvus.v2.common.DataType.Int64)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("array_varchar")
+                .dataType(io.milvus.v2.common.DataType.Array)
+                .maxCapacity(ARRAY_CAPACITY)
+                .elementType(io.milvus.v2.common.DataType.VarChar)
+                .maxLength(512)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("array_float")
+                .dataType(io.milvus.v2.common.DataType.Array)
+                .maxCapacity(ARRAY_CAPACITY)
+                .elementType(io.milvus.v2.common.DataType.Float)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("array_double")
+                .dataType(io.milvus.v2.common.DataType.Array)
+                .maxCapacity(ARRAY_CAPACITY)
+                .elementType(io.milvus.v2.common.DataType.Double)
+                .build());
+
+        return schemaV2;
+    }
+
+    private void checkMilvusClientIfExist() {
+        if (milvusClient == null) {
+            String msg = "milvusClient is null. Please call createConnection() before use.";
+            throw new RuntimeException(msg);
+        }
+    }
+
+    private static JsonObject convertJsonObject(String result) {
+        return GSON_INSTANCE.fromJson(result, JsonObject.class);
+    }
+}
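
For quick orientation, the example above reduces to the following minimal sketch (illustrative only: the field names, the /tmp path, and the 512-dim size are placeholders; it assumes the same imports as BulkWriterExample.java and an enclosing method that declares throws Exception):

CreateCollectionReq.CollectionSchema schema = CreateCollectionReq.CollectionSchema.builder().build();
schema.addField(AddFieldReq.builder()
        .fieldName("id")
        .dataType(io.milvus.v2.common.DataType.Int64)
        .isPrimaryKey(Boolean.TRUE)
        .autoID(true)
        .build());
schema.addField(AddFieldReq.builder()
        .fieldName("vector")
        .dataType(io.milvus.v2.common.DataType.FloatVector)
        .dimension(512)
        .build());

// The V2 schema is passed straight to the writer param; no manual conversion to CollectionSchemaParam is needed.
LocalBulkWriterParam param = LocalBulkWriterParam.newBuilder()
        .withCollectionSchema(schema)
        .withLocalPath("/tmp/bulk_writer")
        .withFileType(BulkFileType.PARQUET)
        .withChunkSize(128 * 1024 * 1024)
        .build();
try (LocalBulkWriter writer = new LocalBulkWriter(param)) {
    JsonObject row = new JsonObject();
    row.add("vector", new Gson().toJsonTree(GeneratorUtils.genFloatVector(512)));
    writer.appendRow(row);
    writer.commit(false);                       // flush buffered rows to Parquet files
    System.out.println(writer.getBatchFiles()); // file paths ready for bulk import
}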

+ 15 - 2
src/main/java/io/milvus/bulkwriter/LocalBulkWriterParam.java

@@ -20,9 +20,11 @@
 package io.milvus.bulkwriter;
 
 import io.milvus.bulkwriter.common.clientenum.BulkFileType;
+import io.milvus.bulkwriter.common.utils.V2AdapterUtils;
 import io.milvus.exception.ParamException;
 import io.milvus.param.ParamUtils;
 import io.milvus.param.collection.CollectionSchemaParam;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
 import lombok.Getter;
 import lombok.NonNull;
 import lombok.ToString;
@@ -62,9 +64,9 @@ public class LocalBulkWriterParam {
         }
 
         /**
-         * Sets the collection info.
+         * Sets the collection schema.
          *
-         * @param collectionSchema collection info
+         * @param collectionSchema collection schema
          * @return <code>Builder</code>
          */
         public Builder withCollectionSchema(@NonNull CollectionSchemaParam collectionSchema) {
@@ -72,6 +74,17 @@ public class LocalBulkWriterParam {
             return this;
         }
 
+        /**
+         * Sets the collection schema by V2.
+         *
+         * @param collectionSchema collection schema
+         * @return <code>Builder</code>
+         */
+        public Builder withCollectionSchema(@NonNull CreateCollectionReq.CollectionSchema collectionSchema) {
+            this.collectionSchema = V2AdapterUtils.convertV2Schema(collectionSchema);
+            return this;
+        }
+
         /**
          * Sets the localPath.
          *
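
For contrast, a minimal sketch of the two withCollectionSchema overloads that now coexist (field choice and dimension are illustrative): the V1 path builds a CollectionSchemaParam from FieldType objects, while the overload added by this commit accepts the V2 CreateCollectionReq.CollectionSchema directly and converts it internally via V2AdapterUtils.convertV2Schema().

// V1 style (still supported)
CollectionSchemaParam schemaV1 = CollectionSchemaParam.newBuilder()
        .addFieldType(FieldType.newBuilder()
                .withName("id")
                .withDataType(io.milvus.grpc.DataType.Int64)
                .withPrimaryKey(true)
                .build())
        .addFieldType(FieldType.newBuilder()
                .withName("vector")
                .withDataType(io.milvus.grpc.DataType.FloatVector)
                .withDimension(128)
                .build())
        .build();
LocalBulkWriterParam fromV1 = LocalBulkWriterParam.newBuilder()
        .withCollectionSchema(schemaV1)
        .withLocalPath("/tmp/bulk_writer")
        .withFileType(BulkFileType.PARQUET)
        .build();

// V2 style (new in this commit)
CreateCollectionReq.CollectionSchema schemaV2 = CreateCollectionReq.CollectionSchema.builder().build();
schemaV2.addField(AddFieldReq.builder()
        .fieldName("id")
        .dataType(io.milvus.v2.common.DataType.Int64)
        .isPrimaryKey(Boolean.TRUE)
        .build());
schemaV2.addField(AddFieldReq.builder()
        .fieldName("vector")
        .dataType(io.milvus.v2.common.DataType.FloatVector)
        .dimension(128)
        .build());
LocalBulkWriterParam fromV2 = LocalBulkWriterParam.newBuilder()
        .withCollectionSchema(schemaV2)
        .withLocalPath("/tmp/bulk_writer")
        .withFileType(BulkFileType.PARQUET)
        .build();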

+ 13 - 0
src/main/java/io/milvus/bulkwriter/RemoteBulkWriterParam.java

@@ -19,11 +19,13 @@
 
 package io.milvus.bulkwriter;
 
+import io.milvus.bulkwriter.common.utils.V2AdapterUtils;
 import io.milvus.bulkwriter.connect.StorageConnectParam;
 import io.milvus.bulkwriter.common.clientenum.BulkFileType;
 import io.milvus.exception.ParamException;
 import io.milvus.param.ParamUtils;
 import io.milvus.param.collection.CollectionSchemaParam;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
 import lombok.Getter;
 import lombok.NonNull;
 import lombok.ToString;
@@ -77,6 +79,17 @@ public class RemoteBulkWriterParam {
             return this;
         }
 
+        /**
+         * Sets the collection schema by V2.
+         *
+         * @param collectionSchema collection schema
+         * @return <code>Builder</code>
+         */
+        public Builder withCollectionSchema(@NonNull CreateCollectionReq.CollectionSchema collectionSchema) {
+            this.collectionSchema = V2AdapterUtils.convertV2Schema(collectionSchema);
+            return this;
+        }
+
         public Builder withConnectParam(@NotNull StorageConnectParam connectParam) {
             this.connectParam = connectParam;
             return this;

+ 66 - 0
src/main/java/io/milvus/bulkwriter/common/utils/V2AdapterUtils.java

@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.bulkwriter.common.utils;
+
+import io.milvus.grpc.DataType;
+import io.milvus.param.collection.CollectionSchemaParam;
+import io.milvus.param.collection.FieldType;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
+
+import java.util.List;
+
+public class V2AdapterUtils {
+    public static CollectionSchemaParam convertV2Schema(CreateCollectionReq.CollectionSchema schemaV2) {
+        CollectionSchemaParam.Builder schemaBuilder = CollectionSchemaParam.newBuilder()
+                .withEnableDynamicField(schemaV2.isEnableDynamicField());
+
+        List<CreateCollectionReq.FieldSchema> fieldSchemaList = schemaV2.getFieldSchemaList();
+        for (CreateCollectionReq.FieldSchema fieldSchema : fieldSchemaList) {
+            FieldType.Builder fieldBuilder = FieldType.newBuilder()
+                    .withName(fieldSchema.getName())
+                    .withDescription(fieldSchema.getDescription())
+                    .withDataType(DataType.valueOf(fieldSchema.getDataType().name()))
+                    .withPrimaryKey(fieldSchema.getIsPrimaryKey())
+                    .withPartitionKey(fieldSchema.getIsPartitionKey())
+                    .withClusteringKey(fieldSchema.getIsClusteringKey())
+                    .withAutoID(fieldSchema.getAutoID());
+            // set vector dimension
+            if (fieldSchema.getDimension() != null) {
+                fieldBuilder.withDimension(fieldSchema.getDimension());
+            }
+            // set varchar max length
+            if (fieldSchema.getDataType() == io.milvus.v2.common.DataType.VarChar && fieldSchema.getMaxLength() != null) {
+                fieldBuilder.withMaxLength(fieldSchema.getMaxLength());
+            }
+            // set array parameters
+            if (fieldSchema.getDataType() == io.milvus.v2.common.DataType.Array) {
+                fieldBuilder.withMaxCapacity(fieldSchema.getMaxCapacity());
+                fieldBuilder.withElementType(DataType.valueOf(fieldSchema.getElementType().name()));
+                if (fieldSchema.getElementType() == io.milvus.v2.common.DataType.VarChar && fieldSchema.getMaxLength() != null) {
+                    fieldBuilder.withMaxLength(fieldSchema.getMaxLength());
+                }
+            }
+
+            schemaBuilder.addFieldType(fieldBuilder.build());
+        }
+
+        return schemaBuilder.build();
+    }
+}
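
The adapter can also be invoked directly, which is what the unit test below does. A brief sketch of checking the conversion result, reusing the schemaV2 from the earlier sketches:

// Convert the V2 schema into the V1 CollectionSchemaParam that the writers use internally.
CollectionSchemaParam converted = V2AdapterUtils.convertV2Schema(schemaV2);

// The dynamic-field flag and per-field attributes (dimension, varchar max length,
// array capacity and element type) are carried over field by field.
System.out.println("dynamic field enabled: " + converted.isEnableDynamicField());
for (FieldType fieldType : converted.getFieldTypes()) {
    System.out.println(fieldType.getName() + " -> " + fieldType.getDataType());
}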

+ 146 - 0
src/test/java/io/milvus/bulkwriter/BulkWriterTest.java

@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.bulkwriter;
+
+import io.milvus.bulkwriter.common.utils.V2AdapterUtils;
+import io.milvus.param.collection.CollectionSchemaParam;
+import io.milvus.param.collection.FieldType;
+import io.milvus.v2.common.DataType;
+import io.milvus.v2.service.collection.request.AddFieldReq;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class BulkWriterTest {
+    @Test
+    void testV2AdapterUtils() {
+        CreateCollectionReq.CollectionSchema schemaV2 = CreateCollectionReq.CollectionSchema.builder()
+                .build();
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("id")
+                .dataType(DataType.Int64)
+                .isPrimaryKey(Boolean.TRUE)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("bool_field")
+                .dataType(DataType.Bool)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("int8_field")
+                .dataType(DataType.Int8)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("int16_field")
+                .dataType(DataType.Int16)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("int32_field")
+                .dataType(DataType.Int32)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("int64_field")
+                .dataType(DataType.Int64)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("float_field")
+                .dataType(DataType.Float)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("double_field")
+                .dataType(DataType.Double)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("varchar_field")
+                .dataType(DataType.VarChar)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("json_field")
+                .dataType(DataType.JSON)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("arr_int_field")
+                .dataType(DataType.Array)
+                .maxCapacity(50)
+                .elementType(DataType.Int32)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("arr_float_field")
+                .dataType(DataType.Array)
+                .maxCapacity(20)
+                .elementType(DataType.Float)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("arr_varchar_field")
+                .dataType(DataType.Array)
+                .maxCapacity(10)
+                .elementType(DataType.VarChar)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("float_vector_field")
+                .dataType(DataType.FloatVector)
+                .dimension(128)
+                .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("binary_vector_field")
+                .dataType(DataType.BinaryVector)
+                .dimension(512)
+                .build());
+
+        CollectionSchemaParam schemaV1 = V2AdapterUtils.convertV2Schema(schemaV2);
+        Assertions.assertEquals(schemaV2.isEnableDynamicField(), schemaV1.isEnableDynamicField());
+
+        List<CreateCollectionReq.FieldSchema> fieldSchemaListV2 = schemaV2.getFieldSchemaList();
+        Map<String, CreateCollectionReq.FieldSchema> fieldSchemaMapV2 = new HashMap<>();
+        for (CreateCollectionReq.FieldSchema field : fieldSchemaListV2) {
+            fieldSchemaMapV2.put(field.getName(), field);
+        }
+
+        List<FieldType> fieldSchemaListV1 = schemaV1.getFieldTypes();
+        for (FieldType fieldSchemaV1 : fieldSchemaListV1) {
+            Assertions.assertTrue(fieldSchemaMapV2.containsKey(fieldSchemaV1.getName()));
+            CreateCollectionReq.FieldSchema fieldSchemaV2 = fieldSchemaMapV2.get(fieldSchemaV1.getName());
+            Assertions.assertEquals(fieldSchemaV2.getDescription(), fieldSchemaV1.getDescription());
+            Assertions.assertEquals(fieldSchemaV2.getDataType().name(), fieldSchemaV1.getDataType().name());
+            Assertions.assertEquals(fieldSchemaV2.getIsPrimaryKey(), fieldSchemaV1.isPrimaryKey());
+            Assertions.assertEquals(fieldSchemaV2.getIsPartitionKey(), fieldSchemaV1.isPartitionKey());
+            Assertions.assertEquals(fieldSchemaV2.getIsClusteringKey(), fieldSchemaV1.isClusteringKey());
+            Assertions.assertEquals(fieldSchemaV2.getAutoID(), fieldSchemaV1.isAutoID());
+
+            if (fieldSchemaV2.getDimension() != null) {
+                Assertions.assertEquals(fieldSchemaV2.getDimension(), fieldSchemaV1.getDimension());
+            }
+            if (fieldSchemaV2.getDataType() == DataType.VarChar) {
+                Assertions.assertEquals(fieldSchemaV2.getMaxLength(), fieldSchemaV1.getMaxLength());
+            }
+
+            if (fieldSchemaV2.getDataType() == DataType.Array) {
+                Assertions.assertEquals(fieldSchemaV2.getMaxCapacity(), fieldSchemaV1.getMaxCapacity());
+                Assertions.assertEquals(fieldSchemaV2.getElementType().name(), fieldSchemaV1.getElementType().name());
+                if (fieldSchemaV2.getElementType() == DataType.VarChar) {
+                    Assertions.assertEquals(fieldSchemaV2.getMaxLength(), fieldSchemaV1.getMaxLength());
+                }
+            }
+        }
+    }
+}

+ 0 - 4
src/test/java/io/milvus/client/MilvusServiceClientTest.java

@@ -34,7 +34,6 @@ import io.milvus.param.collection.*;
 import io.milvus.param.control.*;
 import io.milvus.param.credential.*;
 import io.milvus.param.dml.*;
-import io.milvus.param.dml.ranker.BaseRanker;
 import io.milvus.param.dml.ranker.RRFRanker;
 import io.milvus.param.index.*;
 import io.milvus.param.partition.*;
@@ -48,9 +47,6 @@ import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.jupiter.api.Assertions.*;