
support bulkWriter (#804)

Signed-off-by: lentitude2tk <xushuang.hu@zilliz.com>
xushuang.hu 1 year ago
commit eb6350d846
33 changed files with 3435 additions and 15 deletions
  1. examples/main/java/io/milvus/BulkWriterExample.java (+892 -0)
  2. examples/main/resources/data/train_embeddings.csv (+1 -0)
  3. pom.xml (+103 -12)
  4. src/main/java/io/milvus/bulkwriter/Buffer.java (+292 -0)
  5. src/main/java/io/milvus/bulkwriter/BulkWriter.java (+207 -0)
  6. src/main/java/io/milvus/bulkwriter/CloudImport.java (+136 -0)
  7. src/main/java/io/milvus/bulkwriter/LocalBulkWriter.java (+189 -0)
  8. src/main/java/io/milvus/bulkwriter/LocalBulkWriterParam.java (+112 -0)
  9. src/main/java/io/milvus/bulkwriter/RemoteBulkWriter.java (+267 -0)
  10. src/main/java/io/milvus/bulkwriter/RemoteBulkWriterParam.java (+126 -0)
  11. src/main/java/io/milvus/bulkwriter/common/clientenum/BulkFileType.java (+12 -0)
  12. src/main/java/io/milvus/bulkwriter/common/clientenum/CloudStorage.java (+49 -0)
  13. src/main/java/io/milvus/bulkwriter/common/clientenum/TypeSize.java (+42 -0)
  14. src/main/java/io/milvus/bulkwriter/common/utils/GeneratorUtils.java (+132 -0)
  15. src/main/java/io/milvus/bulkwriter/common/utils/ImportUtils.java (+36 -0)
  16. src/main/java/io/milvus/bulkwriter/common/utils/ParquetReaderUtils.java (+25 -0)
  17. src/main/java/io/milvus/bulkwriter/common/utils/ParquetUtils.java (+119 -0)
  18. src/main/java/io/milvus/bulkwriter/connect/AzureConnectParam.java (+98 -0)
  19. src/main/java/io/milvus/bulkwriter/connect/S3ConnectParam.java (+112 -0)
  20. src/main/java/io/milvus/bulkwriter/connect/StorageConnectParam.java (+5 -0)
  21. src/main/java/io/milvus/bulkwriter/response/BulkImportResponse.java (+18 -0)
  22. src/main/java/io/milvus/bulkwriter/response/GetImportProgressResponse.java (+45 -0)
  23. src/main/java/io/milvus/bulkwriter/response/ListImportJobsResponse.java (+35 -0)
  24. src/main/java/io/milvus/bulkwriter/response/RestfulResponse.java (+22 -0)
  25. src/main/java/io/milvus/bulkwriter/storage/StorageClient.java (+10 -0)
  26. src/main/java/io/milvus/bulkwriter/storage/client/AzureStorageClient.java (+60 -0)
  27. src/main/java/io/milvus/bulkwriter/storage/client/MinioStorageClient.java (+74 -0)
  28. src/main/java/io/milvus/common/utils/ExceptionUtils.java (+21 -0)
  29. src/main/java/io/milvus/exception/UnExpectedException.java (+31 -0)
  30. src/main/java/io/milvus/param/ParamUtils.java (+1 -1)
  31. src/main/java/io/milvus/param/collection/CollectionSchemaParam.java (+120 -0)
  32. src/main/java/io/milvus/param/collection/CreateCollectionParam.java (+30 -2)
  33. src/main/java/io/milvus/response/DescCollResponseWrapper.java (+13 -0)
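
At a glance, the new bulkWriter API follows a build-params, append-rows, commit, collect-files flow. Below is a minimal local-writer sketch condensed from the BulkWriterExample.java added in this commit; it assumes the imports and the collectionSchema built in that example, and the field names and sizes are taken from it.

    // build writer parameters (collectionSchema is a CollectionSchemaParam as built in the example)
    LocalBulkWriterParam writerParam = LocalBulkWriterParam.newBuilder()
            .withCollectionSchema(collectionSchema)
            .withLocalPath("/tmp/bulk_writer")
            .withFileType(BulkFileType.PARQUET)
            .withChunkSize(128 * 1024 * 1024)      // split the output roughly every 128 MB
            .build();

    try (LocalBulkWriter writer = new LocalBulkWriter(writerParam)) {
        // append rows as JSON objects keyed by field name
        JSONObject row = new JSONObject();
        row.put("path", "path_0");
        row.put("vector", GeneratorUtils.genFloatVector(512));
        row.put("label", "label_0");
        writer.appendRow(row);

        // flush buffered rows to data files and collect the generated file batches
        writer.commit(false);
        List<List<String>> batchFiles = writer.getBatchFiles();
        System.out.println("Generated files: " + batchFiles);
    }

The remote variant follows the same flow with RemoteBulkWriterParam plus a StorageConnectParam, as shown in the example below.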

+ 892 - 0
examples/main/java/io/milvus/BulkWriterExample.java

@@ -0,0 +1,892 @@
+package io.milvus;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import io.milvus.bulkwriter.BulkWriter;
+import io.milvus.bulkwriter.CloudImport;
+import io.milvus.bulkwriter.LocalBulkWriter;
+import io.milvus.bulkwriter.LocalBulkWriterParam;
+import io.milvus.bulkwriter.RemoteBulkWriter;
+import io.milvus.bulkwriter.RemoteBulkWriterParam;
+import io.milvus.bulkwriter.connect.AzureConnectParam;
+import io.milvus.bulkwriter.connect.S3ConnectParam;
+import io.milvus.bulkwriter.connect.StorageConnectParam;
+import io.milvus.bulkwriter.response.BulkImportResponse;
+import io.milvus.bulkwriter.response.GetImportProgressResponse;
+import io.milvus.bulkwriter.response.ListImportJobsResponse;
+import io.milvus.client.MilvusClient;
+import io.milvus.client.MilvusServiceClient;
+import io.milvus.bulkwriter.common.clientenum.BulkFileType;
+import io.milvus.bulkwriter.common.clientenum.CloudStorage;
+import io.milvus.common.utils.ExceptionUtils;
+import io.milvus.bulkwriter.common.utils.GeneratorUtils;
+import io.milvus.bulkwriter.common.utils.ImportUtils;
+import io.milvus.bulkwriter.common.utils.ParquetReaderUtils;
+import io.milvus.grpc.DataType;
+import io.milvus.grpc.GetCollectionStatisticsResponse;
+import io.milvus.grpc.GetImportStateResponse;
+import io.milvus.grpc.ImportResponse;
+import io.milvus.grpc.ImportState;
+import io.milvus.grpc.KeyValuePair;
+import io.milvus.grpc.QueryResults;
+import io.milvus.param.ConnectParam;
+import io.milvus.param.IndexType;
+import io.milvus.param.MetricType;
+import io.milvus.param.R;
+import io.milvus.param.RpcStatus;
+import io.milvus.param.bulkinsert.BulkInsertParam;
+import io.milvus.param.bulkinsert.GetBulkInsertStateParam;
+import io.milvus.param.collection.CollectionSchemaParam;
+import io.milvus.param.collection.CreateCollectionParam;
+import io.milvus.param.collection.DropCollectionParam;
+import io.milvus.param.collection.FieldType;
+import io.milvus.param.collection.FlushParam;
+import io.milvus.param.collection.GetCollectionStatisticsParam;
+import io.milvus.param.collection.HasCollectionParam;
+import io.milvus.param.collection.LoadCollectionParam;
+import io.milvus.param.dml.QueryParam;
+import io.milvus.param.index.CreateIndexParam;
+import io.milvus.response.GetCollStatResponseWrapper;
+import io.milvus.response.QueryResultsWrapper;
+import org.apache.avro.generic.GenericData;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.util.Asserts;
+import org.apache.logging.log4j.util.Strings;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static io.milvus.BulkWriterExample.MilvusConsts.HOST;
+import static io.milvus.BulkWriterExample.MilvusConsts.PORT;
+import static io.milvus.BulkWriterExample.MilvusConsts.USER_NAME;
+
+
+public class BulkWriterExample {
+
+    // milvus
+    public static class MilvusConsts {
+        public static final String HOST = "127.0.0.1";
+        public static final Integer PORT = 19530;
+        public static final String USER_NAME = "user.name";
+        public static final String PASSWORD = "password";
+    }
+
+    /**
+     * If you need to transfer the files generated by bulkWriter to remote storage (AWS S3, GCP GCS, Azure Blob, Aliyun OSS, Tencent Cloud TOS),
+     * configure the following parameters accordingly; otherwise, you can ignore them.
+     */
+    public static class StorageConsts {
+        public static final CloudStorage cloudStorage = CloudStorage.AWS;
+        public static final String STORAGE_ENDPOINT = cloudStorage.getEndpoint();
+
+        /**
+         * If using remote storage such as AWS S3, GCP GCS, Aliyun OSS, Tencent Cloud TOS,
+         * please configure the following parameters.
+         */
+        public static final String STORAGE_BUCKET = "storage.bucket";
+        public static final String STORAGE_ACCESS_KEY = "storage.access.key";
+        public static final String STORAGE_SECRET_KEY = "storage.secret.key";
+        public static final String STORAGE_REGION = "storage.region";
+
+        /**
+         * If using remote storage such as Azure Blob
+         * please configure the following parameters.
+         */
+        public static final String AZURE_CONTAINER_NAME = "azure.container.name";
+        public static final String AZURE_ACCOUNT_NAME = "azure.account.name";
+        public static final String AZURE_ACCOUNT_KEY = "azure.account.key";
+    }
+
+
+    /**
+     * If you have used RemoteBulkWriter to generate data in remote storage and want to import it through the Import interface on Zilliz Cloud,
+     * you do not need to configure the object-related parameters below (OBJECT_URL, OBJECT_ACCESS_KEY, OBJECT_SECRET_KEY); just call the callCloudImport method, which encapsulates that logic for you.
+     * <p>
+     * If you already have data in remote storage (not generated through RemoteBulkWriter) and want to invoke the Import interface on Zilliz Cloud,
+     * configure the following parameters and then follow the exampleCloudImport method.
+     * <p>
+     * If you do not need to import data through the Import interface on Zilliz Cloud, you can ignore this class.
+     */
+    public static class CloudImportConsts {
+
+        /**
+         * If you use the global Zilliz Cloud service, use the following endpoint format: https://controller.api.{cloud-region}.zillizcloud.com/v1/vector/collections/import.
+         * If you use the China-region service, use the following endpoint format: https://controller.api.${CLOUD_REGION_ID}.cloud.zilliz.com.cn/v1/vector/collections/import.
+         */
+        public static final String CLOUD_ENDPOINT = "https://controller.api.${CLOUD-REGION}.{ENDPOINT-SUFFIX}";
+        public static final String API_KEY = "_api_key_of_the_user";
+        public static final String CLUSTER_ID = "_your_cloud_instance_id_";
+        public static final String COLLECTION_NAME = "_collection_name_on_the_cloud_";
+
+        /**
+         * Please provide the complete URL for the file or folder you want to import, similar to https://bucket-name.s3.region-code.amazonaws.com/object-name.
+         * For more details, you can refer to https://docs.zilliz.com/docs/import-data-on-web-ui.
+         */
+        public static final String OBJECT_URL = "_your_storage_object_url_";
+        public static final String OBJECT_ACCESS_KEY = "_your_storage_access_key_";
+        public static final String OBJECT_SECRET_KEY = "_your_storage_secret_key_";
+    }
+
+    private static final String SIMPLE_COLLECTION_NAME = "for_bulkwriter";
+    private static final String ALL_TYPES_COLLECTION_NAME = "all_types_for_bulkwriter";
+    private static final Integer DIM = 512;
+    private MilvusClient milvusClient;
+
+    public static void main(String[] args) throws Exception {
+        BulkWriterExample exampleBulkWriter = new BulkWriterExample();
+        exampleBulkWriter.createConnection();
+
+        List<BulkFileType> fileTypes = Lists.newArrayList(
+                BulkFileType.PARQUET
+        );
+
+        exampleSimpleCollection(exampleBulkWriter, fileTypes);
+        exampleAllTypeCollectionRemote(exampleBulkWriter, fileTypes);
+
+        // to call the cloud import api, you need to apply for a cloud service on Zilliz Cloud (https://zilliz.com/cloud)
+        // exampleCloudImport();
+    }
+
+    private void createConnection() {
+        System.out.println("\nCreate connection...");
+        ConnectParam connectParam = ConnectParam.newBuilder()
+                .withHost(HOST)
+                .withPort(PORT)
+                .withAuthorization(USER_NAME, MilvusConsts.PASSWORD)
+                .withSecure(true)
+                .build();
+        milvusClient = new MilvusServiceClient(connectParam);
+        System.out.println("\nConnected");
+    }
+
+    private static void exampleSimpleCollection(BulkWriterExample exampleBulkWriter, List<BulkFileType> fileTypes) throws Exception {
+        CollectionSchemaParam collectionSchema = exampleBulkWriter.buildSimpleCollection();
+        exampleBulkWriter.createCollection(SIMPLE_COLLECTION_NAME, collectionSchema, false);
+
+        for (BulkFileType fileType : fileTypes) {
+            localWriter(collectionSchema, fileType);
+        }
+
+        for (BulkFileType fileType : fileTypes) {
+            remoteWriter(collectionSchema, fileType);
+        }
+
+        // parallel append
+        parallelAppend(collectionSchema);
+    }
+
+    private static void exampleAllTypeCollectionRemote(BulkWriterExample exampleBulkWriter, List<BulkFileType> fileTypes) throws Exception {
+        // float vectors + all scalar types, use bulkInsert interface
+        for (BulkFileType fileType : fileTypes) {
+            CollectionSchemaParam collectionSchema = buildAllTypeSchema(false, true);
+            List<List<String>> batchFiles = exampleBulkWriter.allTypesRemoteWriter(false, collectionSchema, fileType);
+            exampleBulkWriter.callBulkInsert(collectionSchema, batchFiles);
+            exampleBulkWriter.retrieveImportData(false);
+        }
+
+        // binary vectors + all scalar types, use bulkInsert interface
+        for (BulkFileType fileType : fileTypes) {
+            CollectionSchemaParam collectionSchema = buildAllTypeSchema(true, true);
+            List<List<String>> batchFiles = exampleBulkWriter.allTypesRemoteWriter(true, collectionSchema, fileType);
+            exampleBulkWriter.callBulkInsert(collectionSchema, batchFiles);
+            exampleBulkWriter.retrieveImportData(true);
+        }
+
+        // float vectors + all scalar types, use cloud import api.
+        // You need to apply for a cloud service on Zilliz Cloud (https://zilliz.com/cloud)
+        for (BulkFileType fileType : fileTypes) {
+            CollectionSchemaParam collectionSchema = buildAllTypeSchema(false, true);
+            List<List<String>> batchFiles = exampleBulkWriter.allTypesRemoteWriter(false, collectionSchema, fileType);
+            exampleBulkWriter.createCollection(ALL_TYPES_COLLECTION_NAME, collectionSchema, false);
+            exampleBulkWriter.callCloudImport(batchFiles, ALL_TYPES_COLLECTION_NAME);
+            exampleBulkWriter.retrieveImportData(false);
+        }
+    }
+
+    private static void localWriter(CollectionSchemaParam collectionSchema, BulkFileType fileType) throws Exception {
+        System.out.printf("\n===================== local writer (%s) ====================%n", fileType.name());
+        LocalBulkWriterParam bulkWriterParam = LocalBulkWriterParam.newBuilder()
+                .withCollectionSchema(collectionSchema)
+                .withLocalPath("/tmp/bulk_writer")
+                .withFileType(fileType)
+                .withChunkSize(128 * 1024 * 1024)
+                .build();
+
+        try (LocalBulkWriter localBulkWriter = new LocalBulkWriter(bulkWriterParam)) {
+            // read data from csv
+            readCsvSampleData("data/train_embeddings.csv", localBulkWriter);
+
+            // append rows
+            for (int i = 0; i < 100000; i++) {
+                JSONObject row = new JSONObject();
+                row.put("path", "path_" + i);
+                row.put("vector", GeneratorUtils.genFloatVector(DIM));
+                row.put("label", "label_" + i);
+
+                localBulkWriter.appendRow(row);
+            }
+
+            System.out.printf("%s rows appended%n", localBulkWriter.getTotalRowCount());
+            System.out.printf("%s rows in buffer not flushed%n", localBulkWriter.getBufferRowCount());
+
+            localBulkWriter.commit(false);
+            List<List<String>> batchFiles = localBulkWriter.getBatchFiles();
+            System.out.printf("Local writer done! output local files: %s%n", batchFiles);
+        } catch (Exception e) {
+            System.out.println("localWriter catch exception: " + e);
+            throw e;
+        }
+    }
+
+    private static void remoteWriter(CollectionSchemaParam collectionSchema, BulkFileType fileType) throws Exception {
+        System.out.printf("\n===================== remote writer (%s) ====================%n", fileType.name());
+
+        try (RemoteBulkWriter remoteBulkWriter = buildRemoteBulkWriter(collectionSchema, fileType)) {
+            // read data from csv
+            readCsvSampleData("data/train_embeddings.csv", remoteBulkWriter);
+
+            // append rows
+            for (int i = 0; i < 100000; i++) {
+                JSONObject row = new JSONObject();
+                row.put("path", "path_" + i);
+                row.put("vector", GeneratorUtils.genFloatVector(DIM));
+                row.put("label", "label_" + i);
+
+                remoteBulkWriter.appendRow(row);
+            }
+
+            System.out.printf("%s rows appended%n", remoteBulkWriter.getTotalRowCount());
+            System.out.printf("%s rows in buffer not flushed%n", remoteBulkWriter.getBufferRowCount());
+
+            remoteBulkWriter.commit(false);
+            List<List<String>> batchFiles = remoteBulkWriter.getBatchFiles();
+
+            System.out.printf("Remote writer done! output remote files: %s%n", batchFiles);
+        } catch (Exception e) {
+            System.out.println("remoteWriter catch exception: " + e);
+            throw e;
+        }
+    }
+
+    private static void parallelAppend(CollectionSchemaParam collectionSchema) throws Exception {
+        System.out.print("\n===================== parallel append ====================");
+        LocalBulkWriterParam bulkWriterParam = LocalBulkWriterParam.newBuilder()
+                .withCollectionSchema(collectionSchema)
+                .withLocalPath("/tmp/bulk_writer")
+                .withFileType(BulkFileType.PARQUET)
+                .withChunkSize(128 * 1024 * 1024)  // 128MB
+                .build();
+
+        try (LocalBulkWriter localBulkWriter = new LocalBulkWriter(bulkWriterParam)) {
+            List<Thread> threads = new ArrayList<>();
+            int threadCount = 10;
+            int rowsPerThread = 1000;
+            for (int i = 0; i < threadCount; ++i) {
+                int current = i;
+                Thread thread = new Thread(() -> appendRow(localBulkWriter, current * rowsPerThread, (current + 1) * rowsPerThread));
+                threads.add(thread);
+                thread.start();
+                System.out.printf("Thread %s started%n", thread.getName());
+            }
+
+            for (Thread thread : threads) {
+                thread.join();
+                System.out.printf("Thread %s finished%n", thread.getName());
+            }
+
+            System.out.println(localBulkWriter.getTotalRowCount() + " rows appended");
+            System.out.println(localBulkWriter.getBufferRowCount() + " rows in buffer not flushed");
+            localBulkWriter.commit(false);
+            System.out.printf("Append finished, %s rows%n", threadCount * rowsPerThread);
+
+            int rowCount = 0;
+            List<List<String>> batchFiles = localBulkWriter.getBatchFiles();
+            for (List<String> batch : batchFiles) {
+                for (String filePath : batch) {
+                    rowCount += readParquet(filePath);
+                }
+            }
+
+            Asserts.check(rowCount == threadCount * rowsPerThread, String.format("rowCount %s does not equal the expected %s", rowCount, threadCount * rowsPerThread));
+            System.out.println("Data is correct");
+        } catch (Exception e) {
+            System.out.println("parallelAppend catch exception: " + e);
+            throw e;
+        }
+    }
+
+    private static long readParquet(String localFilePath) throws Exception {
+        final long[] rowCount = {0};
+        new ParquetReaderUtils() {
+            @Override
+            public void readRecord(GenericData.Record record) {
+                rowCount[0]++;
+                String pathValue = record.get("path").toString();
+                String labelValue = record.get("label").toString();
+                Asserts.check(pathValue.replace("path_", "").equals(labelValue.replace("label_", "")), String.format("the suffix of %s does not equal the suffix of %s", pathValue, labelValue));
+            }
+        }.readParquet(localFilePath);
+        System.out.printf("The file %s contains %s rows. Verify the content...%n", localFilePath, rowCount[0]);
+        return rowCount[0];
+    }
+
+    private static void appendRow(LocalBulkWriter writer, int begin, int end) {
+        try {
+            for (int i = begin; i < end; ++i) {
+                JSONObject row = new JSONObject();
+                row.put("path", "path_" + i);
+                row.put("vector", GeneratorUtils.genFloatVector(DIM));
+                row.put("label", "label_" + i);
+
+                writer.appendRow(row);
+                if (i % 100 == 0) {
+                    System.out.printf("%s inserted %s items%n", Thread.currentThread().getName(), i - begin);
+                }
+            }
+        } catch (Exception e) {
+            System.out.println("failed to append row!");
+        }
+    }
+
+    private List<List<String>> allTypesRemoteWriter(boolean binVec, CollectionSchemaParam collectionSchema, BulkFileType fileType) throws Exception {
+        System.out.printf("\n===================== all field types (%s) binary_vector=%s ====================%n", fileType.name(), binVec);
+
+        try (RemoteBulkWriter remoteBulkWriter = buildRemoteBulkWriter(collectionSchema, fileType)) {
+            System.out.println("Append rows");
+            int batchCount = 10000;
+
+            for (int i = 0; i < batchCount; ++i) {
+                JSONObject rowObject = new JSONObject();
+
+                // scalar field
+                rowObject.put("id", i);
+                rowObject.put("bool", i % 5 == 0);
+                rowObject.put("int8", i % 128);
+                rowObject.put("int16", i % 1000);
+                rowObject.put("int32", i % 100000);
+                rowObject.put("float", i / 3);
+                rowObject.put("double", i / 7);
+                rowObject.put("varchar", "varchar_" + i);
+                rowObject.put("json", String.format("{\"dummy\": %s, \"ok\": \"name_%s\"}", i, i));
+
+                // vector field
+                rowObject.put("vector", binVec ? GeneratorUtils.generatorBinaryVector(128) : GeneratorUtils.generatorFloatValue(128));
+
+                // array field
+                rowObject.put("arrayInt64", GeneratorUtils.generatorLongValue(10));
+                rowObject.put("arrayVarchar", GeneratorUtils.generatorVarcharValue(10, 10));
+                rowObject.put("arrayInt8", GeneratorUtils.generatorInt8Value(10));
+                rowObject.put("arrayInt16", GeneratorUtils.generatorInt16Value(10));
+                rowObject.put("arrayInt32", GeneratorUtils.generatorInt32Value(10));
+                rowObject.put("arrayFloat", GeneratorUtils.generatorFloatValue(10));
+                rowObject.put("arrayDouble", GeneratorUtils.generatorDoubleValue(10));
+                rowObject.put("arrayBool", GeneratorUtils.generatorBoolValue(10));
+
+                remoteBulkWriter.appendRow(rowObject);
+            }
+            System.out.printf("%s rows appended%n", remoteBulkWriter.getTotalRowCount());
+            System.out.printf("%s rows in buffer not flushed%n", remoteBulkWriter.getBufferRowCount());
+            System.out.println("Generate data files...");
+            remoteBulkWriter.commit(false);
+
+            System.out.printf("Data files have been uploaded: %s%n", remoteBulkWriter.getBatchFiles());
+            return remoteBulkWriter.getBatchFiles();
+        } catch (Exception e) {
+            System.out.println("allTypesRemoteWriter catch exception: " + e);
+            throw e;
+        }
+    }
+
+    private static RemoteBulkWriter buildRemoteBulkWriter(CollectionSchemaParam collectionSchema, BulkFileType fileType) throws IOException {
+        StorageConnectParam connectParam = buildStorageConnectParam();
+        RemoteBulkWriterParam bulkWriterParam = RemoteBulkWriterParam.newBuilder()
+                .withCollectionSchema(collectionSchema)
+                .withRemotePath("bulk_data")
+                .withFileType(fileType)
+                .withChunkSize(512 * 1024 * 1024)
+                .withConnectParam(connectParam)
+                .build();
+        return new RemoteBulkWriter(bulkWriterParam);
+    }
+
+    private static StorageConnectParam buildStorageConnectParam() {
+        StorageConnectParam connectParam;
+        if (StorageConsts.cloudStorage == CloudStorage.AZURE) {
+            String connectionStr = "DefaultEndpointsProtocol=https;AccountName=" + StorageConsts.AZURE_ACCOUNT_NAME +
+                    ";AccountKey=" + StorageConsts.AZURE_ACCOUNT_KEY + ";EndpointSuffix=core.windows.net";
+            connectParam = AzureConnectParam.newBuilder()
+                    .withConnStr(connectionStr)
+                    .withContainerName(StorageConsts.AZURE_CONTAINER_NAME)
+                    .build();
+        } else {
+            connectParam = S3ConnectParam.newBuilder()
+                    .withEndpoint(StorageConsts.STORAGE_ENDPOINT)
+                    .withBucketName(StorageConsts.STORAGE_BUCKET)
+                    .withAccessKey(StorageConsts.STORAGE_ACCESS_KEY)
+                    .withSecretKey(StorageConsts.STORAGE_SECRET_KEY)
+                    .withRegion(StorageConsts.STORAGE_REGION)
+                    .build();
+        }
+        return connectParam;
+    }
+
+    private static void readCsvSampleData(String filePath, BulkWriter writer) throws IOException, InterruptedException {
+        ClassLoader classLoader = BulkWriterExample.class.getClassLoader();
+        URL resourceUrl = classLoader.getResource(filePath);
+        filePath = new File(resourceUrl.getFile()).getAbsolutePath();
+
+        CsvMapper csvMapper = new CsvMapper();
+
+        File csvFile = new File(filePath);
+        CsvSchema csvSchema = CsvSchema.builder().setUseHeader(true).build();
+        Iterator<CsvDataObject> iterator = csvMapper.readerFor(CsvDataObject.class).with(csvSchema).readValues(csvFile);
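+        // each CSV row maps to a CsvDataObject; its "vector" column holds a JSON-array string that toFloatArray() parses into List<Float>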
+        while (iterator.hasNext()) {
+            CsvDataObject dataObject = iterator.next();
+            JSONObject row = new JSONObject();
+
+            row.put("vector", dataObject.toFloatArray());
+            row.put("label", dataObject.getLabel());
+            row.put("path", dataObject.getPath());
+
+            writer.appendRow(row);
+        }
+    }
+
+    private static class CsvDataObject {
+        @JsonProperty
+        private String vector;
+        @JsonProperty
+        private String path;
+        @JsonProperty
+        private String label;
+
+        public String getVector() {
+            return vector;
+        }
+        public String getPath() {
+            return path;
+        }
+        public String getLabel() {
+            return label;
+        }
+        public List<Float> toFloatArray() {
+            return new Gson().fromJson(vector, new TypeToken<List<Float>>() {
+            }.getType());
+        }
+    }
+
+    private void callBulkInsert(CollectionSchemaParam collectionSchema, List<List<String>> batchFiles) throws InterruptedException {
+        System.out.println("\n===================== call bulkInsert ====================");
+        createCollection(ALL_TYPES_COLLECTION_NAME, collectionSchema, false);
+
+        List<Long> taskIds = new ArrayList<>();
+        for (List<String> batch : batchFiles) {
+            Long taskId = bulkInsert(batch);
+            taskIds.add(taskId);
+            System.out.println("Created a bulkInsert task, task id: " + taskId);
+        }
+
+        while (taskIds.size() > 0) {
+            Iterator<Long> iterator = taskIds.iterator();
+            List<Long> tempTaskIds = new ArrayList<>();
+            while (iterator.hasNext()) {
+                Long taskId = iterator.next();
+                System.out.println("Wait 5 seconds to check the bulkInsert task state...");
+                TimeUnit.SECONDS.sleep(5);
+
+                GetImportStateResponse bulkInsertState = getBulkInsertState(taskId);
+                if (bulkInsertState.getState() == ImportState.ImportFailed
+                        || bulkInsertState.getState() == ImportState.ImportFailedAndCleaned) {
+                    List<KeyValuePair> infosList = bulkInsertState.getInfosList();
+                    Optional<String> failedReasonOptional = infosList.stream().filter(e -> e.getKey().equals("failed_reason"))
+                            .map(KeyValuePair::getValue).findFirst();
+                    String failedReason = failedReasonOptional.orElse(Strings.EMPTY);
+
+                    System.out.printf("The task %s failed, reason: %s%n", taskId, failedReason);
+                } else if (bulkInsertState.getState() == ImportState.ImportCompleted) {
+                    System.out.printf("The task %s completed%n", taskId);
+                } else {
+                    System.out.printf("The task %s is running, state:%s%n", taskId, bulkInsertState.getState());
+                    tempTaskIds.add(taskId);
+                }
+            }
+            taskIds = tempTaskIds;
+        }
+
+        System.out.println("Collection row number: " + getCollectionStatistics());
+    }
+
+    private void callCloudImport(List<List<String>> batchFiles, String collectionName) throws InterruptedException, MalformedURLException {
+        System.out.println("\n===================== call cloudImport ====================");
+
+        String objectUrl = StorageConsts.cloudStorage.getObjectUrl(StorageConsts.STORAGE_BUCKET, ImportUtils.getCommonPrefix(batchFiles), StorageConsts.STORAGE_REGION);
+        String accessKey = StorageConsts.cloudStorage == CloudStorage.AZURE ? StorageConsts.AZURE_ACCOUNT_NAME : StorageConsts.STORAGE_ACCESS_KEY;
+        String secretKey = StorageConsts.cloudStorage == CloudStorage.AZURE ? StorageConsts.AZURE_ACCOUNT_KEY : StorageConsts.STORAGE_SECRET_KEY;
+
+        BulkImportResponse bulkImportResponse = CloudImport.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, objectUrl, accessKey, secretKey, CloudImportConsts.CLUSTER_ID, collectionName);
+        String jobId = bulkImportResponse.getJobId();
+        System.out.println("Created a cloudImport job, job id: " + jobId);
+
+        while (true) {
+            System.out.println("Wait 5 seconds to check the cloudImport job state...");
+            TimeUnit.SECONDS.sleep(5);
+
+            GetImportProgressResponse getImportProgressResponse = CloudImport.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, jobId, CloudImportConsts.CLUSTER_ID);
+            if (getImportProgressResponse.getReadyPercentage().intValue() == 1) {
+                System.out.printf("The job %s completed%n", jobId);
+                break;
+            } else if (StringUtils.isNotEmpty(getImportProgressResponse.getErrorMessage())) {
+                System.out.printf("The job %s failed, reason: %s%n", jobId, getImportProgressResponse.getErrorMessage());
+                break;
+            } else {
+                System.out.printf("The job %s is running, progress:%s%n", jobId, getImportProgressResponse.getReadyPercentage());
+            }
+        }
+
+        System.out.println("Collection row number: " + getCollectionStatistics());
+    }
+
+    /**
+     * @param collectionName   name of the collection to create
+     * @param collectionSchema collection schema info
+     * @param dropIfExist      if the collection already exists, drop it first and then create it again
+     */
+    private void createCollection(String collectionName, CollectionSchemaParam collectionSchema, boolean dropIfExist) {
+        System.out.println("\n===================== create collection ====================");
+        checkMilvusClientIfExist();
+        CreateCollectionParam collectionParam = CreateCollectionParam.newBuilder()
+                .withCollectionName(collectionName)
+                .withSchema(collectionSchema)
+                .build();
+        R<Boolean> hasCollection = milvusClient.hasCollection(HasCollectionParam.newBuilder().withCollectionName(collectionName).build());
+        if (hasCollection.getData()) {
+            if (dropIfExist) {
+                milvusClient.dropCollection(DropCollectionParam.newBuilder().withCollectionName(collectionName).build());
+                milvusClient.createCollection(collectionParam);
+            }
+        } else {
+            milvusClient.createCollection(collectionParam);
+        }
+        System.out.printf("Collection %s created%n", collectionName);
+    }
+
+    private void retrieveImportData(boolean binVec) {
+        createIndex(binVec);
+
+        List<Integer> ids = Lists.newArrayList(100, 5000);
+        System.out.printf("Load collection and query items %s%n", ids);
+        loadCollection();
+
+        String expr = String.format("id in %s", ids);
+        System.out.println(expr);
+
+        List<QueryResultsWrapper.RowRecord> rowRecords = query(expr, Lists.newArrayList("*", "vector"));
+        System.out.println("Query results:");
+        for (QueryResultsWrapper.RowRecord record : rowRecords) {
+            System.out.println(record);
+        }
+    }
+
+    private void createIndex(boolean binVec) {
+        System.out.println("Create index...");
+        checkMilvusClientIfExist();
+        CreateIndexParam.Builder builder = CreateIndexParam.newBuilder()
+                .withCollectionName(ALL_TYPES_COLLECTION_NAME)
+                .withFieldName("vector")
+                .withIndexName("index_name")
+                .withSyncMode(Boolean.TRUE);
+
+        if (binVec) {
+            builder.withIndexType(IndexType.BIN_FLAT);
+            builder.withMetricType(MetricType.HAMMING);
+        } else {
+            builder.withIndexType(IndexType.FLAT);
+            builder.withMetricType(MetricType.L2);
+        }
+
+        R<RpcStatus> response = milvusClient.createIndex(builder.build());
+        ExceptionUtils.handleResponseStatus(response);
+    }
+
+    private R<RpcStatus> loadCollection() {
+        System.out.println("Loading Collection...");
+        checkMilvusClientIfExist();
+        R<RpcStatus> response = milvusClient.loadCollection(LoadCollectionParam.newBuilder()
+                .withCollectionName(ALL_TYPES_COLLECTION_NAME)
+                .build());
+        ExceptionUtils.handleResponseStatus(response);
+        return response;
+    }
+
+    private List<QueryResultsWrapper.RowRecord> query(String expr, List<String> outputFields) {
+        System.out.println("========== query() ==========");
+        checkMilvusClientIfExist();
+        QueryParam test = QueryParam.newBuilder()
+                .withCollectionName(ALL_TYPES_COLLECTION_NAME)
+                .withExpr(expr)
+                .withOutFields(outputFields)
+                .build();
+        R<QueryResults> response = milvusClient.query(test);
+        ExceptionUtils.handleResponseStatus(response);
+        QueryResultsWrapper wrapper = new QueryResultsWrapper(response.getData());
+        return wrapper.getRowRecords();
+    }
+
+    private Long bulkInsert(List<String> batchFiles) {
+        System.out.println("========== bulkInsert() ==========");
+        checkMilvusClientIfExist();
+        R<ImportResponse> response = milvusClient.bulkInsert(BulkInsertParam.newBuilder()
+                .withCollectionName(ALL_TYPES_COLLECTION_NAME)
+                .withFiles(batchFiles)
+                .build());
+        ExceptionUtils.handleResponseStatus(response);
+        return response.getData().getTasksList().get(0);
+    }
+
+    private GetImportStateResponse getBulkInsertState(Long taskId) {
+        System.out.println("========== getBulkInsertState() ==========");
+        checkMilvusClientIfExist();
+        R<GetImportStateResponse> bulkInsertState = milvusClient.getBulkInsertState(GetBulkInsertStateParam.newBuilder()
+                .withTask(taskId)
+                .build());
+        return bulkInsertState.getData();
+    }
+
+    private Long getCollectionStatistics() {
+        System.out.println("========== getCollectionStatistics() ==========");
+        // call flush() to flush the insert buffer to storage,
+        // so that the getCollectionStatistics() can get correct number
+        checkMilvusClientIfExist();
+        milvusClient.flush(FlushParam.newBuilder().addCollectionName(ALL_TYPES_COLLECTION_NAME).build());
+        R<GetCollectionStatisticsResponse> response = milvusClient.getCollectionStatistics(
+                GetCollectionStatisticsParam.newBuilder()
+                        .withCollectionName(ALL_TYPES_COLLECTION_NAME)
+                        .build());
+        ExceptionUtils.handleResponseStatus(response);
+        GetCollStatResponseWrapper wrapper = new GetCollStatResponseWrapper(response.getData());
+        return wrapper.getRowCount();
+    }
+
+    private static void exampleCloudImport() throws MalformedURLException {
+        System.out.println("\n===================== import files to cloud vectordb ====================");
+        BulkImportResponse bulkImportResponse = CloudImport.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY,
+                CloudImportConsts.OBJECT_URL, CloudImportConsts.OBJECT_ACCESS_KEY, CloudImportConsts.OBJECT_SECRET_KEY,
+                CloudImportConsts.CLUSTER_ID, CloudImportConsts.COLLECTION_NAME);
+        System.out.println(new Gson().toJson(bulkImportResponse));
+
+        System.out.println("\n===================== get import job progress ====================");
+        String jobId = bulkImportResponse.getJobId();
+        GetImportProgressResponse getImportProgressResponse = CloudImport.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, jobId, CloudImportConsts.CLUSTER_ID);
+        System.out.println(new Gson().toJson(getImportProgressResponse));
+
+        System.out.println("\n===================== list import jobs ====================");
+        ListImportJobsResponse listImportJobsResponse = CloudImport.listImportJobs(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, CloudImportConsts.CLUSTER_ID, 10, 1);
+        System.out.println(new Gson().toJson(listImportJobsResponse));
+    }
+
+    private CollectionSchemaParam buildSimpleCollection() {
+        FieldType fieldType1 = FieldType.newBuilder()
+                .withName("id")
+                .withDataType(DataType.Int64)
+                .withPrimaryKey(true)
+                .withAutoID(true)
+                .build();
+
+        // vector field
+        FieldType fieldType2 = FieldType.newBuilder()
+                .withName("vector")
+                .withDataType(DataType.FloatVector)
+                .withDimension(DIM)
+                .build();
+
+        // scalar field
+        FieldType fieldType3 = FieldType.newBuilder()
+                .withName("path")
+                .withDataType(DataType.VarChar)
+                .withMaxLength(512)
+                .build();
+
+        FieldType fieldType4 = FieldType.newBuilder()
+                .withName("label")
+                .withDataType(DataType.VarChar)
+                .withMaxLength(512)
+                .build();
+
+        CollectionSchemaParam collectionSchema = CollectionSchemaParam.newBuilder()
+                .addFieldType(fieldType1)
+                .addFieldType(fieldType2)
+                .addFieldType(fieldType3)
+                .addFieldType(fieldType4)
+                .build();
+
+        return collectionSchema;
+    }
+
+    private static CollectionSchemaParam buildAllTypeSchema(boolean binVec, boolean hasArray) {
+        // scalar field
+        FieldType fieldType1 = FieldType.newBuilder()
+                .withName("id")
+                .withDataType(DataType.Int64)
+                .withPrimaryKey(true)
+                .withAutoID(false)
+                .build();
+
+        FieldType fieldType2 = FieldType.newBuilder()
+                .withName("bool")
+                .withDataType(DataType.Bool)
+                .build();
+
+        FieldType fieldType3 = FieldType.newBuilder()
+                .withName("int8")
+                .withDataType(DataType.Int8)
+                .build();
+
+        FieldType fieldType4 = FieldType.newBuilder()
+                .withName("int16")
+                .withDataType(DataType.Int16)
+                .build();
+
+        FieldType fieldType5 = FieldType.newBuilder()
+                .withName("int32")
+                .withDataType(DataType.Int32)
+                .build();
+
+        FieldType fieldType6 = FieldType.newBuilder()
+                .withName("float")
+                .withDataType(DataType.Float)
+                .build();
+
+        FieldType fieldType7 = FieldType.newBuilder()
+                .withName("double")
+                .withDataType(DataType.Double)
+                .build();
+
+        FieldType fieldType8 = FieldType.newBuilder()
+                .withName("varchar")
+                .withDataType(DataType.VarChar)
+                .withMaxLength(512)
+                .build();
+
+        FieldType fieldType9 = FieldType.newBuilder()
+                .withName("json")
+                .withDataType(DataType.JSON)
+                .build();
+
+        // vector field
+        FieldType fieldType10;
+        if (binVec) {
+            fieldType10 = FieldType.newBuilder()
+                    .withName("vector")
+                    .withDataType(DataType.BinaryVector)
+                    .withDimension(128)
+                    .build();
+        } else {
+            fieldType10 = FieldType.newBuilder()
+                    .withName("vector")
+                    .withDataType(DataType.FloatVector)
+                    .withDimension(128)
+                    .build();
+        }
+
+        CollectionSchemaParam.Builder schemaBuilder = CollectionSchemaParam.newBuilder()
+                .withEnableDynamicField(false)
+                .addFieldType(fieldType1)
+                .addFieldType(fieldType2)
+                .addFieldType(fieldType3)
+                .addFieldType(fieldType4)
+                .addFieldType(fieldType5)
+                .addFieldType(fieldType6)
+                .addFieldType(fieldType7)
+                .addFieldType(fieldType8)
+                .addFieldType(fieldType9)
+                .addFieldType(fieldType10);
+
+        // array field
+        if (hasArray) {
+            FieldType fieldType11 = FieldType.newBuilder()
+                    .withName("arrayInt64")
+                    .withDataType(DataType.Array)
+                    .withElementType(DataType.Int64)
+                    .withMaxCapacity(10)
+                    .build();
+
+            FieldType fieldType12 = FieldType.newBuilder()
+                    .withName("arrayVarchar")
+                    .withDataType(DataType.Array)
+                    .withElementType(DataType.VarChar)
+                    .withMaxLength(10)
+                    .withMaxCapacity(10)
+                    .build();
+
+            FieldType fieldType13 = FieldType.newBuilder()
+                    .withName("arrayInt8")
+                    .withDataType(DataType.Array)
+                    .withElementType(DataType.Int8)
+                    .withMaxCapacity(10)
+                    .build();
+
+            FieldType fieldType14 = FieldType.newBuilder()
+                    .withName("arrayInt16")
+                    .withDataType(DataType.Array)
+                    .withElementType(DataType.Int16)
+                    .withMaxCapacity(10)
+                    .build();
+
+            FieldType fieldType15 = FieldType.newBuilder()
+                    .withName("arrayInt32")
+                    .withDataType(DataType.Array)
+                    .withElementType(DataType.Int32)
+                    .withMaxCapacity(10)
+                    .build();
+
+            FieldType fieldType16 = FieldType.newBuilder()
+                    .withName("arrayFloat")
+                    .withDataType(DataType.Array)
+                    .withElementType(DataType.Float)
+                    .withMaxCapacity(10)
+                    .build();
+
+            FieldType fieldType17 = FieldType.newBuilder()
+                    .withName("arrayDouble")
+                    .withDataType(DataType.Array)
+                    .withElementType(DataType.Double)
+                    .withMaxCapacity(10)
+                    .build();
+
+            FieldType fieldType18 = FieldType.newBuilder()
+                    .withName("arrayBool")
+                    .withDataType(DataType.Array)
+                    .withElementType(DataType.Bool)
+                    .withMaxCapacity(10)
+                    .build();
+
+            schemaBuilder.addFieldType(fieldType11)
+                    .addFieldType(fieldType12)
+                    .addFieldType(fieldType13)
+                    .addFieldType(fieldType14)
+                    .addFieldType(fieldType15)
+                    .addFieldType(fieldType16)
+                    .addFieldType(fieldType17)
+                    .addFieldType(fieldType18);
+        }
+        return schemaBuilder.build();
+    }
+
+    private void checkMilvusClientIfExist() {
+        if (milvusClient == null) {
+            String msg = "milvusClient is null. Call createConnection() to initialize it before use.";
+            throw new RuntimeException(msg);
+        }
+    }
+}

File diff suppressed because it is too large
+ 1 - 0
examples/main/resources/data/train_embeddings.csv


+ 103 - 12
pom.xml

@@ -74,7 +74,7 @@
         <versio.maven.source.plugin>3.2.1</versio.maven.source.plugin>
         <javax.annotation.version>1.2</javax.annotation.version>
         <commons.text.version>1.10.0</commons.text.version>
-        <slf4j.api.version>1.7.30</slf4j.api.version>
+        <slf4j.api.version>1.7.36</slf4j.api.version>
         <log4j.slf4j.version>2.17.1</log4j.slf4j.version>
         <junit.version>4.13.2</junit.version>
         <junit.jupiter.version>5.10.1</junit.jupiter.version>
@@ -90,12 +90,22 @@
         <maven.surefire.plugin.version>2.19.1</maven.surefire.plugin.version>
         <junit.platform.version>1.1.0</junit.platform.version>
         <junit.jupiter.engine.version>5.10.1</junit.jupiter.engine.version>
-        <jackson.version>2.12.7.1</jackson.version>
+        <jackson.version>2.16.1</jackson.version>
         <gson.version>2.10.1</gson.version>
         <kotlin.version>1.6.20</kotlin.version>
         <version.fastjson>1.2.83</version.fastjson>
         <mockito.version>5.8.0</mockito.version>
         <testcontainers.version>1.19.6</testcontainers.version>
+
+        <hadoop.version>2.6.0</hadoop.version>
+        <hbase.version>1.2.0</hbase.version>
+        <parquet.version>1.13.1</parquet.version>
+        <unirest.version>3.13.10</unirest.version>
+        <!--storage sdk-->
+        <aws-java-sdk-s3.version>1.12.312</aws-java-sdk-s3.version>
+        <minio-java-sdk.version>8.2.1</minio-java-sdk.version>
+        <azure-java-blob-sdk.version>12.24.0</azure-java-blob-sdk.version>
+        <azure-java-identity-sdk.version>1.10.1</azure-java-identity-sdk.version>
     </properties>
 
     <dependencyManagement>
@@ -107,6 +117,26 @@
                 <type>pom</type>
                 <scope>import</scope>
             </dependency>
+            <dependency>
+                <groupId>org.slf4j</groupId>
+                <artifactId>slf4j-api</artifactId>
+                <version>${slf4j.api.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-databind</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-annotations</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-core</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
@@ -164,16 +194,6 @@
             <artifactId>commons-collections4</artifactId>
             <version>${commons-collections4.version}</version>
         </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-databind</artifactId>
-            <version>${jackson.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-            <version>${slf4j.api.version}</version>
-        </dependency>
         <dependency>
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-slf4j-impl</artifactId>
@@ -242,6 +262,77 @@
             <artifactId>fastjson</artifactId>
             <version>${version.fastjson}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-avro</artifactId>
+            <version>${parquet.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-hadoop</artifactId>
+            <version>${parquet.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.konghq</groupId>
+            <artifactId>unirest-java</artifactId>
+            <version>${unirest.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>netty</artifactId>
+                    <groupId>io.netty</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-csv</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+
+        <!-- storage sdk-->
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-s3</artifactId>
+            <version>${aws-java-sdk-s3.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>commons-logging</artifactId>
+                    <groupId>commons-logging</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-sts</artifactId>
+            <version>${aws-java-sdk-s3.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.minio</groupId>
+            <artifactId>minio</artifactId>
+            <version>${minio-java-sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-storage-blob</artifactId>
+            <version>${azure-java-blob-sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-identity</artifactId>
+            <version>${azure-java-identity-sdk.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>jna-platform</artifactId>
+                    <groupId>net.java.dev.jna</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 
     <profiles>

+ 292 - 0
src/main/java/io/milvus/bulkwriter/Buffer.java

@@ -0,0 +1,292 @@
+package io.milvus.bulkwriter;
+
+import com.alibaba.fastjson.JSONObject;
+import com.google.common.collect.Lists;
+import io.milvus.bulkwriter.common.clientenum.BulkFileType;
+import io.milvus.common.utils.ExceptionUtils;
+import io.milvus.bulkwriter.common.utils.ParquetUtils;
+import io.milvus.grpc.DataType;
+import io.milvus.param.collection.CollectionSchemaParam;
+import io.milvus.param.collection.FieldType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static io.milvus.param.Constant.DYNAMIC_FIELD_NAME;
+
+public class Buffer {
+    private static final Logger logger = LoggerFactory.getLogger(Buffer.class);
+
+    private CollectionSchemaParam collectionSchema;
+    private BulkFileType fileType;
+    private Map<String, List<Object>> buffer;
+    private Map<String, FieldType> fields;
+
+    public Buffer(CollectionSchemaParam collectionSchema, BulkFileType fileType) {
+        this.collectionSchema = collectionSchema;
+        this.fileType = fileType;
+
+        buffer = new HashMap<>();
+        fields = new HashMap<>();
+
+        for (FieldType fieldType : collectionSchema.getFieldTypes()) {
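+            // skip an auto-id primary key: the server generates its values, so no buffer column is needed for it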
+            if (fieldType.isPrimaryKey() && fieldType.isAutoID())
+                continue;
+            buffer.put(fieldType.getName(), Lists.newArrayList());
+            fields.put(fieldType.getName(), fieldType);
+        }
+
+        if (buffer.isEmpty()) {
+            ExceptionUtils.throwUnExpectedException("Illegal collection schema: fields list is empty");
+        }
+
+        if (collectionSchema.isEnableDynamicField()) {
+            buffer.put(DYNAMIC_FIELD_NAME, Lists.newArrayList());
+            fields.put(DYNAMIC_FIELD_NAME, FieldType.newBuilder().withName(DYNAMIC_FIELD_NAME).withDataType(DataType.JSON).build());
+        }
+    }
+
+    public Integer getRowCount() {
+        if (buffer.isEmpty()) {
+            return 0;
+        }
+
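+        // every field buffer is expected to hold the same number of rows, so the size of any one of them is the row count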
+        for (String fieldName : buffer.keySet()) {
+            return buffer.get(fieldName).size();
+        }
+        return null;
+    }
+
+    public void appendRow(JSONObject row) {
+        Map<String, Object> dynamicValues = new HashMap<>();
+        if (row.containsKey(DYNAMIC_FIELD_NAME) && !(row.get(DYNAMIC_FIELD_NAME) instanceof Map)) {
+            String msg = String.format("Dynamic field '%s' value should be JSON format", DYNAMIC_FIELD_NAME);
+            ExceptionUtils.throwUnExpectedException(msg);
+        }
+
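+        // keys declared in the schema go into their own column buffers; any other key is collected into the dynamic field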
+        for (String key : row.keySet()) {
+            if (DYNAMIC_FIELD_NAME.equals(key)) {
+                dynamicValues.putAll((Map<String, Object>) row.get(key));
+                continue;
+            }
+            if (!buffer.containsKey(key)) {
+                dynamicValues.put(key, row.get(key));
+            } else {
+                buffer.get(key).add(row.get(key));
+            }
+        }
+
+        if (buffer.containsKey(DYNAMIC_FIELD_NAME)) {
+            buffer.get(DYNAMIC_FIELD_NAME).add(dynamicValues);
+        }
+    }
+
+    // verify that the row counts of all fields are equal
+    public List<String> persist(String localPath, Integer bufferSize, Integer bufferRowCount) {
+        int rowCount = -1;
+        for (String key : buffer.keySet()) {
+            if (rowCount < 0) {
+                rowCount = buffer.get(key).size();
+            } else if (rowCount != buffer.get(key).size()) {
+                String msg = String.format("Column `%s` row count %s doesn't equal the first column's row count %s", key, buffer.get(key).size(), rowCount);
+                ExceptionUtils.throwUnExpectedException(msg);
+            }
+        }
+
+        // output files
+        if (fileType == BulkFileType.PARQUET) {
+            return persistParquet(localPath, bufferSize, bufferRowCount);
+        }
+        ExceptionUtils.throwUnExpectedException("Unsupported file type: " + fileType);
+        return null;
+    }
+
+    private List<String> persistParquet(String localPath, Integer bufferSize, Integer bufferRowCount) {
+        String filePath = localPath + ".parquet";
+
+        // calculate a proper row group size
+        int rowGroupSizeMin = 1000;
+        int rowGroupSizeMax = 1000000;
+        int rowGroupSize = 10000;
+
+        // 32MB is an empirical value that avoids high memory usage of the parquet reader on the server side
+        int rowGroupBytes = 32 * 1024 * 1024;
+
+        int sizePerRow = (bufferSize / bufferRowCount) + 1;
+        rowGroupSize = rowGroupBytes / sizePerRow;
+        rowGroupSize = Math.max(rowGroupSizeMin, Math.min(rowGroupSizeMax, rowGroupSize));
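+        // e.g. (illustrative numbers): a 512 MB buffer holding 100,000 rows gives sizePerRow = 5369 bytes,
+        // so rowGroupSize = 32MB / 5369 ≈ 6249 rows, which already lies inside the [1000, 1000000] clamp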
+
+        // declare the messageType of the Parquet
+        MessageType messageType = ParquetUtils.parseCollectionSchema(collectionSchema);
+
+        // declare and define the ParquetWriter.
+        Path path = new Path(filePath);
+        Configuration configuration = new Configuration();
+        GroupWriteSupport.setSchema(messageType, configuration);
+        GroupWriteSupport writeSupport = new GroupWriteSupport();
+
+        try (ParquetWriter<Group> writer = new ParquetWriter<>(path,
+                ParquetFileWriter.Mode.CREATE,
+                writeSupport,
+                CompressionCodecName.UNCOMPRESSED,
+                rowGroupBytes,
+                5 * 1024 * 1024,
+                5 * 1024 * 1024,
+                ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
+                ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
+                ParquetWriter.DEFAULT_WRITER_VERSION,
+                configuration)) {
+
+            Map<String, FieldType> nameFieldType = collectionSchema.getFieldTypes().stream().collect(Collectors.toMap(FieldType::getName, e -> e));
+
+            List<String> fieldNameList = Lists.newArrayList(buffer.keySet());
+            int size = buffer.get(fieldNameList.get(0)).size();
+            for (int i = 0; i < size; ++i) {
+                // build Parquet data and encapsulate it into a group.
+                Group group = new SimpleGroupFactory(messageType).newGroup();
+                for (String fieldName : fieldNameList) {
+                    appendGroup(group, fieldName, buffer.get(fieldName).get(i), nameFieldType.get(fieldName));
+                }
+                writer.write(group);
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        String msg = String.format("Successfully persist file %s, total size: %s, row count: %s, row group size: %s",
+                filePath, bufferSize, bufferRowCount, rowGroupSize);
+        logger.info(msg);
+        return Lists.newArrayList(filePath);
+    }
+
+    private void appendGroup(Group group, String paramName, Object value, FieldType fieldType) {
+        switch (fieldType.getDataType()) {
+            case Int8:
+            case Int16:
+            case Int32:
+                group.append(paramName, Integer.parseInt(value.toString()));
+                break;
+            case Int64:
+                group.append(paramName, Long.parseLong(value.toString()));
+                break;
+            case Float:
+                group.append(paramName, Float.parseFloat(value.toString()));
+                break;
+            case Double:
+                group.append(paramName, Double.parseDouble(value.toString()));
+                break;
+            case Bool:
+                group.append(paramName, Boolean.parseBoolean(value.toString()));
+                break;
+            case VarChar:
+            case String:
+                group.append(paramName, String.valueOf(value));
+                break;
+            case JSON:
+                group.append(paramName, ((JSONObject) value).toJSONString());
+                break;
+            case FloatVector:
+                addFloatArray(group, paramName, (List<Float>) value);
+                break;
+            case BinaryVector:
+                addBinaryVector(group, paramName, (ByteBuffer) value);
+                break;
+            case Array:
+                switch (fieldType.getElementType()) {
+                    case Int8:
+                    case Int16:
+                    case Int32:
+                        addIntArray(group, paramName, (List<Integer>) value);
+                        break;
+                    case Int64:
+                        addLongArray(group, paramName, (List<Long>) value);
+                        break;
+                    case Float:
+                        addFloatArray(group, paramName, (List<Float>) value);
+                        break;
+                    case Double:
+                        addDoubleArray(group, paramName, (List<Double>) value);
+                        break;
+                    case String:
+                    case VarChar:
+                        addStringArray(group, paramName, (List<String>) value);
+                        break;
+                    case Bool:
+                        addBooleanArray(group, paramName, (List<Boolean>) value);
+                        break;
+                }
+        }
+    }
+
+    private static void addLongArray(Group group, String fieldName, List<Long> values) {
+        Group arrayGroup = group.addGroup(fieldName);
+        for (long value : values) {
+            Group addGroup = arrayGroup.addGroup(0);
+            addGroup.add(0, value);
+        }
+    }
+
+    private static void addStringArray(Group group, String fieldName, List<String> values) {
+        Group arrayGroup = group.addGroup(fieldName);
+        for (String value : values) {
+            Group addGroup = arrayGroup.addGroup(0);
+            addGroup.add(0, value);
+        }
+    }
+
+    private static void addIntArray(Group group, String fieldName, List<Integer> values) {
+        Group arrayGroup = group.addGroup(fieldName);
+        for (int value : values) {
+            Group addGroup = arrayGroup.addGroup(0);
+            addGroup.add(0, value);
+        }
+    }
+
+    private static void addFloatArray(Group group, String fieldName, List<Float> values) {
+        Group arrayGroup = group.addGroup(fieldName);
+        for (float value : values) {
+            Group addGroup = arrayGroup.addGroup(0);
+            addGroup.add(0, value);
+        }
+    }
+
+    private static void addBinaryVector(Group group, String fieldName, ByteBuffer byteBuffer) {
+        Group arrayGroup = group.addGroup(fieldName);
+        byte[] bytes = byteBuffer.array();
+        for (byte value : bytes) {
+            Group addGroup = arrayGroup.addGroup(0);
+            addGroup.add(0, value);
+        }
+    }
+
+    private static void addDoubleArray(Group group, String fieldName, List<Double> values) {
+        Group arrayGroup = group.addGroup(fieldName);
+        for (double value : values) {
+            Group addGroup = arrayGroup.addGroup(0);
+            addGroup.add(0, value);
+        }
+    }
+
+    private static void addBooleanArray(Group group, String fieldName, List<Boolean> values) {
+        Group arrayGroup = group.addGroup(fieldName);
+        for (boolean value : values) {
+            Group addGroup = arrayGroup.addGroup(0);
+            addGroup.add(0, value);
+        }
+    }
+}
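
For context, a minimal sketch of how Buffer.appendRow routes row keys when the schema enables the dynamic field; the field names and schema values below are illustrative assumptions, not part of this patch:

    // Hypothetical schema: an Int64 primary key "id" and a 4-dim FloatVector "vector".
    FieldType idField = FieldType.newBuilder()
            .withName("id")
            .withDataType(DataType.Int64)
            .withPrimaryKey(true)
            .build();
    FieldType vectorField = FieldType.newBuilder()
            .withName("vector")
            .withDataType(DataType.FloatVector)
            .withDimension(4)
            .build();

    JSONObject row = new JSONObject();
    row.put("id", 1L);
    row.put("vector", Lists.newArrayList(0.1f, 0.2f, 0.3f, 0.4f));
    row.put("color", "red"); // not declared in the schema

    // appendRow() appends "id" and "vector" to their own columns; "color" is collected
    // into the reserved dynamic-field column when dynamic fields are enabled, and is
    // dropped from the buffer when they are not.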

+ 207 - 0
src/main/java/io/milvus/bulkwriter/BulkWriter.java

@@ -0,0 +1,207 @@
+package io.milvus.bulkwriter;
+
+import com.alibaba.fastjson.JSONObject;
+import com.google.common.collect.Lists;
+import io.milvus.bulkwriter.common.clientenum.BulkFileType;
+import io.milvus.bulkwriter.common.clientenum.TypeSize;
+import io.milvus.common.utils.ExceptionUtils;
+import io.milvus.grpc.DataType;
+import io.milvus.param.ParamUtils;
+import io.milvus.param.collection.CollectionSchemaParam;
+import io.milvus.param.collection.FieldType;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.locks.ReentrantLock;
+
+public abstract class BulkWriter {
+    private static final Logger logger = LoggerFactory.getLogger(BulkWriter.class);
+    protected CollectionSchemaParam collectionSchema;
+    protected int chunkSize;
+    protected BulkFileType fileType;
+
+    protected int bufferSize;
+    protected int bufferRowCount;
+    protected int totalRowCount;
+    protected Buffer buffer;
+    protected ReentrantLock bufferLock;
+
+    protected BulkWriter(CollectionSchemaParam collectionSchema, int chunkSize, BulkFileType fileType) {
+        this.collectionSchema = collectionSchema;
+        this.chunkSize = chunkSize;
+        this.fileType = fileType;
+
+        if (CollectionUtils.isEmpty(collectionSchema.getFieldTypes())) {
+            ExceptionUtils.throwUnExpectedException("collection schema fields list is empty");
+        }
+
+        if (!hasPrimaryField(collectionSchema.getFieldTypes())) {
+            ExceptionUtils.throwUnExpectedException("primary field is null");
+        }
+        bufferLock = new ReentrantLock();
+        buffer = null;
+        this.newBuffer();
+    }
+
+    protected Integer getBufferSize() {
+        return bufferSize;
+    }
+
+    public Integer getBufferRowCount() {
+        return bufferRowCount;
+    }
+
+    public Integer getTotalRowCount() {
+        return totalRowCount;
+    }
+
+    protected Integer getChunkSize() {
+        return chunkSize;
+    }
+
+    protected Buffer newBuffer() {
+        Buffer oldBuffer = buffer;
+
+        bufferLock.lock();
+        this.buffer = new Buffer(collectionSchema, fileType);
+        bufferLock.unlock();
+
+        return oldBuffer;
+    }
+
+    public void appendRow(JSONObject row) throws IOException, InterruptedException {
+        verifyRow(row);
+
+        bufferLock.lock();
+        buffer.appendRow(row);
+        bufferLock.unlock();
+    }
+
+    protected void commit(boolean async) throws InterruptedException {
+        bufferLock.lock();
+        bufferSize = 0;
+        bufferRowCount = 0;
+        bufferLock.unlock();
+    }
+
+    protected String getDataPath() {
+        return "";
+    }
+
+    private void verifyRow(JSONObject row) {
+        int rowSize = 0;
+        for (FieldType fieldType : collectionSchema.getFieldTypes()) {
+            if (fieldType.isPrimaryKey() && fieldType.isAutoID()) {
+                if (row.containsKey(fieldType.getName())) {
+                    String msg = String.format("The primary key field '%s' is auto-id, no need to provide", fieldType.getName());
+                    ExceptionUtils.throwUnExpectedException(msg);
+                } else {
+                    continue;
+                }
+            }
+
+            if (!row.containsKey(fieldType.getName())) {
+                String msg = String.format("The field '%s' is missed in the row", fieldType.getName());
+                ExceptionUtils.throwUnExpectedException(msg);
+            }
+
+            switch (fieldType.getDataType()) {
+                case BinaryVector:
+                case FloatVector:
+                    rowSize += verifyVector(row.get(fieldType.getName()), fieldType);
+                    break;
+                case VarChar:
+                    rowSize += verifyVarchar(row.get(fieldType.getName()), fieldType, false);
+                    break;
+                case JSON:
+                    Pair<Object, Integer> objectRowSize = verifyJSON(row.get(fieldType.getName()), fieldType);
+                    row.put(fieldType.getName(), objectRowSize.getLeft());
+                    rowSize += objectRowSize.getRight();
+                    break;
+                case Array:
+                    rowSize += verifyArray(row.get(fieldType.getName()), fieldType);
+                    break;
+                default:
+                    rowSize += TypeSize.getSize(fieldType.getDataType());
+            }
+        }
+
+        bufferLock.lock();
+        bufferSize += rowSize;
+        bufferRowCount += 1;
+        totalRowCount += 1;
+        bufferLock.unlock();
+    }
+
+    private Integer verifyVector(Object object, FieldType fieldType) {
+        if (fieldType.getDataType() == DataType.FloatVector) {
+            ParamUtils.checkFieldData(fieldType, Lists.newArrayList(object), false);
+            return ((List<?>)object).size() * 4;
+        } else {
+            ParamUtils.checkFieldData(fieldType, Lists.newArrayList(object), false);
+            return ((ByteBuffer)object).position();
+        }
+    }
+
+    private Integer verifyVarchar(Object object, FieldType fieldType, boolean verifyElementType) {
+        ParamUtils.checkFieldData(fieldType, Lists.newArrayList(object), verifyElementType);
+
+        return String.valueOf(object).length();
+    }
+
+    private Pair<Object, Integer> verifyJSON(Object object, FieldType fieldType) {
+        int size = 0;
+        if (object instanceof String) {
+            size = String.valueOf(object).length();
+            object = tryConvertJson(fieldType.getName(), object);
+        } else if (object instanceof JSONObject) {
+            size = ((JSONObject) object).toJSONString().length();
+        } else {
+            String msg = String.format("Illegal JSON value for field '%s', type mismatch", fieldType.getName());
+            ExceptionUtils.throwUnExpectedException(msg);
+        }
+        return Pair.of(object, size);
+    }
+
+    private Integer verifyArray(Object object, FieldType fieldType) {
+        ParamUtils.checkFieldData(fieldType, (List<?>)object, true);
+
+        int rowSize = 0;
+        DataType elementType = fieldType.getElementType();
+        if (TypeSize.contains(elementType)) {
+            rowSize = TypeSize.getSize(elementType) * ((List<?>)object).size();
+        } else if (elementType == DataType.VarChar) {
+            for (String ele : (List<String>) object) {
+                rowSize += verifyVarchar(ele, fieldType, true);
+            }
+        } else {
+            String msg = String.format("Unsupported element type for array field '%s'", fieldType.getName());
+            ExceptionUtils.throwUnExpectedException(msg);
+        }
+
+        return rowSize;
+    }
+
+    private Object tryConvertJson(String fieldName, Object object) {
+        if (object instanceof String) {
+            try {
+                return JSONObject.parseObject(String.valueOf(object));
+            } catch (Exception e) {
+                String msg = String.format("Illegal JSON value for field '%s', type mismatch or illegal format, error: %s", fieldName, e);
+                ExceptionUtils.throwUnExpectedException(msg);
+            }
+        }
+        return object;
+    }
+
+    private boolean hasPrimaryField(List<FieldType> fieldTypes) {
+        Optional<FieldType> primaryKeyField = fieldTypes.stream().filter(FieldType::isPrimaryKey).findFirst();
+        return primaryKeyField.isPresent();
+    }
+}
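
A short sketch of what verifyRow enforces and how it estimates the buffer size; the schema assumed here (auto-id Int64 primary key "id", 768-dim FloatVector "vector", VarChar "name") is a hypothetical example:

    JSONObject row = new JSONObject();
    row.put("vector", GeneratorUtils.genFloatVector(768));
    row.put("name", "hello");
    // row.put("id", 1L); // would be rejected: an auto-id primary key must not be provided

    // Every non-auto-id schema field must be present; a JSON field given as a String is
    // parsed into a JSONObject and written back into the row. The estimated row size here
    // is 768 * 4 bytes for the vector plus "hello".length(), i.e. 3077 bytes, added to
    // bufferSize; subclasses compare bufferSize against chunkSize to decide when to flush.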

+ 136 - 0
src/main/java/io/milvus/bulkwriter/CloudImport.java

@@ -0,0 +1,136 @@
+package io.milvus.bulkwriter;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import io.milvus.bulkwriter.response.BulkImportResponse;
+import io.milvus.bulkwriter.response.GetImportProgressResponse;
+import io.milvus.bulkwriter.response.ListImportJobsResponse;
+import io.milvus.bulkwriter.response.RestfulResponse;
+import io.milvus.common.utils.ExceptionUtils;
+import kong.unirest.Unirest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+public class CloudImport {
+    public static BulkImportResponse bulkImport(String url, String apiKey, String objectUrl,
+                                                String accessKey, String secretKey, String clusterId, String collectionName) throws MalformedURLException {
+        String requestURL;
+        String protocol = new URL(url).getProtocol();
+        if (protocol.startsWith("http")) {
+            requestURL = url + "/v1/vector/collections/import";
+        } else {
+            requestURL = String.format("https://%s/v1/vector/collections/import", url);
+        }
+
+        Map<String, Object> params = new HashMap<>();
+        params.put("objectUrl", objectUrl);
+        params.put("accessKey", accessKey);
+        params.put("secretKey", secretKey);
+        params.put("clusterId", clusterId);
+        params.put("collectionName", collectionName);
+
+        String body = postRequest(requestURL, apiKey, params, 60 * 1000);
+        RestfulResponse<BulkImportResponse> response = new Gson().fromJson(body, new TypeToken<RestfulResponse<BulkImportResponse>>(){}.getType());
+        handleResponse(url, response);
+        return response.getData();
+    }
+
+    public static GetImportProgressResponse getImportProgress(String url, String apiKey, String jobId, String clusterId) throws MalformedURLException {
+        String requestURL;
+        String protocol = new URL(url).getProtocol();
+        if (protocol.startsWith("http")) {
+            requestURL = url + "/v1/vector/collections/import/get";
+        } else {
+            requestURL = String.format("https://%s/v1/vector/collections/import/get", url);
+        }
+
+        Map<String, Object> params = new HashMap<>();
+        params.put("clusterId", clusterId);
+        params.put("jobId", jobId);
+
+        String body = getRequest(requestURL, apiKey, params, 60 * 1000);
+        RestfulResponse<GetImportProgressResponse> response = new Gson().fromJson(body, new TypeToken<RestfulResponse<GetImportProgressResponse>>(){}.getType());
+        handleResponse(url, response);
+        return response.getData();
+    }
+
+    public static ListImportJobsResponse listImportJobs(String url, String apiKey, String clusterId, int pageSize, int currentPage) throws MalformedURLException {
+        String requestURL;
+        String protocol = new URL(url).getProtocol();
+        if (protocol.startsWith("http")) {
+            requestURL = url + "/v1/vector/collections/import/list";
+        } else {
+            requestURL = String.format("https://%s/v1/vector/collections/import/list", url);
+        }
+
+        Map<String, Object> params = new HashMap<>();
+        params.put("clusterId", clusterId);
+        params.put("pageSize", pageSize);
+        params.put("currentPage", currentPage);
+
+        String body = getRequest(requestURL, apiKey, params, 60 * 1000);
+        RestfulResponse<ListImportJobsResponse> response = new Gson().fromJson(body, new TypeToken<RestfulResponse<ListImportJobsResponse>>(){}.getType());
+        handleResponse(url, response);
+        return response.getData();
+    }
+
+    private static String postRequest(String url, String apiKey, Map<String, Object> params, int timeout) {
+        try {
+            kong.unirest.HttpResponse<String> response = Unirest.post(url)
+                    .connectTimeout(timeout)
+                    .headers(httpHeaders(apiKey))
+                    .body(params).asString();
+            if (response.getStatus() != 200) {
+                ExceptionUtils.throwUnExpectedException(String.format("Failed to post url: %s, status code: %s", url, response.getStatus()));
+            } else {
+                return response.getBody();
+            }
+        } catch (Exception e) {
+            ExceptionUtils.throwUnExpectedException(String.format("Failed to post url: %s, error: %s", url, e));
+        }
+        return null;
+    }
+
+    private static String getRequest(String url, String apiKey, Map<String, Object> params, int timeout) {
+        try {
+            kong.unirest.HttpResponse<String> response = Unirest.get(url)
+                    .connectTimeout(timeout)
+                    .headers(httpHeaders(apiKey))
+                    .queryString(params).asString();
+            if (response.getStatus() != 200) {
+                ExceptionUtils.throwUnExpectedException(String.format("Failed to get url: %s, status code: %s", url, response.getStatus()));
+            } else {
+                return response.getBody();
+            }
+        } catch (Exception e) {
+            ExceptionUtils.throwUnExpectedException(String.format("Failed to get url: %s, error: %s", url, e));
+        }
+        return null;
+    }
+
+
+    private static Map<String, String> httpHeaders(String apiKey) {
+        Map<String, String> header = new HashMap<>();
+        header.put("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) " +
+                "Chrome/17.0.963.56 Safari/535.11");
+        header.put("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8");
+        header.put("Accept-Encodin", "gzip,deflate,sdch");
+        header.put("Accept-Languag", "en-US,en;q=0.5");
+        header.put("Authorization", "Bearer " + apiKey);
+
+        return header;
+    }
+
+    private static void handleResponse(String url, RestfulResponse res) {
+        int innerCode = res.getCode();
+        if (innerCode != 200) {
+            String innerMessage = res.getMessage();
+            ExceptionUtils.throwUnExpectedException(String.format("Failed to request url: %s, code: %s, message: %s", url, innerCode, innerMessage));
+        }
+    }
+}
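
A hedged usage sketch of the CloudImport helpers; the endpoint, credentials, object URL, and the BulkImportResponse.getJobId() accessor below are placeholders or assumptions, not verified against this patch:

    static void importAndTrack() throws java.net.MalformedURLException {
        String cloudEndpoint = "https://api.cloud.zilliz.com"; // placeholder
        String apiKey = "<api-key>";
        String clusterId = "<cluster-id>";

        BulkImportResponse job = CloudImport.bulkImport(cloudEndpoint, apiKey,
                "https://s3.us-west-2.amazonaws.com/my-bucket/bulk_data/", // objectUrl, placeholder
                "<access-key>", "<secret-key>", clusterId, "my_collection");

        // Poll the job state (getJobId() is assumed to be the response accessor).
        GetImportProgressResponse progress =
                CloudImport.getImportProgress(cloudEndpoint, apiKey, job.getJobId(), clusterId);

        // List the cluster's import jobs, 10 per page, first page.
        ListImportJobsResponse jobs = CloudImport.listImportJobs(cloudEndpoint, apiKey, clusterId, 10, 1);
    }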

+ 189 - 0
src/main/java/io/milvus/bulkwriter/LocalBulkWriter.java

@@ -0,0 +1,189 @@
+package io.milvus.bulkwriter;
+
+import com.alibaba.fastjson.JSONObject;
+import com.google.common.collect.Lists;
+import io.milvus.bulkwriter.common.clientenum.BulkFileType;
+import io.milvus.param.collection.CollectionSchemaParam;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LocalBulkWriter extends BulkWriter implements AutoCloseable {
+    private static final Logger logger = LoggerFactory.getLogger(LocalBulkWriter.class);
+    protected String localPath;
+    private String uuid;
+    private int flushCount;
+    private Map<String, Thread> workingThread;
+    private ReentrantLock workingThreadLock;
+    private List<List<String>> localFiles;
+
+    public LocalBulkWriter(LocalBulkWriterParam bulkWriterParam) throws IOException {
+        super(bulkWriterParam.getCollectionSchema(), bulkWriterParam.getChunkSize(), bulkWriterParam.getFileType());
+        this.localPath = bulkWriterParam.getLocalPath();
+        this.uuid = UUID.randomUUID().toString();
+        this.workingThreadLock = new ReentrantLock();
+        this.workingThread = new HashMap<>();
+        this.localFiles = Lists.newArrayList();
+        this.makeDir();
+    }
+
+    protected LocalBulkWriter(CollectionSchemaParam collectionSchema, int chunkSize, BulkFileType fileType, String localPath) throws IOException {
+        super(collectionSchema, chunkSize, fileType);
+        this.localPath = localPath;
+        this.uuid = UUID.randomUUID().toString();
+        this.workingThreadLock = new ReentrantLock();
+        this.workingThread = new HashMap<>();
+        this.localFiles = Lists.newArrayList();
+        this.makeDir();
+    }
+
+    public void appendRow(JSONObject rowData) throws IOException, InterruptedException {
+        super.appendRow(rowData);
+
+        // Only one thread can enter this section to persist data;
+        // in the flush() method, the buffer is swapped for a new one.
+        // In async mode the flush runs asynchronously, so other threads can
+        // continue to append rows as long as the new buffer size is less than the target size.
+        workingThreadLock.lock();
+        if (super.getBufferSize() > super.getChunkSize()) {
+            commit(true);
+        }
+        workingThreadLock.unlock();
+    }
+
+    public void commit(boolean async) throws InterruptedException {
+        // when async is true, the flush runs asynchronously
+        while (workingThread.size() > 0) {
+            String msg = String.format("Previous flush action is not finished, %s is waiting...", Thread.currentThread().getName());
+            logger.info(msg);
+            TimeUnit.SECONDS.sleep(5);
+        }
+
+        String msg = String.format("Prepare to flush buffer, row_count: %s, size: %s", super.getBufferRowCount(), super.getBufferSize());
+        logger.info(msg);
+
+        int bufferRowCount = getBufferRowCount();
+        int bufferSize = getBufferSize();
+        Runnable runnable = () -> flush(bufferSize, bufferRowCount);
+        Thread thread = new Thread(runnable);
+        logger.info("Flush thread begin, name: {}", thread.getName());
+        workingThread.put(thread.getName(), thread);
+        thread.start();
+
+        if (!async) {
+            logger.info("Wait flush to finish");
+            thread.join();
+        }
+
+        // reset the buffer size
+        super.commit(false);
+        logger.info("Commit done with async={}", async);
+    }
+
+    private void flush(Integer bufferSize, Integer bufferRowCount) {
+        flushCount += 1;
+        java.nio.file.Path path = Paths.get(localPath);
+        java.nio.file.Path flushDirPath = path.resolve(String.valueOf(flushCount));
+
+        Buffer oldBuffer = super.newBuffer();
+        if (oldBuffer.getRowCount() > 0) {
+            List<String> fileList = oldBuffer.persist(
+                    flushDirPath.toString(), bufferSize, bufferRowCount
+            );
+            localFiles.add(fileList);
+            callBack(fileList);
+        }
+        workingThread.remove(Thread.currentThread().getName());
+        String msg = String.format("Flush thread done, name: %s", Thread.currentThread().getName());
+        logger.info(msg);
+    }
+
+    protected void callBack(List<String> fileList) {
+    }
+
+    @Override
+    protected String getDataPath() {
+        return localPath;
+    }
+
+    public List<List<String>> getBatchFiles() {
+        return localFiles;
+    }
+
+    private void makeDir() throws IOException {
+        java.nio.file.Path path = Paths.get(localPath);
+        createDirIfNotExist(path);
+
+        java.nio.file.Path fullPath = path.resolve(uuid);
+        createDirIfNotExist(fullPath);
+        this.localPath = fullPath.toString();
+    }
+
+    private void createDirIfNotExist(java.nio.file.Path path) throws IOException {
+        try {
+            Files.createDirectories(path);
+            logger.info("Data path created: {}", path);
+        } catch (IOException e) {
+            logger.error("Data Path create failed: {}", path);
+            throw e;
+        }
+    }
+
+    protected void exit() throws InterruptedException {
+        // if there is still data in memory, commit it before exiting
+        workingThreadLock.lock();
+        if (getBufferSize() != null && getBufferSize() != 0) {
+            commit(true);
+        }
+        workingThreadLock.unlock();
+
+        // wait flush thread
+        if (workingThread.size() > 0) {
+            for (String key : workingThread.keySet()) {
+                logger.info("Wait flush thread '{}' to finish", key);
+                workingThread.get(key).join();
+            }
+        }
+        rmDir();
+    }
+
+    private void rmDir() {
+        try {
+            java.nio.file.Path path = Paths.get(localPath);
+            if (Files.exists(path) && isDirectoryEmpty(path)) {
+                Files.delete(path);
+                logger.info("Delete local directory {}", localPath);
+            }
+        } catch (IOException e) {
+            logger.error("Error while deleting directory: " + e.getMessage());
+        }
+    }
+
+    private boolean isDirectoryEmpty(java.nio.file.Path path) throws IOException {
+        try (DirectoryStream<java.nio.file.Path> dirStream = Files.newDirectoryStream(path)) {
+            return !dirStream.iterator().hasNext();
+        }
+    }
+
+    protected String getUUID() {
+        return uuid;
+    }
+
+    @Override
+    public void close() throws Exception {
+        logger.info("execute remaining actions to prevent loss of memory data or residual empty directories.");
+        exit();
+        logger.info(String.format("LocalBulkWriter done! output local files: %s", getBatchFiles()));
+    }
+}

+ 112 - 0
src/main/java/io/milvus/bulkwriter/LocalBulkWriterParam.java

@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.bulkwriter;
+
+import io.milvus.bulkwriter.common.clientenum.BulkFileType;
+import io.milvus.exception.ParamException;
+import io.milvus.param.ParamUtils;
+import io.milvus.param.collection.CollectionSchemaParam;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.ToString;
+
+/**
+ * Parameters for <code>bulkWriter</code> interface.
+ */
+@Getter
+@ToString
+public class LocalBulkWriterParam {
+    private final CollectionSchemaParam collectionSchema;
+    private final String localPath;
+    private final int chunkSize;
+    private final BulkFileType fileType;
+
+    private LocalBulkWriterParam(@NonNull Builder builder) {
+        this.collectionSchema = builder.collectionSchema;
+        this.localPath = builder.localPath;
+        this.chunkSize = builder.chunkSize;
+        this.fileType = builder.fileType;
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link LocalBulkWriterParam} class.
+     */
+    public static final class Builder {
+        private CollectionSchemaParam collectionSchema;
+        private String localPath;
+        private int chunkSize = 128 * 1024 * 1024;
+        private BulkFileType fileType = BulkFileType.PARQUET;
+
+        private Builder() {
+        }
+
+        /**
+         * Sets the collection info.
+         *
+         * @param collectionSchema collection info
+         * @return <code>Builder</code>
+         */
+        public Builder withCollectionSchema(@NonNull CollectionSchemaParam collectionSchema) {
+            this.collectionSchema = collectionSchema;
+            return this;
+        }
+
+        /**
+         * Sets the localPath.
+         *
+         * @param localPath local directory path where generated data files are written
+         * @return <code>Builder</code>
+         */
+        public Builder withLocalPath(@NonNull String localPath) {
+            this.localPath = localPath;
+            return this;
+        }
+
+        public Builder withChunkSize(int chunkSize) {
+            this.chunkSize = chunkSize;
+            return this;
+        }
+
+        public Builder withFileType(BulkFileType fileType) {
+            this.fileType = fileType;
+            return this;
+        }
+
+        /**
+         * Verifies parameters and creates a new {@link LocalBulkWriterParam} instance.
+         *
+         * @return {@link LocalBulkWriterParam}
+         */
+        public LocalBulkWriterParam build() throws ParamException {
+            ParamUtils.CheckNullEmptyString(localPath, "localPath");
+
+            if (collectionSchema == null) {
+                throw new ParamException("collectionParam cannot be null");
+            }
+
+            return new LocalBulkWriterParam(this);
+        }
+    }
+
+}
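
A minimal usage sketch combining LocalBulkWriterParam and LocalBulkWriter; the collection schema and the row source are assumed to be prepared by the caller:

    LocalBulkWriterParam writerParam = LocalBulkWriterParam.newBuilder()
            .withCollectionSchema(collectionSchema) // CollectionSchemaParam built by the caller
            .withLocalPath("/tmp/bulk_writer")
            .withChunkSize(64 * 1024 * 1024)
            .withFileType(BulkFileType.PARQUET)
            .build();

    try (LocalBulkWriter writer = new LocalBulkWriter(writerParam)) {
        for (JSONObject row : rows) {   // rows prepared by the caller
            writer.appendRow(row);      // flushes asynchronously once bufferSize exceeds chunkSize
        }
        writer.commit(false);           // synchronously flush whatever is left
        System.out.println("Generated files: " + writer.getBatchFiles());
    } catch (Exception e) {
        e.printStackTrace();
    }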

+ 267 - 0
src/main/java/io/milvus/bulkwriter/RemoteBulkWriter.java

@@ -0,0 +1,267 @@
+package io.milvus.bulkwriter;
+
+import com.alibaba.fastjson.JSONObject;
+import com.azure.storage.blob.models.BlobErrorCode;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.google.common.collect.Lists;
+import io.milvus.bulkwriter.connect.AzureConnectParam;
+import io.milvus.bulkwriter.connect.S3ConnectParam;
+import io.milvus.bulkwriter.connect.StorageConnectParam;
+import io.milvus.bulkwriter.storage.StorageClient;
+import io.milvus.bulkwriter.storage.client.AzureStorageClient;
+import io.milvus.bulkwriter.storage.client.MinioStorageClient;
+import io.milvus.common.utils.ExceptionUtils;
+import io.minio.errors.ErrorResponseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.FileVisitOption;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+
+public class RemoteBulkWriter extends LocalBulkWriter {
+    private static final Logger logger = LoggerFactory.getLogger(RemoteBulkWriter.class);
+
+    private String remotePath;
+    private StorageConnectParam connectParam;
+    private StorageClient storageClient;
+
+    private List<List<String>> remoteFiles;
+
+    public RemoteBulkWriter(RemoteBulkWriterParam bulkWriterParam) throws IOException {
+        super(bulkWriterParam.getCollectionSchema(), bulkWriterParam.getChunkSize(), bulkWriterParam.getFileType(), generatorLocalPath());
+        Path path = Paths.get(bulkWriterParam.getRemotePath());
+        Path remoteDirPath = path.resolve(getUUID());
+        this.remotePath = remoteDirPath.toString();
+        this.connectParam = bulkWriterParam.getConnectParam();
+        getStorageClient();
+
+        this.remoteFiles = Lists.newArrayList();
+        logger.info("Remote buffer writer initialized, target path: {}", remotePath);
+
+    }
+
+    @Override
+    public void appendRow(JSONObject rowData) throws IOException, InterruptedException {
+        super.appendRow(rowData);
+    }
+
+    @Override
+    public void commit(boolean async) throws InterruptedException {
+        super.commit(async);
+    }
+
+    @Override
+    protected String getDataPath() {
+        return remotePath;
+    }
+
+    @Override
+    public List<List<String>> getBatchFiles() {
+        return remoteFiles;
+    }
+
+    @Override
+    protected void exit() throws InterruptedException {
+        super.exit();
+        // remove the temp folder "bulk_writer"
+        Path parentPath = Paths.get(localPath).getParent();
+        if (parentPath.toFile().exists() && isEmptyDirectory(parentPath)) {
+            try {
+                Files.delete(parentPath);
+                logger.info("Delete empty directory: " + parentPath);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private static boolean isEmptyDirectory(Path directory) {
+        try {
+            return !Files.walk(directory, 1, FileVisitOption.FOLLOW_LINKS)
+                    .skip(1) // Skip the root directory itself
+                    .findFirst()
+                    .isPresent();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return false;
+    }
+
+    private void getStorageClient() {
+        if (storageClient != null) {
+            return;
+        }
+
+        if (connectParam instanceof S3ConnectParam) {
+            S3ConnectParam s3ConnectParam = (S3ConnectParam) connectParam;
+            storageClient = MinioStorageClient.getStorageClient(s3ConnectParam.getEndpoint(),
+                    s3ConnectParam.getAccessKey(),
+                    s3ConnectParam.getSecretKey(),
+                    s3ConnectParam.getSessionToken(),
+                    s3ConnectParam.getRegion(),
+                    s3ConnectParam.getHttpClient());
+        } else if (connectParam instanceof AzureConnectParam) {
+            AzureConnectParam azureConnectParam = (AzureConnectParam) connectParam;
+            storageClient = AzureStorageClient.getStorageClient(azureConnectParam.getConnStr(),
+                    azureConnectParam.getAccountUrl(),
+                    azureConnectParam.getCredential());
+        }
+    }
+
+    private void rmLocal(String file) {
+        try {
+            Path filePath = Paths.get(file);
+            filePath.toFile().delete();
+
+            Path parentDir = filePath.getParent();
+            if (parentDir != null && !parentDir.toString().equals(localPath)) {
+                try {
+                    Files.delete(parentDir);
+                    logger.info("Delete empty directory: " + parentDir);
+                } catch (IOException ex) {
+                    logger.warn("Failed to delete empty directory: " + parentDir);
+                }
+            }
+        } catch (Exception ex) {
+            logger.warn("Failed to delete local file: " + file);
+        }
+    }
+
+    @Override
+    protected void callBack(List<String> fileList) {
+        List<String> remoteFileList = new ArrayList<>();
+        try {
+            if (!bucketExists()) {
+                ExceptionUtils.throwUnExpectedException("Blob storage bucket/container doesn't exist");
+            }
+
+            for (String filePath : fileList) {
+                String ext = getExtension(filePath);
+                if (!Lists.newArrayList(".parquet").contains(ext)) {
+                    continue;
+                }
+
+                String relativeFilePath = filePath.replace(super.getDataPath(), "");
+                String minioFilePath = getMinioFilePath(remotePath, relativeFilePath);
+
+                if (objectExists(minioFilePath)) {
+                    logger.info(String.format("Remote file %s already exists, will overwrite it", minioFilePath));
+                }
+                uploadObject(filePath, minioFilePath);
+                remoteFileList.add(minioFilePath);
+                rmLocal(filePath);
+            }
+
+        } catch (Exception e) {
+            ExceptionUtils.throwUnExpectedException(String.format("Failed to upload files, error: %s", e));
+        }
+
+        logger.info("Successfully upload files: " + fileList);
+        remoteFiles.add(remoteFileList);
+    }
+
+    @Override
+    public void close() throws Exception {
+        logger.info("execute remaining actions to prevent loss of memory data or residual empty directories.");
+        exit();
+        logger.info(String.format("RemoteBulkWriter done! output remote files: %s", getBatchFiles()));
+    }
+
+    private void getObjectEntity(String objectName) throws Exception {
+        if (connectParam instanceof S3ConnectParam) {
+            S3ConnectParam s3ConnectParam = (S3ConnectParam) connectParam;
+            storageClient.getObjectEntity(s3ConnectParam.getBucketName(), objectName);
+        } else if (connectParam instanceof AzureConnectParam) {
+            AzureConnectParam azureConnectParam = (AzureConnectParam) connectParam;
+            storageClient.getObjectEntity(azureConnectParam.getContainerName(), objectName);
+        } else {
+            ExceptionUtils.throwUnExpectedException("Blob storage client is not initialized");
+        }
+    }
+
+    private boolean objectExists(String objectName) throws Exception {
+        try {
+            getObjectEntity(objectName);
+        } catch (ErrorResponseException e) {
+            if ("NoSuchKey".equals(e.errorResponse().code())) {
+                return false;
+            }
+
+            String msg = String.format("Failed to stat MinIO/S3 object %s, error: %s", objectName, e.errorResponse().message());
+            ExceptionUtils.throwUnExpectedException(msg);
+        } catch (BlobStorageException e) {
+            if (BlobErrorCode.BLOB_NOT_FOUND == e.getErrorCode()) {
+                return false;
+            }
+            String msg = String.format("Failed to stat Azure object %s, error: %s", objectName, e.getServiceMessage());
+            ExceptionUtils.throwUnExpectedException(msg);
+        }
+        return true;
+    }
+
+    private boolean bucketExists() throws Exception {
+        if (connectParam instanceof S3ConnectParam) {
+            S3ConnectParam s3ConnectParam = (S3ConnectParam) connectParam;
+            return storageClient.checkBucketExist(s3ConnectParam.getBucketName());
+        } else if (connectParam instanceof AzureConnectParam) {
+            AzureConnectParam azureConnectParam = (AzureConnectParam) connectParam;
+            return storageClient.checkBucketExist(azureConnectParam.getContainerName());
+        }
+
+        ExceptionUtils.throwUnExpectedException("Blob storage client is not initialized");
+        return false;
+    }
+
+    private void uploadObject(String filePath, String objectName) throws Exception {
+        logger.info(String.format("Prepare to upload %s to %s", filePath, objectName));
+
+        File file = new File(filePath);
+        FileInputStream fileInputStream = new FileInputStream(file);
+        if (connectParam instanceof S3ConnectParam) {
+            S3ConnectParam s3ConnectParam = (S3ConnectParam) connectParam;
+            storageClient.putObjectStream(fileInputStream, file.length(), s3ConnectParam.getBucketName(), objectName);
+        } else if (connectParam instanceof AzureConnectParam) {
+            AzureConnectParam azureConnectParam = (AzureConnectParam) connectParam;
+            storageClient.putObjectStream(fileInputStream, file.length(), azureConnectParam.getContainerName(), objectName);
+        } else {
+            ExceptionUtils.throwUnExpectedException("Blob storage client is not initialized");
+        }
+        logger.info(String.format("Upload file %s to %s", filePath, objectName));
+    }
+
+
+    private static String generatorLocalPath() {
+        Path currentWorkingDirectory = Paths.get("").toAbsolutePath();
+        Path currentScriptPath = currentWorkingDirectory.resolve("bulk_writer");
+        return currentScriptPath.toString();
+    }
+
+    private static String getMinioFilePath(String remotePath, String relativeFilePath) {
+        remotePath = remotePath.startsWith("/") ? remotePath.substring(1) : remotePath;
+        Path remote = Paths.get(remotePath);
+
+        relativeFilePath = relativeFilePath.startsWith("/") ? relativeFilePath.substring(1) : relativeFilePath;
+        Path relative = Paths.get(relativeFilePath);
+        Path joinedPath = remote.resolve(relative);
+        return joinedPath.toString();
+    }
+
+    private static String getExtension(String filePath) {
+        Path path = Paths.get(filePath);
+        String fileName = path.getFileName().toString();
+        int dotIndex = fileName.lastIndexOf('.');
+
+        if (dotIndex == -1 || dotIndex == fileName.length() - 1) {
+            return "";
+        } else {
+            return fileName.substring(dotIndex);
+        }
+    }
+}

+ 126 - 0
src/main/java/io/milvus/bulkwriter/RemoteBulkWriterParam.java

@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.bulkwriter;
+
+import io.milvus.bulkwriter.connect.StorageConnectParam;
+import io.milvus.bulkwriter.common.clientenum.BulkFileType;
+import io.milvus.exception.ParamException;
+import io.milvus.param.ParamUtils;
+import io.milvus.param.collection.CollectionSchemaParam;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.ToString;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Parameters for <code>bulkWriter</code> interface.
+ */
+@Getter
+@ToString
+public class RemoteBulkWriterParam {
+    private final CollectionSchemaParam collectionSchema;
+    private final StorageConnectParam connectParam;
+    private final String remotePath;
+    private final int chunkSize;
+    private final BulkFileType fileType;
+
+    private RemoteBulkWriterParam(@NonNull Builder builder) {
+        this.collectionSchema = builder.collectionSchema;
+        this.connectParam = builder.connectParam;
+        this.remotePath = builder.remotePath;
+        this.chunkSize = builder.chunkSize;
+        this.fileType = builder.fileType;
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link RemoteBulkWriterParam} class.
+     */
+    public static final class Builder {
+        private CollectionSchemaParam collectionSchema;
+        private StorageConnectParam connectParam;
+        private String remotePath;
+        private int chunkSize = 1024 * 1024 * 1024;
+        private BulkFileType fileType = BulkFileType.PARQUET;
+
+        private Builder() {
+        }
+
+        /**
+         * Sets the collection info.
+         *
+         * @param collectionSchema collection info
+         * @return <code>Builder</code>
+         */
+        public Builder withCollectionSchema(@NonNull CollectionSchemaParam collectionSchema) {
+            this.collectionSchema = collectionSchema;
+            return this;
+        }
+
+        public Builder withConnectParam(@NotNull StorageConnectParam connectParam) {
+            this.connectParam = connectParam;
+            return this;
+        }
+
+        /**
+         * Sets the remotePath.
+         *
+         * @param remotePath remote path
+         * @return <code>Builder</code>
+         */
+        public Builder withRemotePath(@NonNull String remotePath) {
+            this.remotePath = remotePath;
+            return this;
+        }
+
+        public Builder withChunkSize(int chunkSize) {
+            this.chunkSize = chunkSize;
+            return this;
+        }
+
+        public Builder withFileType(@NonNull BulkFileType fileType) {
+            this.fileType = fileType;
+            return this;
+        }
+
+        /**
+         * Verifies parameters and creates a new {@link RemoteBulkWriterParam} instance.
+         *
+         * @return {@link RemoteBulkWriterParam}
+         */
+        public RemoteBulkWriterParam build() throws ParamException {
+            ParamUtils.CheckNullEmptyString(remotePath, "remotePath");
+
+            if (collectionSchema == null) {
+                throw new ParamException("collectionSchema cannot be null");
+            }
+
+            if (connectParam == null) {
+                throw new ParamException("connectParam cannot be null");
+            }
+
+            return new RemoteBulkWriterParam(this);
+        }
+    }
+
+}
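
A hedged sketch of the remote flow; the S3ConnectParam builder method names are assumptions inferred from its getters (getEndpoint, getAccessKey, getSecretKey, getBucketName) and may differ in the actual class:

    StorageConnectParam connectParam = S3ConnectParam.newBuilder()
            .withEndpoint(CloudStorage.AWS.getEndpoint())
            .withBucketName("my-bucket")
            .withAccessKey("<access-key>")
            .withSecretKey("<secret-key>")
            .build();

    RemoteBulkWriterParam writerParam = RemoteBulkWriterParam.newBuilder()
            .withCollectionSchema(collectionSchema) // built by the caller
            .withConnectParam(connectParam)
            .withRemotePath("bulk_data")
            .withChunkSize(512 * 1024 * 1024)
            .build();

    try (RemoteBulkWriter writer = new RemoteBulkWriter(writerParam)) {
        writer.appendRow(row);          // rows prepared by the caller
        writer.commit(false);
        // Each flushed Parquet file is uploaded under <bucket>/bulk_data/<uuid>/ and the
        // local copy in ./bulk_writer is removed afterwards.
        System.out.println("Remote files: " + writer.getBatchFiles());
    } catch (Exception e) {
        e.printStackTrace();
    }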

+ 12 - 0
src/main/java/io/milvus/bulkwriter/common/clientenum/BulkFileType.java

@@ -0,0 +1,12 @@
+package io.milvus.bulkwriter.common.clientenum;
+
+public enum BulkFileType {
+    PARQUET(1),
+    ;
+
+    private Integer code;
+
+    BulkFileType(Integer code) {
+        this.code = code;
+    }
+}

+ 49 - 0
src/main/java/io/milvus/bulkwriter/common/clientenum/CloudStorage.java

@@ -0,0 +1,49 @@
+package io.milvus.bulkwriter.common.clientenum;
+
+import io.milvus.exception.ParamException;
+import org.apache.commons.lang3.StringUtils;
+
+public enum CloudStorage {
+    AWS("s3.amazonaws.com", null),
+    GCP("storage.googleapis.com", null),
+    AZURE("%s.blob.core.windows.net", "accountName"),
+    ALI("oss-%s.aliyuncs.com", "region"),
+    TC("cos.region.myqcloud.com", "region")
+    ;
+
+    private String endpoint;
+
+    private String replace;
+
+    CloudStorage(String endpoint, String replace) {
+        this.endpoint = endpoint;
+        this.replace = replace;
+    }
+
+    public String getEndpoint(String... replaceParams) {
+        if (StringUtils.isEmpty(replace))  {
+            return endpoint;
+        }
+        if (replaceParams.length == 0) {
+            throw new ParamException(String.format("Please input the replaceParams:%s when you want to get endpoint of %s", replace, this.name()));
+        }
+        return String.format(endpoint, replaceParams[0]);
+    }
+
+    public String getObjectUrl(String bucketName, String commonPrefix, String region) {
+        switch (this) {
+            case AWS:
+                return String.format("https://s3.%s.amazonaws.com/%s/%s", region, bucketName, commonPrefix);
+            case GCP:
+                return String.format("https://storage.cloud.google.com/%s/%s", bucketName, commonPrefix);
+            case AZURE:
+                return String.format("https://zillizvdctest.blob.core.windows.net/%s/%s", bucketName, commonPrefix);
+            case TC:
+                return String.format("https://%s.cos.%s.myqcloud.com/%s", bucketName, region, commonPrefix);
+            case ALI:
+                return String.format("https://%s.oss-%s.aliyuncs.com/%s", bucketName, region, commonPrefix);
+            default:
+                throw new ParamException("no support others storage address");
+        }
+    }
+}
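
Endpoint and object-URL resolution in CloudStorage, shown on assumed bucket/account names:

    String awsEndpoint = CloudStorage.AWS.getEndpoint();                // "s3.amazonaws.com"
    String azureEndpoint = CloudStorage.AZURE.getEndpoint("myaccount"); // "myaccount.blob.core.windows.net"

    // The object URL combines bucket, common prefix and (for some vendors) region:
    String objectUrl = CloudStorage.AWS.getObjectUrl("my-bucket", "bulk_data/uuid/", "us-west-2");
    // -> "https://s3.us-west-2.amazonaws.com/my-bucket/bulk_data/uuid/"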

+ 42 - 0
src/main/java/io/milvus/bulkwriter/common/clientenum/TypeSize.java

@@ -0,0 +1,42 @@
+package io.milvus.bulkwriter.common.clientenum;
+
+import io.milvus.exception.ParamException;
+import io.milvus.grpc.DataType;
+
+public enum TypeSize {
+    BOOL(DataType.Bool, 1),
+    INT8(DataType.Int8, 1),
+    INT16(DataType.Int16, 2),
+    INT32(DataType.Int32, 4),
+    INT64(DataType.Int64, 8),
+    FLOAT(DataType.Float, 4),
+    DOUBLE(DataType.Double, 8),
+
+    ;
+    private DataType dataType;
+    private Integer size;
+
+    TypeSize(DataType dataType, Integer size) {
+        this.dataType = dataType;
+        this.size = size;
+    }
+
+    public static boolean contains(DataType dataType) {
+        for (TypeSize typeSize : values()) {
+            if (typeSize.dataType == dataType) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public static Integer getSize(DataType dataType) {
+        for (TypeSize typeSize : values()) {
+            if (typeSize.dataType == dataType) {
+                return typeSize.size;
+            }
+        }
+        throw new ParamException("TypeSize not contains this dataType: " + dataType);
+    }
+
+}

+ 132 - 0
src/main/java/io/milvus/bulkwriter/common/utils/GeneratorUtils.java

@@ -0,0 +1,132 @@
+package io.milvus.bulkwriter.common.utils;
+
+import org.apache.commons.lang3.RandomUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+public class GeneratorUtils {
+
+    public static List<Long> generatorLongValue(int count) {
+        List<Long> result = new ArrayList<>();
+        for (int i = 0; i < count; ++i) {
+            result.add((long) i);
+        }
+        return result;
+    }
+
+    public static List<Boolean> generatorBoolValue(int count) {
+        List<Boolean> result = new ArrayList<>();
+        for (int i = 0; i < count; ++i) {
+            result.add(i % 5 == 0);
+        }
+        return result;
+    }
+
+    public static List<Integer> generatorInt8Value(int count) {
+        List<Integer> result = new ArrayList<>();
+        for (int i = 0; i < count; ++i) {
+            result.add(i % 128);
+        }
+        return result;
+    }
+
+    public static List<Integer> generatorInt16Value(int count) {
+        List<Integer> result = new ArrayList<>();
+        for (int i = 0; i < count; ++i) {
+            result.add(i % 1000);
+        }
+        return result;
+    }
+
+    public static List<Integer> generatorInt32Value(int count) {
+        List<Integer> result = new ArrayList<>();
+        for (int i = 0; i < count; ++i) {
+            result.add(i % 100000);
+        }
+        return result;
+    }
+
+    public static List<Float> generatorFloatValue(int count) {
+        List<Float> result = new ArrayList<>();
+        for (int i = 0; i < count; ++i) {
+            result.add( (float)i / 3);
+        }
+        return result;
+    }
+
+    public static List<Double> generatorDoubleValue(int count) {
+        List<Double> result = new ArrayList<>();
+        for (int i = 0; i < count; ++i) {
+            result.add((double)i / 7);
+        }
+        return result;
+    }
+
+    public static List<String> generatorVarcharValue(int count, int maxLength) {
+        List<String> result = new ArrayList<>();
+        for (int i = 0; i < count; ++i) {
+            result.add(UUID.randomUUID().toString().substring(0, maxLength));
+        }
+        return result;
+    }
+
+    public static ByteBuffer generatorBinaryVector(int dim) {
+        int[] rawVector = generateRandomBinaryVector(dim);
+        return packBits(rawVector);
+    }
+
+    private static int[] generateRandomBinaryVector(int dim) {
+        int[] rawVector = new int[dim];
+        Random random = new Random();
+
+        for (int i = 0; i < dim; i++) {
+            rawVector[i] = random.nextInt(2); // randomly generate 0 or 1
+        }
+
+        return rawVector;
+    }
+
+    private static ByteBuffer packBits(int[] rawVector) {
+        int byteCount = (int) Math.ceil((double) rawVector.length / 8);
+        byte[] binaryArray = new byte[byteCount];
+
+        for (int i = 0; i < rawVector.length; i++) {
+            if (rawVector[i] != 0) {
+                int byteIndex = i / 8;
+                int bitIndex = 7 - (i % 8);
+                binaryArray[byteIndex] |= (1 << bitIndex);
+            }
+        }
+
+        ByteBuffer byteBuffer = ByteBuffer.allocate(byteCount);
+        for (byte b : binaryArray) {
+            byteBuffer.put(b);
+        }
+        return byteBuffer;
+    }
+
+    public static List<List<Float>> generatorFloatVector(int dim, int count) {
+        List<List<Float>> floatVector = new ArrayList<>();
+
+        for (int i = 0; i < count; ++i) {
+            List<Float> result = new ArrayList<>();
+            for (int j = 0; j < dim; ++j) {
+                result.add( (float)j / 3);
+            }
+            floatVector.add(result);
+        }
+        return floatVector;
+    }
+
+    public static List<Float> genFloatVector(int dim) {
+        List<Float> result = new ArrayList<>();
+        for (int i = 0; i < dim; ++i) {
+            result.add(RandomUtils.nextFloat(100, 10000));
+        }
+        return result;
+    }
+}
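
Assembling one hypothetical row from the generator helpers (the field names are illustrative):

    JSONObject row = new JSONObject();
    row.put("bool_field", GeneratorUtils.generatorBoolValue(1).get(0));
    row.put("varchar_field", GeneratorUtils.generatorVarcharValue(1, 8).get(0));
    row.put("float_vector", GeneratorUtils.genFloatVector(768));
    row.put("binary_vector", GeneratorUtils.generatorBinaryVector(512)); // 512 bits packed into a 64-byte buffer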

+ 36 - 0
src/main/java/io/milvus/bulkwriter/common/utils/ImportUtils.java

@@ -0,0 +1,36 @@
+package io.milvus.bulkwriter.common.utils;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ImportUtils {
+    public static String getCommonPrefix(List<List<String>> batchFiles) {
+        List<String> allFilePaths = batchFiles.stream().flatMap(Collection::stream).collect(Collectors.toList());
+        return longestCommonPrefix(allFilePaths);
+    }
+
+    private static String longestCommonPrefix(List<String> allFilePaths) {
+        if (allFilePaths.size() == 0) {
+            return "";
+        }
+        String prefix = allFilePaths.get(0);
+        int count = allFilePaths.size();
+        for (int i = 1; i < count; i++) {
+            prefix = longestCommonPrefix(prefix, allFilePaths.get(i));
+            if (prefix.length() == 0) {
+                break;
+            }
+        }
+        return prefix;
+    }
+
+    private static String longestCommonPrefix(String str1, String str2) {
+        int length = Math.min(str1.length(), str2.length());
+        int index = 0;
+        while (index < length && str1.charAt(index) == str2.charAt(index)) {
+            index++;
+        }
+        return str1.substring(0, index);
+    }
+}
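
getCommonPrefix is typically used to derive the remote folder that holds every generated file, which then becomes the objectUrl handed to CloudImport.bulkImport; the paths below are examples and remoteBulkWriter is the writer from the earlier sketch:

    List<List<String>> batchFiles = remoteBulkWriter.getBatchFiles();
    // e.g. [["bulk_data/uuid/1/1.parquet"], ["bulk_data/uuid/2/1.parquet"]]
    String commonPrefix = ImportUtils.getCommonPrefix(batchFiles);   // -> "bulk_data/uuid/"
    String objectUrl = CloudStorage.AWS.getObjectUrl("my-bucket", commonPrefix, "us-west-2");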

+ 25 - 0
src/main/java/io/milvus/bulkwriter/common/utils/ParquetReaderUtils.java

@@ -0,0 +1,25 @@
+package io.milvus.bulkwriter.common.utils;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetReader;
+
+import java.io.IOException;
+
+public abstract class ParquetReaderUtils {
+    public void readParquet(String localFilePath) throws IOException {
+        Path path = new Path(localFilePath);
+        try (org.apache.parquet.hadoop.ParquetReader<GenericData.Record> reader = AvroParquetReader
+                .<GenericData.Record>builder(path)
+                .withConf(new Configuration())
+                .build()) {
+            GenericData.Record record;
+            while ((record = reader.read()) != null) {
+                readRecord(record);
+            }
+        }
+    }
+
+    public abstract void readRecord(GenericData.Record record);
+}
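
A small sketch that verifies a generated file by subclassing ParquetReaderUtils and printing every Avro record; the file path is a placeholder:

    try {
        new ParquetReaderUtils() {
            @Override
            public void readRecord(GenericData.Record record) {
                System.out.println(record);
            }
        }.readParquet("/tmp/bulk_writer/<uuid>/1/1.parquet");
    } catch (IOException e) {
        e.printStackTrace();
    }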

+ 119 - 0
src/main/java/io/milvus/bulkwriter/common/utils/ParquetUtils.java

@@ -0,0 +1,119 @@
+package io.milvus.bulkwriter.common.utils;
+
+import io.milvus.param.collection.CollectionSchemaParam;
+import io.milvus.param.collection.FieldType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+
+import java.util.List;
+
+public class ParquetUtils {
+    public static MessageType parseCollectionSchema(CollectionSchemaParam collectionSchema) {
+        List<FieldType> fieldTypes = collectionSchema.getFieldTypes();
+        Types.MessageTypeBuilder messageTypeBuilder = Types.buildMessage();
+        for (FieldType fieldType : fieldTypes) {
+            if (fieldType.isAutoID()) {
+                continue;
+            }
+            switch (fieldType.getDataType()) {
+                case FloatVector:
+                    messageTypeBuilder.requiredList()
+                            .requiredElement(PrimitiveType.PrimitiveTypeName.FLOAT)
+                            .named(fieldType.getName());
+                    break;
+                case BinaryVector:
+                    messageTypeBuilder.requiredList()
+                            .requiredElement(PrimitiveType.PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.IntLogicalTypeAnnotation.intType(8, false))
+                            .named(fieldType.getName());
+                    break;
+                case Array:
+                    fillArrayType(messageTypeBuilder, fieldType);
+                    break;
+
+                case Int64:
+                    messageTypeBuilder.required(PrimitiveType.PrimitiveTypeName.INT64)
+                            .named(fieldType.getName());
+                    break;
+                case VarChar:
+                case JSON:
+                    messageTypeBuilder.required(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())
+                            .named(fieldType.getName());
+                    break;
+                case Int8:
+                    messageTypeBuilder.required(PrimitiveType.PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.IntLogicalTypeAnnotation.intType(8, true))
+                            .named(fieldType.getName());
+                    break;
+                case Int16:
+                    messageTypeBuilder.required(PrimitiveType.PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.IntLogicalTypeAnnotation.intType(16, true))
+                            .named(fieldType.getName());
+                    break;
+                case Int32:
+                    messageTypeBuilder.required(PrimitiveType.PrimitiveTypeName.INT32)
+                            .named(fieldType.getName());
+                    break;
+                case Float:
+                    messageTypeBuilder.required(PrimitiveType.PrimitiveTypeName.FLOAT)
+                            .named(fieldType.getName());
+                    break;
+                case Double:
+                    messageTypeBuilder.required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+                            .named(fieldType.getName());
+                    break;
+                case Bool:
+                    messageTypeBuilder.required(PrimitiveType.PrimitiveTypeName.BOOLEAN)
+                            .named(fieldType.getName());
+                    break;
+
+            }
+        }
+        return messageTypeBuilder.named("schema");
+    }
+
+    private static void fillArrayType(Types.MessageTypeBuilder messageTypeBuilder, FieldType fieldType) {
+        switch (fieldType.getElementType()) {
+            case Int64:
+                messageTypeBuilder.requiredList()
+                        .requiredElement(PrimitiveType.PrimitiveTypeName.INT64)
+                        .named(fieldType.getName());
+                break;
+            case VarChar:
+                messageTypeBuilder.requiredList()
+                        .requiredElement(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())
+                        .named(fieldType.getName());
+                break;
+            case Int8:
+                messageTypeBuilder.requiredList()
+                        .requiredElement(PrimitiveType.PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.IntLogicalTypeAnnotation.intType(8, true))
+                        .named(fieldType.getName());
+                break;
+            case Int16:
+                messageTypeBuilder.requiredList()
+                        .requiredElement(PrimitiveType.PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.IntLogicalTypeAnnotation.intType(16, true))
+                        .named(fieldType.getName());
+                break;
+            case Int32:
+                messageTypeBuilder.requiredList()
+                        .requiredElement(PrimitiveType.PrimitiveTypeName.INT32)
+                        .named(fieldType.getName());
+                break;
+            case Float:
+                messageTypeBuilder.requiredList()
+                        .requiredElement(PrimitiveType.PrimitiveTypeName.FLOAT)
+                        .named(fieldType.getName());
+                break;
+            case Double:
+                messageTypeBuilder.requiredList()
+                        .requiredElement(PrimitiveType.PrimitiveTypeName.DOUBLE)
+                        .named(fieldType.getName());
+                break;
+            case Bool:
+                messageTypeBuilder.requiredList()
+                        .requiredElement(PrimitiveType.PrimitiveTypeName.BOOLEAN)
+                        .named(fieldType.getName());
+                break;
+
+        }
+    }
+}
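parseCollectionSchema maps each non-auto-ID Milvus field to a Parquet column: scalars become required primitives, while vectors and arrays become required lists. A short sketch assuming the standard FieldType builder from io.milvus.param.collection (not shown in this diff); the field names and dimension are illustrative:

```java
import io.milvus.bulkwriter.common.utils.ParquetUtils;
import io.milvus.grpc.DataType;
import io.milvus.param.collection.CollectionSchemaParam;
import io.milvus.param.collection.FieldType;
import org.apache.parquet.schema.MessageType;

public class ParquetSchemaDemo {
    public static void main(String[] args) {
        // Illustrative two-field collection: an Int64 primary key plus a 768-dim float vector.
        CollectionSchemaParam schema = CollectionSchemaParam.newBuilder()
                .addFieldType(FieldType.newBuilder()
                        .withName("id")
                        .withDataType(DataType.Int64)
                        .withPrimaryKey(true)
                        .withAutoID(false)
                        .build())
                .addFieldType(FieldType.newBuilder()
                        .withName("vector")
                        .withDataType(DataType.FloatVector)
                        .withDimension(768)
                        .build())
                .build();

        // Int64 becomes a required INT64 column; FloatVector becomes a required list of FLOAT.
        MessageType parquetSchema = ParquetUtils.parseCollectionSchema(schema);
        System.out.println(parquetSchema);
    }
}
```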

+ 98 - 0
src/main/java/io/milvus/bulkwriter/connect/AzureConnectParam.java

@@ -0,0 +1,98 @@
+package io.milvus.bulkwriter.connect;
+
+import com.azure.core.credential.TokenCredential;
+import io.milvus.exception.ParamException;
+import io.milvus.param.ParamUtils;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.ToString;
+
+/**
+ * Parameters for <code>RemoteBulkWriter</code> interface.
+ */
+@Getter
+@ToString
+public class AzureConnectParam extends StorageConnectParam {
+    private final String containerName;
+    private final String connStr;
+    private final String accountUrl;
+    private final TokenCredential credential;
+
+    private AzureConnectParam(@NonNull Builder builder) {
+        this.containerName = builder.containerName;
+        this.connStr = builder.connStr;
+        this.accountUrl = builder.accountUrl;
+        this.credential = builder.credential;
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link AzureConnectParam} class.
+     */
+    public static final class Builder {
+        private String containerName;
+        private String connStr;
+        private String accountUrl;
+        private TokenCredential credential;
+
+        private Builder() {
+        }
+
+        /**
+         * @param containerName The target container name
+         * @return <code>Builder</code>
+         */
+        public Builder withContainerName(@NonNull String containerName) {
+            this.containerName = containerName;
+            return this;
+        }
+
+        /**
+         * @param connStr A connection string to an Azure Storage account,
+         *                which can be parsed to an account_url and a credential.
+         *                To generate a connection string, read this link:
+         *                <a href="https://learn.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string">...</a>
+         * @return <code>Builder</code>
+         */
+        public Builder withConnStr(@NonNull String connStr) {
+            this.connStr = connStr;
+            return this;
+        }
+
+        /**
+         * @param accountUrl A string in the format https://<storage-account>.blob.core.windows.net
+         *                     Read this link for more info:
+         *                     <a href="https://learn.microsoft.com/en-us/azure/storage/common/storage-account-overview">...</a>
+         * @return <code>Builder</code>
+         */
+        public Builder withAccountUrl(@NonNull String accountUrl) {
+            this.accountUrl = accountUrl;
+            return this;
+        }
+
+        /**
+         *
+         * @param credential Account access key for the account, read this link for more info:
+         *                     <a href="https://learn.microsoft.com/en-us/azure/storage/common/storage-account-keys-manage?tabs=azure-portal#view-account-access-keys">...</a>
+         * @return <code>Builder</code>
+         */
+        public Builder withCredential(@NonNull TokenCredential credential) {
+            this.credential = credential;
+            return this;
+        }
+
+        /**
+         * Verifies parameters and creates a new {@link AzureConnectParam} instance.
+         *
+         * @return {@link AzureConnectParam}
+         */
+        public AzureConnectParam build() throws ParamException {
+            ParamUtils.CheckNullEmptyString(containerName, "containerName");
+
+            return new AzureConnectParam(this);
+        }
+    }
+}
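A minimal sketch of building an AzureConnectParam; the container name and connection string below are placeholders, and only containerName is checked by build():

```java
import io.milvus.bulkwriter.connect.AzureConnectParam;
import io.milvus.bulkwriter.connect.StorageConnectParam;

public class AzureConnectDemo {
    public static void main(String[] args) {
        // Placeholder container name and connection string; supply your own values.
        StorageConnectParam connectParam = AzureConnectParam.newBuilder()
                .withContainerName("my-container")
                .withConnStr("DefaultEndpointsProtocol=https;AccountName=myaccount;AccountKey=mykey;EndpointSuffix=core.windows.net")
                .build();
        // connectParam is then handed to the RemoteBulkWriter parameters when targeting Azure Blob Storage.
        System.out.println(connectParam);
    }
}
```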

+ 112 - 0
src/main/java/io/milvus/bulkwriter/connect/S3ConnectParam.java

@@ -0,0 +1,112 @@
+package io.milvus.bulkwriter.connect;
+
+import io.milvus.exception.ParamException;
+import io.milvus.param.ParamUtils;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.ToString;
+import okhttp3.OkHttpClient;
+
+/**
+ * Parameters for <code>RemoteBulkWriter</code> interface.
+ */
+@Getter
+@ToString
+public class S3ConnectParam extends StorageConnectParam {
+    private final String bucketName;
+    private final String endpoint;
+    private final String accessKey;
+    private final String secretKey;
+    private final String sessionToken;
+    private final String region;
+    private final OkHttpClient httpClient;
+
+    private S3ConnectParam(@NonNull Builder builder) {
+        this.bucketName = builder.bucketName;
+        this.endpoint = builder.endpoint;
+        this.accessKey = builder.accessKey;
+        this.secretKey = builder.secretKey;
+        this.sessionToken = builder.sessionToken;
+        this.region = builder.region;
+        this.httpClient = builder.httpClient;
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link S3ConnectParam} class.
+     */
+    public static final class Builder {
+        private String bucketName;
+        private String endpoint;
+        private String accessKey;
+        private String secretKey;
+        private String sessionToken;
+        private String region;
+        private OkHttpClient httpClient;
+
+        private Builder() {
+        }
+
+        /**
+         * Sets the bucketName info.
+         *
+         * @param bucketName bucket info
+         * @return <code>Builder</code>
+         */
+        public Builder withBucketName(@NonNull String bucketName) {
+            this.bucketName = bucketName;
+            return this;
+        }
+
+        /**
+         * Sets the endpoint.
+         *
+         * @param endpoint endpoint info
+         * @return <code>Builder</code>
+         */
+        public Builder withEndpoint(@NonNull String endpoint) {
+            this.endpoint = endpoint;
+            return this;
+        }
+
+        public Builder withAccessKey(@NonNull String accessKey) {
+            this.accessKey = accessKey;
+            return this;
+        }
+
+        public Builder withSecretKey(@NonNull String secretKey) {
+            this.secretKey = secretKey;
+            return this;
+        }
+
+        public Builder withSessionToken(@NonNull String sessionToken) {
+            this.sessionToken = sessionToken;
+            return this;
+        }
+
+        public Builder withRegion(@NonNull String region) {
+            this.region = region;
+            return this;
+        }
+
+        public Builder withHttpClient(@NonNull OkHttpClient httpClient) {
+            this.httpClient = httpClient;
+            return this;
+        }
+
+        /**
+         * Verifies parameters and creates a new {@link S3ConnectParam} instance.
+         *
+         * @return {@link S3ConnectParam}
+         */
+        public S3ConnectParam build() throws ParamException {
+            ParamUtils.CheckNullEmptyString(endpoint, "endpoint");
+            ParamUtils.CheckNullEmptyString(bucketName, "bucketName");
+
+            return new S3ConnectParam(this);
+        }
+    }
+}
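Likewise for S3ConnectParam; the endpoint, bucket, and credentials below are placeholders for a local MinIO deployment, and build() verifies only endpoint and bucketName:

```java
import io.milvus.bulkwriter.connect.S3ConnectParam;
import io.milvus.bulkwriter.connect.StorageConnectParam;

public class S3ConnectDemo {
    public static void main(String[] args) {
        // Placeholder endpoint and default MinIO credentials; supply your own values.
        StorageConnectParam connectParam = S3ConnectParam.newBuilder()
                .withEndpoint("http://localhost:9000")
                .withBucketName("a-bucket")
                .withAccessKey("minioadmin")
                .withSecretKey("minioadmin")
                .build(); // only endpoint and bucketName are validated
        System.out.println(connectParam);
    }
}
```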

+ 5 - 0
src/main/java/io/milvus/bulkwriter/connect/StorageConnectParam.java

@@ -0,0 +1,5 @@
+package io.milvus.bulkwriter.connect;
+
+public class StorageConnectParam {
+
+}

+ 18 - 0
src/main/java/io/milvus/bulkwriter/response/BulkImportResponse.java

@@ -0,0 +1,18 @@
+package io.milvus.bulkwriter.response;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class BulkImportResponse implements Serializable {
+    private static final long serialVersionUID = -7162743560382861611L;
+
+    private String jobId;
+}

+ 45 - 0
src/main/java/io/milvus/bulkwriter/response/GetImportProgressResponse.java

@@ -0,0 +1,45 @@
+package io.milvus.bulkwriter.response;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class GetImportProgressResponse implements Serializable {
+    private static final long serialVersionUID = -7162743560382861611L;
+
+    private String fileName;
+
+    private Integer fileSize;
+
+    private Double readyPercentage;
+
+    private String completeTime;
+
+    private String errorMessage;
+
+    private String collectionName;
+
+    private String jobId;
+
+    private List<Detail> details;
+
+    @Data
+    @Builder
+    @AllArgsConstructor
+    @NoArgsConstructor
+    private static class Detail {
+        private String fileName;
+        private Integer fileSize;
+        private Double readyPercentage;
+        private String completeTime;
+        private String errorMessage;
+    }
+}

+ 35 - 0
src/main/java/io/milvus/bulkwriter/response/ListImportJobsResponse.java

@@ -0,0 +1,35 @@
+package io.milvus.bulkwriter.response;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class ListImportJobsResponse implements Serializable {
+    private static final long serialVersionUID = -7162743560382861611L;
+
+    private Integer count;
+
+    private Integer currentPage;
+
+    private Integer pageSize;
+
+    private List<Record> records;
+
+    @Data
+    @Builder
+    @AllArgsConstructor
+    @NoArgsConstructor
+    private static class Record {
+        private String collectionName;
+        private String jobId;
+        private String state;
+    }
+}

+ 22 - 0
src/main/java/io/milvus/bulkwriter/response/RestfulResponse.java

@@ -0,0 +1,22 @@
+package io.milvus.bulkwriter.response;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class RestfulResponse<T> implements Serializable {
+    private static final long serialVersionUID = -7162743560382861611L;
+
+    private int code;
+
+    private String message;
+
+    private T data;
+}
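RestfulResponse is the generic {code, message, data} envelope returned by the cloud import REST endpoints. A hedged sketch of decoding one with Gson, which the bundled example already depends on; the JSON body and code value are illustrative:

```java
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.milvus.bulkwriter.response.BulkImportResponse;
import io.milvus.bulkwriter.response.RestfulResponse;

import java.lang.reflect.Type;

public class RestfulResponseDemo {
    public static void main(String[] args) {
        // Illustrative body; real payloads come from the cloud import REST endpoints.
        String body = "{\"code\": 200, \"message\": \"success\", \"data\": {\"jobId\": \"job-123\"}}";

        Type type = new TypeToken<RestfulResponse<BulkImportResponse>>() {}.getType();
        RestfulResponse<BulkImportResponse> response = new Gson().fromJson(body, type);

        System.out.println(response.getCode());            // 200
        System.out.println(response.getData().getJobId()); // job-123
    }
}
```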

+ 10 - 0
src/main/java/io/milvus/bulkwriter/storage/StorageClient.java

@@ -0,0 +1,10 @@
+package io.milvus.bulkwriter.storage;
+
+
+import java.io.InputStream;
+
+public interface StorageClient {
+    Long getObjectEntity(String bucketName, String objectKey) throws Exception;
+    boolean checkBucketExist(String bucketName) throws Exception;
+    void putObjectStream(InputStream inputStream, long contentLength, String bucketName, String objectKey) throws Exception;
+}

+ 60 - 0
src/main/java/io/milvus/bulkwriter/storage/client/AzureStorageClient.java

@@ -0,0 +1,60 @@
+package io.milvus.bulkwriter.storage.client;
+
+import com.azure.core.credential.TokenCredential;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import io.milvus.bulkwriter.storage.StorageClient;
+import io.milvus.common.utils.ExceptionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+
+public class AzureStorageClient implements StorageClient {
+    private static final Logger logger = LoggerFactory.getLogger(AzureStorageClient.class);
+
+    private final BlobServiceClient blobServiceClient;
+
+    private AzureStorageClient(BlobServiceClient blobServiceClient) {
+        this.blobServiceClient = blobServiceClient;
+    }
+
+    public static AzureStorageClient getStorageClient(String connStr,
+                                                      String accountUrl,
+                                                      TokenCredential credential) {
+        BlobServiceClientBuilder blobServiceClientBuilder = new BlobServiceClientBuilder();
+        if (credential != null) {
+            blobServiceClientBuilder.credential(credential);
+        }
+
+        if (StringUtils.isNotEmpty(connStr)) {
+            blobServiceClientBuilder.connectionString(connStr);
+        } else if (StringUtils.isNotEmpty(accountUrl)) {
+            blobServiceClientBuilder.endpoint(accountUrl);
+        } else {
+            ExceptionUtils.throwUnExpectedException("Illegal connection parameters");
+        }
+        BlobServiceClient blobServiceClient = blobServiceClientBuilder.buildClient();
+        logger.info("Azure blob storage client successfully initialized");
+        return new AzureStorageClient(blobServiceClient);
+    }
+
+    public Long getObjectEntity(String bucketName, String objectKey) {
+        BlobClient blobClient = blobServiceClient.getBlobContainerClient(bucketName).getBlobClient(objectKey);
+        return blobClient.getProperties().getBlobSize();
+    }
+
+    public void putObjectStream(InputStream inputStream, long contentLength, String bucketName, String objectKey) {
+        BlobClient blobClient = blobServiceClient.getBlobContainerClient(bucketName).getBlobClient(objectKey);
+        blobClient.upload(inputStream, contentLength);
+    }
+
+
+    public boolean checkBucketExist(String bucketName) {
+        BlobContainerClient blobContainerClient = blobServiceClient.getBlobContainerClient(bucketName);
+        return blobContainerClient.exists();
+    }
+}
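A minimal sketch of using AzureStorageClient directly; the connection string, container, and object key are placeholders, and accountUrl/credential may stay null when connStr is provided:

```java
import io.milvus.bulkwriter.storage.client.AzureStorageClient;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

public class AzureUploadDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder connection string; supply your own values.
        AzureStorageClient client = AzureStorageClient.getStorageClient(
                "DefaultEndpointsProtocol=https;AccountName=myaccount;AccountKey=mykey;EndpointSuffix=core.windows.net",
                null, null);

        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        if (client.checkBucketExist("my-container")) {
            client.putObjectStream(new ByteArrayInputStream(payload), payload.length, "my-container", "demo/hello.txt");
            System.out.println("blob size = " + client.getObjectEntity("my-container", "demo/hello.txt"));
        }
    }
}
```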

+ 74 - 0
src/main/java/io/milvus/bulkwriter/storage/client/MinioStorageClient.java

@@ -0,0 +1,74 @@
+package io.milvus.bulkwriter.storage.client;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import io.milvus.bulkwriter.storage.StorageClient;
+import io.minio.BucketExistsArgs;
+import io.minio.MinioClient;
+import io.minio.PutObjectArgs;
+import io.minio.StatObjectArgs;
+import io.minio.StatObjectResponse;
+import io.minio.credentials.StaticProvider;
+import okhttp3.OkHttpClient;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+
+import static com.amazonaws.services.s3.internal.Constants.MB;
+
+public class MinioStorageClient extends MinioClient implements StorageClient {
+    private static final Logger logger = LoggerFactory.getLogger(MinioStorageClient.class);
+
+    protected MinioStorageClient(MinioClient client, Multimap<String, String> extraHeader) {
+        super(client);
+    }
+
+    public static MinioStorageClient getStorageClient(String endpoint,
+                                                      String accessKey,
+                                                      String secretKey,
+                                                      String sessionToken,
+                                                      String region,
+                                                      OkHttpClient httpClient) {
+        Multimap<String, String> extraHeader = HashMultimap.create();
+        MinioClient.Builder minioClientBuilder = MinioClient.builder()
+                .endpoint(endpoint)
+                .credentialsProvider(new StaticProvider(accessKey, secretKey, sessionToken));
+
+        if (StringUtils.isNotEmpty(region)) {
+            minioClientBuilder.region(region);
+        }
+
+        if (httpClient != null) {
+            minioClientBuilder.httpClient(httpClient);
+        }
+
+        return new MinioStorageClient(minioClientBuilder.build(), extraHeader);
+    }
+
+    public Long getObjectEntity(String bucketName, String objectKey) throws Exception {
+        StatObjectArgs statObjectArgs = StatObjectArgs.builder()
+                .bucket(bucketName)
+                .object(objectKey)
+                .build();
+        StatObjectResponse statObject = statObject(statObjectArgs);
+        return statObject.size();
+    }
+
+    public void putObjectStream(InputStream inputStream, long contentLength, String bucketName, String objectKey) throws Exception {
+        PutObjectArgs putObjectArgs = PutObjectArgs.builder()
+                .bucket(bucketName)
+                .object(objectKey)
+                .stream(inputStream, contentLength, 5 * MB)
+                .build();
+        putObject(putObjectArgs);
+    }
+
+    public boolean checkBucketExist(String bucketName) throws Exception {
+        BucketExistsArgs bucketExistsArgs = BucketExistsArgs.builder()
+                .bucket(bucketName)
+                .build();
+        return bucketExists(bucketExistsArgs);
+    }
+}
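And the MinIO-backed equivalent; endpoint, credentials, bucket, and object key are placeholders, with region and httpClient left null since both are optional:

```java
import io.milvus.bulkwriter.storage.client.MinioStorageClient;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

public class MinioUploadDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder MinIO endpoint and default credentials; supply your own values.
        MinioStorageClient client = MinioStorageClient.getStorageClient(
                "http://localhost:9000", "minioadmin", "minioadmin", null, null, null);

        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        if (client.checkBucketExist("a-bucket")) {
            client.putObjectStream(new ByteArrayInputStream(payload), payload.length, "a-bucket", "demo/hello.txt");
            System.out.println("object size = " + client.getObjectEntity("a-bucket", "demo/hello.txt"));
        }
    }
}
```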

+ 21 - 0
src/main/java/io/milvus/common/utils/ExceptionUtils.java

@@ -0,0 +1,21 @@
+package io.milvus.common.utils;
+
+import io.milvus.exception.UnExpectedException;
+import io.milvus.param.R;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExceptionUtils {
+    private static final Logger logger = LoggerFactory.getLogger(ExceptionUtils.class);
+
+    public static void throwUnExpectedException(String msg) {
+        logger.error(msg);
+        throw new UnExpectedException(msg);
+    }
+
+    public static void handleResponseStatus(R<?> r) {
+        if (r.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException(r.getMessage());
+        }
+    }
+}

+ 31 - 0
src/main/java/io/milvus/exception/UnExpectedException.java

@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.exception;
+
+import io.milvus.param.R;
+
+/**
+ * Exception thrown when an unexpected error occurs.
+ */
+public class UnExpectedException extends MilvusException {
+    public UnExpectedException(String msg) {
+        super(msg, R.Status.UnexpectedError.getCode());
+    }
+}

+ 1 - 1
src/main/java/io/milvus/param/ParamUtils.java

@@ -52,7 +52,7 @@ public class ParamUtils {
         checkFieldData(fieldSchema, values, false);
     }
 
-    private static void checkFieldData(FieldType fieldSchema, List<?> values, boolean verifyElementType) {
+    public static void checkFieldData(FieldType fieldSchema, List<?> values, boolean verifyElementType) {
         HashMap<DataType, String> errMsgs = getTypeErrorMsg();
         DataType dataType = verifyElementType ? fieldSchema.getElementType() : fieldSchema.getDataType();
 

+ 120 - 0
src/main/java/io/milvus/param/collection/CollectionSchemaParam.java

@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.param.collection;
+
+import io.milvus.exception.ParamException;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.ToString;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Parameters for <code>createCollection</code> interface.
+ */
+@Getter
+@ToString
+public class CollectionSchemaParam {
+    private final List<FieldType> fieldTypes;
+    private final boolean enableDynamicField;
+
+    private CollectionSchemaParam(@NonNull Builder builder) {
+        this.fieldTypes = builder.fieldTypes;
+        this.enableDynamicField = builder.enableDynamicField;
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link CollectionSchemaParam} class.
+     */
+    public static final class Builder {
+        private final List<FieldType> fieldTypes = new ArrayList<>();
+        private boolean enableDynamicField;
+        private Builder() {
+        }
+
+
+        /**
+         * Sets whether to enable the dynamic field for the collection.
+         *
+         * @param enableDynamicField enableDynamicField of the collection
+         * @return <code>Builder</code>
+         */
+        public Builder withEnableDynamicField(boolean enableDynamicField) {
+            this.enableDynamicField = enableDynamicField;
+            return this;
+        }
+
+        /**
+         * Sets the fieldTypes of the schema. The fieldTypes cannot be empty or null.
+         * @see FieldType
+         *
+         * @param fieldTypes a <code>List</code> of {@link FieldType}
+         * @return <code>Builder</code>
+         */
+        public Builder withFieldTypes(@NonNull List<FieldType> fieldTypes) {
+            this.fieldTypes.addAll(fieldTypes);
+            return this;
+        }
+
+        /**
+         * Adds a field.
+         * @see FieldType
+         *
+         * @param fieldType a {@link FieldType} object
+         * @return <code>Builder</code>
+         */
+        public Builder addFieldType(@NonNull FieldType fieldType) {
+            this.fieldTypes.add(fieldType);
+            return this;
+        }
+
+        /**
+         * Verifies parameters and creates a new {@link CollectionSchemaParam} instance.
+         *
+         * @return {@link CollectionSchemaParam}
+         */
+        public CollectionSchemaParam build() throws ParamException {
+            if (fieldTypes.isEmpty()) {
+                throw new ParamException("Field numbers must be larger than 0");
+            }
+
+            boolean hasPartitionKey = false;
+            for (FieldType fieldType : fieldTypes) {
+                if (fieldType == null) {
+                    throw new ParamException("Collection field cannot be null");
+                }
+
+                if (fieldType.isPartitionKey()) {
+                    if (hasPartitionKey) {
+                        throw new ParamException("Only one partition key field is allowed in a collection");
+                    }
+                    hasPartitionKey = true;
+                }
+            }
+
+            return new CollectionSchemaParam(this);
+        }
+    }
+}
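A short sketch of assembling a CollectionSchemaParam with the dynamic field enabled and a single partition-key field; the FieldType builder calls (withMaxLength, withPartitionKey, and so on) come from the existing SDK and are assumed here, and all field names are illustrative:

```java
import io.milvus.grpc.DataType;
import io.milvus.param.collection.CollectionSchemaParam;
import io.milvus.param.collection.FieldType;

public class SchemaParamDemo {
    public static void main(String[] args) {
        // Illustrative schema: auto-ID primary key, a VarChar partition key, a 128-dim vector,
        // and the dynamic field enabled; build() enforces at most one partition-key field.
        CollectionSchemaParam schema = CollectionSchemaParam.newBuilder()
                .withEnableDynamicField(true)
                .addFieldType(FieldType.newBuilder()
                        .withName("id").withDataType(DataType.Int64)
                        .withPrimaryKey(true).withAutoID(true)
                        .build())
                .addFieldType(FieldType.newBuilder()
                        .withName("tenant").withDataType(DataType.VarChar)
                        .withMaxLength(64).withPartitionKey(true)
                        .build())
                .addFieldType(FieldType.newBuilder()
                        .withName("vector").withDataType(DataType.FloatVector)
                        .withDimension(128)
                        .build())
                .build();
        System.out.println(schema);
    }
}
```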

+ 30 - 2
src/main/java/io/milvus/param/collection/CreateCollectionParam.java

@@ -38,11 +38,11 @@ public class CreateCollectionParam {
     private final String collectionName;
     private final int shardsNum;
     private final String description;
-    private final List<FieldType> fieldTypes;
     private final int partitionsNum;
     private final ConsistencyLevelEnum consistencyLevel;
     private final String databaseName;
 
+    private final List<FieldType> fieldTypes;
     private final boolean enableDynamicField;
 
     private CreateCollectionParam(@NonNull Builder builder) {
@@ -67,10 +67,11 @@ public class CreateCollectionParam {
         private String collectionName;
         private int shardsNum = 0; // default to 0, let server decide the value
         private String description = "";
-        private final List<FieldType> fieldTypes = new ArrayList<>();
+        private List<FieldType> fieldTypes = new ArrayList<>();
         private int partitionsNum = 0;
         private ConsistencyLevelEnum consistencyLevel = ConsistencyLevelEnum.BOUNDED;
         private String databaseName;
+        private CollectionSchemaParam schema;
 
         private boolean enableDynamicField;
         private Builder() {
@@ -113,10 +114,12 @@ public class CreateCollectionParam {
 
         /**
          * Sets the collection if enableDynamicField.
+         * @deprecated Use {@link #withSchema(CollectionSchemaParam)} instead.
          *
          * @param enableDynamicField enableDynamicField of the collection
          * @return <code>Builder</code>
          */
+        @Deprecated
         public Builder withEnableDynamicField(boolean enableDynamicField) {
             this.enableDynamicField = enableDynamicField;
             return this;
@@ -136,10 +139,12 @@ public class CreateCollectionParam {
         /**
          * Sets the schema of the collection. The schema cannot be empty or null.
          * @see FieldType
+         * @deprecated Use {@link #withSchema(CollectionSchemaParam)} instead.
          *
          * @param fieldTypes a <code>List</code> of {@link FieldType}
          * @return <code>Builder</code>
          */
+        @Deprecated
         public Builder withFieldTypes(@NonNull List<FieldType> fieldTypes) {
             this.fieldTypes.addAll(fieldTypes);
             return this;
@@ -148,10 +153,12 @@ public class CreateCollectionParam {
         /**
          * Adds a field schema.
          * @see FieldType
+         * @deprecated Use {@link #withSchema(CollectionSchemaParam)} instead.
          *
          * @param fieldType a {@link FieldType} object
          * @return <code>Builder</code>
          */
+        @Deprecated
         public Builder addFieldType(@NonNull FieldType fieldType) {
             this.fieldTypes.add(fieldType);
             return this;
@@ -183,6 +190,18 @@ public class CreateCollectionParam {
             return this;
         }
 
+        /**
+         * Sets the schema of the collection.
+         *
+         *
+         * @param schema the schema of collection
+         * @return <code>Builder</code>
+         */
+        public Builder withSchema(@NonNull CollectionSchemaParam schema) {
+            this.schema = schema;
+            return this;
+        }
+
         /**
          * Verifies parameters and creates a new {@link CreateCollectionParam} instance.
          *
@@ -195,6 +214,15 @@ public class CreateCollectionParam {
                 throw new ParamException("ShardNum must be larger or equal to 0");
             }
 
+            if (!fieldTypes.isEmpty() && schema != null) {
+                throw new ParamException("Please use either withFieldTypes(), addFieldType(), or withSchema(), and do not use them simultaneously.");
+            }
+
+            if (schema != null) {
+                fieldTypes = schema.getFieldTypes();
+                enableDynamicField = schema.isEnableDynamicField();
+            }
+
             if (fieldTypes.isEmpty()) {
                 throw new ParamException("Field numbers must be larger than 0");
             }
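With the new withSchema() path, collection creation takes the whole schema object instead of the now-deprecated per-field setters. A brief sketch; the collection name is a placeholder:

```java
import io.milvus.param.collection.CollectionSchemaParam;
import io.milvus.param.collection.CreateCollectionParam;

public class CreateWithSchemaDemo {
    static CreateCollectionParam buildParam(CollectionSchemaParam schema) {
        // withSchema() supersedes the deprecated withFieldTypes()/addFieldType()/withEnableDynamicField()
        // setters; build() rejects mixing the two styles.
        return CreateCollectionParam.newBuilder()
                .withCollectionName("demo_collection")
                .withSchema(schema)
                .build();
    }
}
```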

+ 13 - 0
src/main/java/io/milvus/response/DescCollResponseWrapper.java

@@ -3,6 +3,7 @@ package io.milvus.response;
 import io.milvus.exception.ParamException;
 import io.milvus.grpc.*;
 import io.milvus.param.ParamUtils;
+import io.milvus.param.collection.CollectionSchemaParam;
 import io.milvus.param.collection.FieldType;
 import lombok.NonNull;
 
@@ -197,6 +198,18 @@ public class DescCollResponseWrapper {
         return pairs;
     }
 
+    /**
+     * Get the schema of the collection
+     *
+     * @return {@link CollectionSchemaParam} schema of the collection
+     */
+    public CollectionSchemaParam getSchema() {
+        return CollectionSchemaParam.newBuilder()
+                .withFieldTypes(getFields())
+                .withEnableDynamicField(getEnableDynamicField())
+                .build();
+    }
+
     /**
      * Construct a <code>String</code> by {@link DescCollResponseWrapper} instance.
      *

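getSchema() makes it easy to lift an existing collection's definition into a CollectionSchemaParam, for example to feed a RemoteBulkWriter or to recreate the collection via withSchema(). A hedged sketch assuming the existing MilvusServiceClient and DescribeCollectionParam APIs, which are not shown in this diff:

```java
import io.milvus.client.MilvusServiceClient;
import io.milvus.grpc.DescribeCollectionResponse;
import io.milvus.param.R;
import io.milvus.param.collection.CollectionSchemaParam;
import io.milvus.param.collection.DescribeCollectionParam;
import io.milvus.response.DescCollResponseWrapper;

public class FetchSchemaDemo {
    static CollectionSchemaParam fetchSchema(MilvusServiceClient client, String collectionName) {
        R<DescribeCollectionResponse> resp = client.describeCollection(
                DescribeCollectionParam.newBuilder().withCollectionName(collectionName).build());
        // Wrap the gRPC response and lift its field list and dynamic-field flag into a
        // CollectionSchemaParam that can be reused elsewhere.
        return new DescCollResponseWrapper(resp.getData()).getSchema();
    }
}
```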
Some files were not shown because too many files changed in this diff