
optim bulkWriter to reduce memory usage (#1309)

Signed-off-by: lentitude2tk <xushuang.hu@zilliz.com>
xushuang.hu, 2 months ago
Parent
Commit
8b28c2fb3d
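
For context, a minimal usage sketch of the writer API after this change, assembled from the example files touched below. It assumes an existing CollectionSchemaParam named collectionSchema; withChunkSize() now takes a long and getBufferRowCount() is gone, so only the total row count is reported.

import com.google.gson.JsonObject;
import io.milvus.bulkwriter.LocalBulkWriter;
import io.milvus.bulkwriter.LocalBulkWriterParam;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.param.collection.CollectionSchemaParam;

import java.util.List;

public class BulkWriterUsageSketch {
    // collectionSchema is assumed to be built elsewhere for the target collection
    static void writeRows(CollectionSchemaParam collectionSchema) throws Exception {
        LocalBulkWriterParam bulkWriterParam = LocalBulkWriterParam.newBuilder()
                .withCollectionSchema(collectionSchema)
                .withLocalPath("/tmp/bulk_writer")
                .withFileType(BulkFileType.PARQUET)
                .withChunkSize(1024 * 1024 * 1024L) // chunkSize is a long now
                .build();

        try (LocalBulkWriter writer = new LocalBulkWriter(bulkWriterParam)) {
            JsonObject row = new JsonObject();
            // ... populate row fields according to the collection schema ...
            writer.appendRow(row);

            // getBufferRowCount() was removed; only the total row count remains
            System.out.printf("%s rows appended%n", writer.getTotalRowCount());

            writer.commit(false);
            List<List<String>> batchFiles = writer.getBatchFiles();
            System.out.println(batchFiles);
        }
    }
}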

+ 2 - 6
examples/src/main/java/io/milvus/v1/BulkWriterExample.java

@@ -232,7 +232,7 @@ public class BulkWriterExample {
                 .withCollectionSchema(collectionSchema)
                 .withLocalPath("/tmp/bulk_writer")
                 .withFileType(fileType)
-                .withChunkSize(128 * 1024 * 1024)
+                .withChunkSize(1024 * 1024 * 1024L)
                 .build();
 
         try (LocalBulkWriter localBulkWriter = new LocalBulkWriter(bulkWriterParam)) {
@@ -250,7 +250,6 @@ public class BulkWriterExample {
             }
 
             System.out.printf("%s rows appends%n", localBulkWriter.getTotalRowCount());
-            System.out.printf("%s rows in buffer not flushed%n", localBulkWriter.getBufferRowCount());
 
             localBulkWriter.commit(false);
             List<List<String>> batchFiles = localBulkWriter.getBatchFiles();
@@ -279,7 +278,6 @@ public class BulkWriterExample {
             }
 
             System.out.printf("%s rows appends%n", remoteBulkWriter.getTotalRowCount());
-            System.out.printf("%s rows in buffer not flushed%n", remoteBulkWriter.getBufferRowCount());
 
             remoteBulkWriter.commit(false);
             List<List<String>> batchFiles = remoteBulkWriter.getBatchFiles();
@@ -318,7 +316,6 @@ public class BulkWriterExample {
             }
 
             System.out.println(localBulkWriter.getTotalRowCount() + " rows appends");
-            System.out.println(localBulkWriter.getBufferRowCount() + " rows in buffer not flushed");
             localBulkWriter.commit(false);
             System.out.printf("Append finished, %s rows%n", threadCount * rowsPerThread);
 
@@ -420,7 +417,6 @@ public class BulkWriterExample {
                 remoteBulkWriter.appendRow(rowObject);
             }
             System.out.printf("%s rows appends%n", remoteBulkWriter.getTotalRowCount());
-            System.out.printf("%s rows in buffer not flushed%n", remoteBulkWriter.getBufferRowCount());
             System.out.println("Generate data files...");
             remoteBulkWriter.commit(false);
 
@@ -438,7 +434,7 @@ public class BulkWriterExample {
                 .withCollectionSchema(collectionSchema)
                 .withRemotePath("bulk_data")
                 .withFileType(fileType)
-                .withChunkSize(512 * 1024 * 1024)
+                .withChunkSize(1024 * 1024 * 1024L)
                 .withConnectParam(connectParam)
                 .build();
         return new RemoteBulkWriter(bulkWriterParam);
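
The examples switch to a long literal here because withChunkSize() now accepts a long (see the LocalBulkWriterParam and RemoteBulkWriterParam diffs below). For chunk sizes of 2 GiB and above, the literal has to be widened before the multiplication, otherwise the int arithmetic overflows; a hedged illustration, with collectionSchema and fileType assumed to exist:

// 4 GiB chunk size: put the "L" on the first factor so the product is computed in long
long chunkSize = 4L * 1024 * 1024 * 1024;

// Without the suffix, 4 * 1024 * 1024 * 1024 is evaluated as int and wraps to 0
// long wrong = 4 * 1024 * 1024 * 1024;   // compiles, but the value is 0

LocalBulkWriterParam bulkWriterParam = LocalBulkWriterParam.newBuilder()
        .withCollectionSchema(collectionSchema)
        .withLocalPath("/tmp/bulk_writer")
        .withFileType(fileType)
        .withChunkSize(chunkSize)
        .build();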

+ 0 - 4
examples/src/main/java/io/milvus/v2/BulkWriterExample.java

@@ -234,7 +234,6 @@ public class BulkWriterExample {
             }
 
             System.out.printf("%s rows appends%n", localBulkWriter.getTotalRowCount());
-            System.out.printf("%s rows in buffer not flushed%n", localBulkWriter.getBufferRowCount());
 
             localBulkWriter.commit(false);
             List<List<String>> batchFiles = localBulkWriter.getBatchFiles();
@@ -263,7 +262,6 @@ public class BulkWriterExample {
             }
 
             System.out.printf("%s rows appends%n", remoteBulkWriter.getTotalRowCount());
-            System.out.printf("%s rows in buffer not flushed%n", remoteBulkWriter.getBufferRowCount());
 
             remoteBulkWriter.commit(false);
             List<List<String>> batchFiles = remoteBulkWriter.getBatchFiles();
@@ -302,7 +300,6 @@ public class BulkWriterExample {
             }
 
             System.out.println(localBulkWriter.getTotalRowCount() + " rows appends");
-            System.out.println(localBulkWriter.getBufferRowCount() + " rows in buffer not flushed");
             localBulkWriter.commit(false);
             System.out.printf("Append finished, %s rows%n", threadCount * rowsPerThread);
 
@@ -404,7 +401,6 @@ public class BulkWriterExample {
                 remoteBulkWriter.appendRow(rowObject);
             }
             System.out.printf("%s rows appends%n", remoteBulkWriter.getTotalRowCount());
-            System.out.printf("%s rows in buffer not flushed%n", remoteBulkWriter.getBufferRowCount());
             System.out.println("Generate data files...");
             remoteBulkWriter.commit(false);
 

+ 0 - 409
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/Buffer.java

@@ -1,409 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package io.milvus.bulkwriter;
-
-import com.google.common.collect.Lists;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import io.milvus.bulkwriter.common.clientenum.BulkFileType;
-import io.milvus.common.utils.ExceptionUtils;
-import io.milvus.bulkwriter.common.utils.ParquetUtils;
-import io.milvus.common.utils.JsonUtils;
-import io.milvus.grpc.DataType;
-import io.milvus.param.collection.CollectionSchemaParam;
-import io.milvus.param.collection.FieldType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.example.data.Group;
-import org.apache.parquet.example.data.simple.SimpleGroupFactory;
-import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.example.GroupWriteSupport;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-import org.apache.parquet.schema.MessageType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.stream.Collectors;
-
-import static io.milvus.param.Constant.DYNAMIC_FIELD_NAME;
-
-public class Buffer {
-    private static final Logger logger = LoggerFactory.getLogger(Buffer.class);
-
-    private CollectionSchemaParam collectionSchema;
-    private BulkFileType fileType;
-    private Map<String, List<Object>> buffer;
-    private Map<String, FieldType> fields;
-
-
-    public Buffer(CollectionSchemaParam collectionSchema, BulkFileType fileType) {
-        this.collectionSchema = collectionSchema;
-        this.fileType = fileType;
-
-        buffer = new LinkedHashMap<>();
-        fields = new LinkedHashMap<>();
-
-        for (FieldType fieldType : collectionSchema.getFieldTypes()) {
-            if (fieldType.isPrimaryKey() && fieldType.isAutoID())
-                continue;
-            buffer.put(fieldType.getName(), Lists.newArrayList());
-            fields.put(fieldType.getName(), fieldType);
-        }
-
-        if (buffer.isEmpty()) {
-            ExceptionUtils.throwUnExpectedException("Illegal collection schema: fields list is empty");
-        }
-
-        if (collectionSchema.isEnableDynamicField()) {
-            buffer.put(DYNAMIC_FIELD_NAME, Lists.newArrayList());
-            fields.put(DYNAMIC_FIELD_NAME, FieldType.newBuilder().withName(DYNAMIC_FIELD_NAME).withDataType(DataType.JSON).build());
-        }
-    }
-
-    public Integer getRowCount() {
-        if (buffer.isEmpty()) {
-            return 0;
-        }
-
-        for (String fieldName : buffer.keySet()) {
-            return buffer.get(fieldName).size();
-        }
-        return null;
-    }
-
-    public void appendRow(Map<String, Object> row) {
-        for (String key : row.keySet()) {
-            if (key.equals(DYNAMIC_FIELD_NAME) && !this.collectionSchema.isEnableDynamicField()) {
-                continue; // skip dynamic field if it is disabled
-            }
-            buffer.get(key).add(row.get(key));
-        }
-    }
-
-    // verify row count of fields are equal
-    public List<String> persist(String localPath, Map<String, Object> config) throws IOException {
-        int rowCount = -1;
-        for (String key : buffer.keySet()) {
-            if (rowCount < 0) {
-                rowCount = buffer.get(key).size();
-            } else if (rowCount != buffer.get(key).size()) {
-                String msg = String.format("Column `%s` row count %s doesn't equal to the first column row count %s", key, buffer.get(key).size(), rowCount);
-                ExceptionUtils.throwUnExpectedException(msg);
-            }
-        }
-
-        // output files
-        if (fileType == BulkFileType.PARQUET) {
-            Integer bufferSize = (Integer) config.get("bufferSize");
-            Integer bufferRowCount = (Integer) config.get("bufferRowCount");
-            return persistParquet(localPath, bufferSize, bufferRowCount);
-        } else if (fileType == BulkFileType.JSON) {
-            return persistJSON(localPath);
-        } else if (fileType == BulkFileType.CSV) {
-            String separator = (String)config.getOrDefault("sep", "\t");
-            String nullKey = (String)config.getOrDefault("nullkey", "");
-            return persistCSV(localPath, separator, nullKey);
-        }
-        ExceptionUtils.throwUnExpectedException("Unsupported file type: " + fileType);
-        return null;
-    }
-
-    private List<String> persistParquet(String localPath, Integer bufferSize, Integer bufferRowCount) throws IOException {
-        String filePath = localPath + ".parquet";
-
-        // calculate a proper row group size
-        int rowGroupSizeMin = 1000;
-        int rowGroupSizeMax = 1000000;
-        int rowGroupSize = 10000;
-
-        // 32MB is an experience value that avoid high memory usage of parquet reader on server-side
-        int rowGroupBytes = 32 * 1024 * 1024;
-
-        int sizePerRow = (bufferSize / bufferRowCount) + 1;
-        rowGroupSize = rowGroupBytes / sizePerRow;
-        rowGroupSize = Math.max(rowGroupSizeMin, Math.min(rowGroupSizeMax, rowGroupSize));
-
-        // declare the messageType of the Parquet
-        MessageType messageType = ParquetUtils.parseCollectionSchema(collectionSchema);
-
-        // declare and define the ParquetWriter.
-        Path path = new Path(filePath);
-        Configuration configuration = new Configuration();
-        GroupWriteSupport.setSchema(messageType, configuration);
-        GroupWriteSupport writeSupport = new GroupWriteSupport();
-
-        try (ParquetWriter<Group> writer = new ParquetWriter<>(path,
-                ParquetFileWriter.Mode.CREATE,
-                writeSupport,
-                CompressionCodecName.UNCOMPRESSED,
-                rowGroupBytes,
-                5 * 1024 * 1024,
-                5 * 1024 * 1024,
-                ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
-                ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
-                ParquetWriter.DEFAULT_WRITER_VERSION,
-                configuration)) {
-
-            Map<String, FieldType> nameFieldType = collectionSchema.getFieldTypes().stream().collect(Collectors.toMap(FieldType::getName, e -> e));
-            if (collectionSchema.isEnableDynamicField()) {
-                nameFieldType.put(DYNAMIC_FIELD_NAME, FieldType.newBuilder()
-                        .withName(DYNAMIC_FIELD_NAME)
-                        .withDataType(DataType.JSON)
-                        .build());
-            }
-
-            List<String> fieldNameList = Lists.newArrayList(buffer.keySet());
-            int size = buffer.get(fieldNameList.get(0)).size();
-            for (int i = 0; i < size; ++i) {
-                // build Parquet data and encapsulate it into a group.
-                Group group = new SimpleGroupFactory(messageType).newGroup();
-                for (String fieldName : fieldNameList) {
-                    appendGroup(group, fieldName, buffer.get(fieldName).get(i), nameFieldType.get(fieldName));
-                }
-                writer.write(group);
-            }
-        } catch (IOException e) {
-            e.printStackTrace();
-            throw e;
-        }
-
-        String msg = String.format("Successfully persist file %s, total size: %s, row count: %s, row group size: %s",
-                filePath, bufferSize, bufferRowCount, rowGroupSize);
-        logger.info(msg);
-        return Lists.newArrayList(filePath);
-    }
-
-    private List<String> persistJSON(String localPath) throws IOException {
-        String filePath = localPath + ".json";
-
-        Gson gson = new GsonBuilder().serializeNulls().create();
-        List<Map<String, Object>> data = new ArrayList<>();
-
-        List<String> fieldNameList = Lists.newArrayList(buffer.keySet());
-        int size = buffer.get(fieldNameList.get(0)).size();
-        for (int i = 0; i < size; ++i) {
-            Map<String, Object> row = new HashMap<>();
-            for (String fieldName : fieldNameList) {
-                if (buffer.get(fieldName).get(i) instanceof ByteBuffer) {
-                    row.put(fieldName, ((ByteBuffer)buffer.get(fieldName).get(i)).array());
-                } else {
-                    row.put(fieldName, buffer.get(fieldName).get(i));
-                }
-            }
-            data.add(row);
-        }
-
-        try (BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(filePath))) {
-            bufferedWriter.write("[\n");
-            for (int i = 0; i < data.size(); i++) {
-                String json = gson.toJson(data.get(i));
-                if (i != data.size()-1) {
-                    json += ",";
-                }
-                bufferedWriter.write(json);
-                bufferedWriter.newLine();
-            }
-            bufferedWriter.write("]\n");
-        } catch (IOException e) {
-            e.printStackTrace();
-            throw e;
-        }
-
-        return Lists.newArrayList(filePath);
-    }
-
-    private List<String> persistCSV(String localPath, String separator, String nullKey) throws IOException {
-        String filePath = localPath + ".csv";
-
-        Gson gson = new GsonBuilder().serializeNulls().create();
-        List<String> fieldNameList = Lists.newArrayList(buffer.keySet());
-        try (BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(filePath))) {
-            bufferedWriter.write(String.join(separator, fieldNameList));
-            bufferedWriter.newLine();
-            int size = buffer.get(fieldNameList.get(0)).size();
-            for (int i = 0; i < size; ++i) {
-                List<String> values = new ArrayList<>();
-                for (String fieldName : fieldNameList) {
-                    Object val = buffer.get(fieldName).get(i);
-                    String strVal = "";
-                    if (val == null) {
-                        strVal = nullKey;
-                    } else if (val instanceof ByteBuffer) {
-                        strVal = Arrays.toString(((ByteBuffer) val).array());
-                    } else if (val instanceof List || val instanceof Map) {
-                        strVal = gson.toJson(val); // server-side is using json to parse array field and vector field
-                    } else {
-                        strVal = val.toString();
-                    }
-
-                    // CSV format, all the single quotation should be replaced by double quotation
-                    if (strVal.startsWith("\"") && strVal.endsWith("\"")) {
-                        strVal = strVal.substring(1, strVal.length() - 1);
-                    }
-                    strVal = strVal.replace("\\\"", "\"");
-                    strVal = strVal.replace("\"", "\"\"");
-                    strVal = "\"" + strVal + "\"";
-                    values.add(strVal);
-                }
-
-                bufferedWriter.write(String.join(separator, values));
-                bufferedWriter.newLine();
-            }
-        } catch (IOException e) {
-            e.printStackTrace();
-            throw e;
-        }
-
-        return Lists.newArrayList(filePath);
-    }
-
-    private void appendGroup(Group group, String paramName, Object value, FieldType fieldType) {
-        DataType dataType = fieldType.getDataType();
-        switch (dataType) {
-            case Int8:
-            case Int16:
-                group.append(paramName, (Short)value);
-                break;
-            case Int32:
-                group.append(paramName, (Integer)value);
-                break;
-            case Int64:
-                group.append(paramName, (Long)value);
-                break;
-            case Float:
-                group.append(paramName, (Float)value);
-                break;
-            case Double:
-                group.append(paramName, (Double)value);
-                break;
-            case Bool:
-                group.append(paramName, (Boolean)value);
-                break;
-            case VarChar:
-            case String:
-            case JSON:
-                group.append(paramName, (String)value);
-                break;
-            case FloatVector:
-                addFloatArray(group, paramName, (List<Float>) value);
-                break;
-            case BinaryVector:
-            case Float16Vector:
-            case BFloat16Vector:
-                addBinaryVector(group, paramName, (ByteBuffer) value);
-                break;
-            case SparseFloatVector:
-                addSparseVector(group, paramName, (SortedMap<Long, Float>) value);
-                break;
-            case Array:
-                DataType elementType = fieldType.getElementType();
-                switch (elementType) {
-                    case Int8:
-                    case Int16:
-                    case Int32:
-                        addIntArray(group, paramName, (List<Integer>) value);
-                        break;
-                    case Int64:
-                        addLongArray(group, paramName, (List<Long>) value);
-                        break;
-                    case Float:
-                        addFloatArray(group, paramName, (List<Float>) value);
-                        break;
-                    case Double:
-                        addDoubleArray(group, paramName, (List<Double>) value);
-                        break;
-                    case String:
-                    case VarChar:
-                        addStringArray(group, paramName, (List<String>) value);
-                        break;
-                    case Bool:
-                        addBooleanArray(group, paramName, (List<Boolean>) value);
-                        break;
-                }
-        }
-    }
-
-    private static void addLongArray(Group group, String fieldName, List<Long> values) {
-        Group arrayGroup = group.addGroup(fieldName);
-        for (long value : values) {
-            Group addGroup = arrayGroup.addGroup(0);
-            addGroup.add(0, value);
-        }
-    }
-
-    private static void addStringArray(Group group, String fieldName, List<String> values) {
-        Group arrayGroup = group.addGroup(fieldName);
-        for (String value : values) {
-            Group addGroup = arrayGroup.addGroup(0);
-            addGroup.add(0, value);
-        }
-    }
-
-    private static void addIntArray(Group group, String fieldName, List<Integer> values) {
-        Group arrayGroup = group.addGroup(fieldName);
-        for (int value : values) {
-            Group addGroup = arrayGroup.addGroup(0);
-            addGroup.add(0, value);
-        }
-    }
-
-    private static void addFloatArray(Group group, String fieldName, List<Float> values) {
-        Group arrayGroup = group.addGroup(fieldName);
-        for (float value : values) {
-            Group addGroup = arrayGroup.addGroup(0);
-            addGroup.add(0, value);
-        }
-    }
-
-    private static void addDoubleArray(Group group, String fieldName, List<Double> values) {
-        Group arrayGroup = group.addGroup(fieldName);
-        for (double value : values) {
-            Group addGroup = arrayGroup.addGroup(0);
-            addGroup.add(0, value);
-        }
-    }
-
-    private static void addBooleanArray(Group group, String fieldName, List<Boolean> values) {
-        Group arrayGroup = group.addGroup(fieldName);
-        for (boolean value : values) {
-            Group addGroup = arrayGroup.addGroup(0);
-            addGroup.add(0, value);
-        }
-    }
-
-    private static void addBinaryVector(Group group, String fieldName, ByteBuffer byteBuffer) {
-        Group arrayGroup = group.addGroup(fieldName);
-        byte[] bytes = byteBuffer.array();
-        for (byte value : bytes) {
-            Group addGroup = arrayGroup.addGroup(0);
-            addGroup.add(0, value);
-        }
-    }
-
-    private static void addSparseVector(Group group, String fieldName, SortedMap<Long, Float> sparse) {
-        // sparse vector is parsed as JSON format string in the server side
-        String jsonString = JsonUtils.toJson(sparse);
-        group.append(fieldName, jsonString);
-    }
-}

+ 120 - 39
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/BulkWriter.java

@@ -19,11 +19,18 @@
 
 package io.milvus.bulkwriter;
 
-import com.google.gson.*;
+import com.google.common.collect.Lists;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
 import io.milvus.bulkwriter.common.clientenum.BulkFileType;
 import io.milvus.bulkwriter.common.clientenum.TypeSize;
+import io.milvus.bulkwriter.writer.CSVFileWriter;
+import io.milvus.bulkwriter.writer.FormatFileWriter;
+import io.milvus.bulkwriter.writer.JSONFileWriter;
+import io.milvus.bulkwriter.writer.ParquetFileWriter;
 import io.milvus.common.utils.ExceptionUtils;
-import io.milvus.grpc.*;
+import io.milvus.grpc.DataType;
 import io.milvus.param.ParamUtils;
 import io.milvus.param.collection.CollectionSchemaParam;
 import io.milvus.param.collection.FieldType;
@@ -34,7 +41,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.UUID;
 import java.util.concurrent.locks.ReentrantLock;
 
 import static io.milvus.param.Constant.DYNAMIC_FIELD_NAME;
@@ -42,19 +56,29 @@ import static io.milvus.param.Constant.DYNAMIC_FIELD_NAME;
 public abstract class BulkWriter {
     private static final Logger logger = LoggerFactory.getLogger(BulkWriter.class);
     protected CollectionSchemaParam collectionSchema;
-    protected int chunkSize;
+    protected long chunkSize;
+
     protected BulkFileType fileType;
+    protected String localPath;
+    protected String uuid;
+    protected int flushCount;
+    protected FormatFileWriter fileWriter;
+    protected final Map<String, Object> config;
+
+    protected long totalSize;
+    protected long totalRowCount;
+    protected ReentrantLock appendLock;
+    protected ReentrantLock fileWriteLock;
 
-    protected int bufferSize;
-    protected int bufferRowCount;
-    protected int totalRowCount;
-    protected Buffer buffer;
-    protected ReentrantLock bufferLock;
+    protected boolean firstWrite;
 
-    protected BulkWriter(CollectionSchemaParam collectionSchema, int chunkSize, BulkFileType fileType) {
+    protected BulkWriter(CollectionSchemaParam collectionSchema, long chunkSize, BulkFileType fileType, String localPath, Map<String, Object> config) throws IOException {
         this.collectionSchema = collectionSchema;
         this.chunkSize = chunkSize;
         this.fileType = fileType;
+        this.localPath = localPath;
+        this.uuid = UUID.randomUUID().toString();
+        this.config = config;
 
         if (CollectionUtils.isEmpty(collectionSchema.getFieldTypes())) {
             ExceptionUtils.throwUnExpectedException("collection schema fields list is empty");
@@ -63,57 +87,115 @@ public abstract class BulkWriter {
         if (!hasPrimaryField(collectionSchema.getFieldTypes())) {
             ExceptionUtils.throwUnExpectedException("primary field is null");
         }
-        bufferLock = new ReentrantLock();
-        buffer = null;
-        this.newBuffer();
-    }
+        appendLock = new ReentrantLock();
 
-    protected Integer getBufferSize() {
-        return bufferSize;
+        this.makeDir();
+        fileWriteLock = new ReentrantLock();
+        fileWriter = null;
+        this.newFileWriter();
+
+        firstWrite = true;
     }
 
-    public Integer getBufferRowCount() {
-        return bufferRowCount;
+    protected Long getTotalSize() {
+        return totalSize;
     }
 
-    public Integer getTotalRowCount() {
+    public Long getTotalRowCount() {
         return totalRowCount;
     }
 
-    protected Integer getChunkSize() {
+    protected Long getChunkSize() {
         return chunkSize;
     }
 
-    protected Buffer newBuffer() {
-        Buffer oldBuffer = buffer;
+    protected FormatFileWriter getFileWriter() {
+        return fileWriter;
+    }
+
+    protected FormatFileWriter newFileWriter() throws IOException {
+        FormatFileWriter oldFileWriter = fileWriter;
+
+        fileWriteLock.lock();
+        createWriterByType();
+        fileWriteLock.unlock();
+        return oldFileWriter;
+    }
+
+    private void createWriterByType() throws IOException {
+        flushCount += 1;
+        java.nio.file.Path path = Paths.get(localPath);
+        java.nio.file.Path filePathPrefix = path.resolve(String.valueOf(flushCount));
+
+        switch (fileType) {
+            case PARQUET:
+                this.fileWriter =  new ParquetFileWriter(collectionSchema, filePathPrefix.toString());
+                break;
+            case JSON:
+                this.fileWriter = new JSONFileWriter(collectionSchema, filePathPrefix.toString());
+                break;
+            case CSV:
+                this.fileWriter = new CSVFileWriter(collectionSchema, filePathPrefix.toString(), config);
+                break;
+            default:
+                ExceptionUtils.throwUnExpectedException("Unsupported file type: " + fileType);
+        }
+    }
+
+    private void makeDir() throws IOException {
+        java.nio.file.Path path = Paths.get(localPath);
+        createDirIfNotExist(path);
 
-        bufferLock.lock();
-        this.buffer = new Buffer(collectionSchema, fileType);
-        bufferLock.unlock();
+        java.nio.file.Path fullPath = path.resolve(uuid);
+        createDirIfNotExist(fullPath);
+        this.localPath = fullPath.toString();
+    }
 
-        return oldBuffer;
+    private void createDirIfNotExist(java.nio.file.Path path) throws IOException {
+        try {
+            Files.createDirectories(path);
+            logger.info("Data path created: {}", path);
+        } catch (IOException e) {
+            logger.error("Data Path create failed: {}", path);
+            throw e;
+        }
     }
 
     public void appendRow(JsonObject row) throws IOException, InterruptedException {
         Map<String, Object> rowValues = verifyRow(row);
+        List<String> filePaths = Lists.newArrayList();
+
+        appendLock.lock();
+        fileWriter.appendRow(rowValues, firstWrite);
+        firstWrite = false;
+        if (getTotalSize() > getChunkSize()) {
+            filePaths = commitIfFileReady(true);
+        }
+        appendLock.unlock();
 
-        bufferLock.lock();
-        buffer.appendRow(rowValues);
-        bufferLock.unlock();
+        if (CollectionUtils.isNotEmpty(filePaths)) {
+            callBackIfCommitReady(filePaths);
+        }
     }
 
-    protected void commit(boolean async) throws InterruptedException {
-        bufferLock.lock();
-        bufferSize = 0;
-        bufferRowCount = 0;
-        bufferLock.unlock();
+
+    protected abstract List<String> commitIfFileReady(boolean createNewFile) throws IOException;
+
+    protected abstract void callBackIfCommitReady(List<String> filePaths) throws IOException, InterruptedException;
+
+
+    protected void commit() {
+        appendLock.lock();
+        totalSize = 0;
+        totalRowCount = 0;
+        appendLock.unlock();
     }
 
     protected String getDataPath() {
         return "";
     }
 
-    private Map<String, Object> verifyRow(JsonObject row) {
+    protected Map<String, Object> verifyRow(JsonObject row) {
         int rowSize = 0;
         Map<String, Object> rowValues = new HashMap<>();
         for (FieldType fieldType : collectionSchema.getFieldTypes()) {
@@ -207,11 +289,10 @@ public abstract class BulkWriter {
             rowSize += strValues.length();
         }
 
-        bufferLock.lock();
-        bufferSize += rowSize;
-        bufferRowCount += 1;
+        appendLock.lock();
+        totalSize += rowSize;
         totalRowCount += 1;
-        bufferLock.unlock();
+        appendLock.unlock();
 
         return rowValues;
     }
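
Since appendRow() now wraps the write and the size check in appendLock, a single writer can still be shared by several appending threads, as the multi-threaded examples above do. A rough sketch under that assumption (writer, threadCount, rowsPerThread and the row-building code are assumed to exist):

List<Thread> threads = new ArrayList<>();
for (int t = 0; t < threadCount; t++) {
    Thread thread = new Thread(() -> {
        try {
            for (int i = 0; i < rowsPerThread; i++) {
                JsonObject row = new JsonObject();
                // ... populate row fields ...
                writer.appendRow(row);   // guarded internally by appendLock
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    });
    threads.add(thread);
    thread.start();
}
for (Thread thread : threads) {
    thread.join();
}
writer.commit(false);   // flush the rows remaining in the current file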

+ 68 - 76
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/LocalBulkWriter.java

@@ -19,10 +19,12 @@
 
 package io.milvus.bulkwriter;
 
-import com.google.gson.JsonObject;
 import com.google.common.collect.Lists;
+import com.google.gson.JsonObject;
 import io.milvus.bulkwriter.common.clientenum.BulkFileType;
+import io.milvus.bulkwriter.writer.FormatFileWriter;
 import io.milvus.param.collection.CollectionSchemaParam;
+import org.apache.commons.collections4.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,110 +35,115 @@ import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 public class LocalBulkWriter extends BulkWriter implements AutoCloseable {
     private static final Logger logger = LoggerFactory.getLogger(LocalBulkWriter.class);
-    protected String localPath;
-    private String uuid;
-    private int flushCount;
+
     private Map<String, Thread> workingThread;
     private ReentrantLock workingThreadLock;
     private List<List<String>> localFiles;
-    private final Map<String, Object> config;
 
     public LocalBulkWriter(LocalBulkWriterParam bulkWriterParam) throws IOException {
-        super(bulkWriterParam.getCollectionSchema(), bulkWriterParam.getChunkSize(), bulkWriterParam.getFileType());
-        this.localPath = bulkWriterParam.getLocalPath();
-        this.uuid = UUID.randomUUID().toString();
+        super(bulkWriterParam.getCollectionSchema(), bulkWriterParam.getChunkSize(), bulkWriterParam.getFileType(), bulkWriterParam.getLocalPath(), bulkWriterParam.getConfig());
         this.workingThreadLock = new ReentrantLock();
         this.workingThread = new HashMap<>();
         this.localFiles = Lists.newArrayList();
-        this.config = bulkWriterParam.getConfig();
-        this.makeDir();
     }
 
     protected LocalBulkWriter(CollectionSchemaParam collectionSchema,
-                              int chunkSize,
+                              long chunkSize,
                               BulkFileType fileType,
                               String localPath,
                               Map<String, Object> config) throws IOException {
-        super(collectionSchema, chunkSize, fileType);
-        this.localPath = localPath;
-        this.uuid = UUID.randomUUID().toString();
+        super(collectionSchema, chunkSize, fileType, localPath, config);
         this.workingThreadLock = new ReentrantLock();
         this.workingThread = new HashMap<>();
         this.localFiles = Lists.newArrayList();
-        this.config = config;
-        this.makeDir();
     }
 
     @Override
     public void appendRow(JsonObject rowData) throws IOException, InterruptedException {
         super.appendRow(rowData);
+    }
 
+    @Override
+    protected void callBackIfCommitReady(List<String> filePaths) throws InterruptedException {
 //        only one thread can enter this section to persist data,
 //        in the _flush() method, the buffer will be swapped to a new one.
 //        in async mode, the flush thread is asynchronously, other threads can
 //        continue to append if the new buffer size is less than target size
         workingThreadLock.lock();
-        if (super.getBufferSize() > super.getChunkSize()) {
-            commit(true);
-        }
+        callBack(true, filePaths);
         workingThreadLock.unlock();
     }
 
     public void commit(boolean async) throws InterruptedException {
+        List<String> filePath = commitIfFileReady(false);
+        callBack(async, filePath);
+    }
+
+    protected List<String> commitIfFileReady(boolean createNewFile) {
+        if (super.getTotalRowCount() <= 0) {
+            String msg = "current_file_total_row_count less than 0, no need to generator a file";
+            logger.info(msg);
+            return null;
+        }
+
+        String filePath = super.getFileWriter().getFilePath();
+        String msg = String.format("Prepare to commit file:%s, current_file_total_row_count: %s, current_file_total_size:%s, create_new_file:%s",
+                filePath ,super.getTotalRowCount(), super.getTotalSize(), createNewFile);
+        logger.info(msg);
+
+        List<String> fileList = Lists.newArrayList(filePath);
+        try {
+            FormatFileWriter oldFileWriter = createNewFile ? this.newFileWriter() : super.getFileWriter();
+            oldFileWriter.close();
+
+            localFiles.add(fileList);
+            // reset the total size and count
+            super.commit();
+        } catch (IOException e) {
+            // this function is running in a thread
+            // TODO: interrupt main thread if failed to persist file
+            logger.error(e.getMessage());
+        }
+        return fileList;
+    }
+
+    private void callBack(boolean async, List<String> fileList) throws InterruptedException {
+        if (CollectionUtils.isEmpty(fileList)) {
+            return;
+        }
+
         // _async=True, the flush thread is asynchronously
         while (!workingThread.isEmpty()) {
-            String msg = String.format("Previous flush action is not finished, %s is waiting...", Thread.currentThread().getName());
+            String msg = String.format("Previous callBack action is not finished, %s is waiting...", Thread.currentThread().getName());
             logger.info(msg);
             TimeUnit.SECONDS.sleep(5);
         }
 
-        String msg = String.format("Prepare to flush buffer, row_count: %s, size: %s", super.getBufferRowCount(), super.getBufferSize());
+        String msg = String.format("Prepare to callBack, async:%s, fileList:%s", async, fileList);
         logger.info(msg);
 
-        int bufferRowCount = getBufferRowCount();
-        int bufferSize = getBufferSize();
-        Runnable runnable = () -> flush(bufferSize, bufferRowCount);
+        Runnable runnable = () -> commitIfFileReady(fileList);
         Thread thread = new Thread(runnable);
-        logger.info("Flush thread begin, name: {}", thread.getName());
+        logger.info("CallBack thread begin, name: {}", thread.getName());
         workingThread.put(thread.getName(), thread);
         thread.start();
 
         if (!async) {
-            logger.info("Wait flush to finish");
+            logger.info("Wait callBack to finish");
             thread.join();
         }
 
-        // reset the buffer size
-        super.commit(false);
-        logger.info("Commit done with async={}", async);
-    }
-
-    private void flush(Integer bufferSize, Integer bufferRowCount) {
-        flushCount += 1;
-        java.nio.file.Path path = Paths.get(localPath);
-        java.nio.file.Path flushDirPath = path.resolve(String.valueOf(flushCount));
-
-        Map<String, Object> config = new HashMap<>(this.config);
-        config.put("bufferSize", bufferSize);
-        config.put("bufferRowCount", bufferRowCount);
-        Buffer oldBuffer = super.newBuffer();
-        if (oldBuffer.getRowCount() > 0) {
-            try {
-                List<String> fileList = oldBuffer.persist(flushDirPath.toString(), config);
-                localFiles.add(fileList);
-                callBack(fileList);
-            } catch (IOException e) {
-                // this function is running in a thread
-                // TODO: interrupt main thread if failed to persist file
-                logger.error(e.getMessage());
-            }
+        logger.info("CallBack done with async={}", async);
+    }
+
+    private void commitIfFileReady(List<String> fileList) {
+        if (CollectionUtils.isNotEmpty(fileList)) {
+            callBack(fileList);
         }
         workingThread.remove(Thread.currentThread().getName());
         String msg = String.format("Flush thread done, name: %s", Thread.currentThread().getName());
@@ -155,40 +162,25 @@ public class LocalBulkWriter extends BulkWriter implements AutoCloseable {
         return localFiles;
     }
 
-    private void makeDir() throws IOException {
-        java.nio.file.Path path = Paths.get(localPath);
-        createDirIfNotExist(path);
-
-        java.nio.file.Path fullPath = path.resolve(uuid);
-        createDirIfNotExist(fullPath);
-        this.localPath = fullPath.toString();
-    }
-
-    private void createDirIfNotExist(java.nio.file.Path path) throws IOException {
-        try {
-            Files.createDirectories(path);
-            logger.info("Data path created: {}", path);
-        } catch (IOException e) {
-            logger.error("Data Path create failed: {}", path);
-            throw e;
-        }
-    }
-
     protected void exit() throws InterruptedException {
         // if still has data in memory, default commit
         workingThreadLock.lock();
-        if (getBufferSize() != null && getBufferSize() != 0) {
-            commit(true);
-        }
+
+        List<String> filePath = commitIfFileReady(false);
+        callBack(true, filePath);
         workingThreadLock.unlock();
 
         // wait flush thread
-        if (workingThread.size() > 0) {
+        if (!workingThread.isEmpty()) {
             for (String key : workingThread.keySet()) {
                 logger.info("Wait flush thread '{}' to finish", key);
-                workingThread.get(key).join();
+                Thread thread = workingThread.get(key);
+                if (thread != null) {
+                    thread.join();
+                }
             }
         }
+
         rmDir();
     }
 

+ 3 - 3
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/LocalBulkWriterParam.java

@@ -40,7 +40,7 @@ import java.util.Map;
 public class LocalBulkWriterParam {
     private final CollectionSchemaParam collectionSchema;
     private final String localPath;
-    private final int chunkSize;
+    private final long chunkSize;
     private final BulkFileType fileType;
     private final Map<String, Object> config;
 
@@ -62,7 +62,7 @@ public class LocalBulkWriterParam {
     public static final class Builder {
         private CollectionSchemaParam collectionSchema;
         private String localPath;
-        private int chunkSize = 128 * 1024 * 1024;
+        private long chunkSize = 128 * 1024 * 1024;
         private BulkFileType fileType = BulkFileType.PARQUET;
         private Map<String, Object> config = new HashMap<>();
 
@@ -102,7 +102,7 @@ public class LocalBulkWriterParam {
             return this;
         }
 
-        public Builder withChunkSize(int chunkSize) {
+        public Builder withChunkSize(long chunkSize) {
             this.chunkSize = chunkSize;
             return this;
         }

+ 1 - 3
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/RemoteBulkWriter.java

@@ -19,10 +19,10 @@
 
 package io.milvus.bulkwriter;
 
-import com.google.gson.JsonObject;
 import com.azure.storage.blob.models.BlobErrorCode;
 import com.azure.storage.blob.models.BlobStorageException;
 import com.google.common.collect.Lists;
+import com.google.gson.JsonObject;
 import io.milvus.bulkwriter.connect.AzureConnectParam;
 import io.milvus.bulkwriter.connect.S3ConnectParam;
 import io.milvus.bulkwriter.connect.StorageConnectParam;
@@ -42,9 +42,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 public class RemoteBulkWriter extends LocalBulkWriter {
     private static final Logger logger = LoggerFactory.getLogger(RemoteBulkWriter.class);

+ 3 - 3
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/RemoteBulkWriterParam.java

@@ -43,7 +43,7 @@ public class RemoteBulkWriterParam {
     private final CollectionSchemaParam collectionSchema;
     private final StorageConnectParam connectParam;
     private final String remotePath;
-    private final int chunkSize;
+    private final long chunkSize;
     private final BulkFileType fileType;
     private final Map<String, Object> config;
 
@@ -67,7 +67,7 @@ public class RemoteBulkWriterParam {
         private CollectionSchemaParam collectionSchema;
         private StorageConnectParam connectParam;
         private String remotePath;
-        private int chunkSize = 1024 * 1024 * 1024;
+        private long chunkSize = 128 * 1024 * 1024;
         private BulkFileType fileType = BulkFileType.PARQUET;
         private Map<String, Object> config = new HashMap<>();
 
@@ -112,7 +112,7 @@ public class RemoteBulkWriterParam {
             return this;
         }
 
-        public Builder withChunkSize(int chunkSize) {
+        public Builder withChunkSize(long chunkSize) {
             this.chunkSize = chunkSize;
             return this;
         }

+ 100 - 0
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/writer/CSVFileWriter.java

@@ -0,0 +1,100 @@
+package io.milvus.bulkwriter.writer;
+
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import io.milvus.param.collection.CollectionSchemaParam;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static io.milvus.param.Constant.DYNAMIC_FIELD_NAME;
+
+public class CSVFileWriter implements FormatFileWriter {
+    private static final Logger logger = LoggerFactory.getLogger(CSVFileWriter.class);
+
+    private BufferedWriter writer;
+    private CollectionSchemaParam collectionSchema;
+    private String filePath;
+    private Map<String, Object> config;
+
+    public CSVFileWriter(CollectionSchemaParam collectionSchema, String filePathPrefix, Map<String, Object> config) throws IOException {
+        this.collectionSchema = collectionSchema;
+        this.config = config;
+        initFilePath(filePathPrefix);
+        initWriter();
+    }
+
+    private void initFilePath(String filePathPrefix) {
+        this.filePath = filePathPrefix +  ".csv";
+    }
+
+    private void initWriter() throws IOException {
+        this.writer = new BufferedWriter(new java.io.FileWriter(filePath));
+    }
+
+    @Override
+    public void appendRow(Map<String, Object> rowValues, boolean firstWrite) throws IOException {
+        rowValues.keySet().removeIf(key -> key.equals(DYNAMIC_FIELD_NAME) && !this.collectionSchema.isEnableDynamicField());
+
+        Gson gson = new GsonBuilder().serializeNulls().create();
+        List<String> fieldNameList = Lists.newArrayList(rowValues.keySet());
+
+        try {
+            String separator = (String)config.getOrDefault("sep", "\t");
+            String nullKey = (String)config.getOrDefault("nullkey", "");
+
+            if (firstWrite) {
+                writer.write(String.join(separator, fieldNameList));
+                writer.newLine();
+            }
+
+            List<String> values = new ArrayList<>();
+            for (String fieldName : fieldNameList) {
+                Object val = rowValues.get(fieldName);
+                String strVal = "";
+                if (val == null) {
+                    strVal = nullKey;
+                } else if (val instanceof ByteBuffer) {
+                    strVal = Arrays.toString(((ByteBuffer) val).array());
+                } else if (val instanceof List || val instanceof Map) {
+                    strVal = gson.toJson(val); // server-side is using json to parse array field and vector field
+                } else {
+                    strVal = val.toString();
+                }
+
+                // CSV format, all the single quotation should be replaced by double quotation
+                if (strVal.startsWith("\"") && strVal.endsWith("\"")) {
+                    strVal = strVal.substring(1, strVal.length() - 1);
+                }
+                strVal = strVal.replace("\\\"", "\"");
+                strVal = strVal.replace("\"", "\"\"");
+                strVal = "\"" + strVal + "\"";
+                values.add(strVal);
+            }
+
+            writer.write(String.join(separator, values));
+            writer.newLine();
+        } catch (IOException e) {
+            logger.error("{} appendRow error when writing to file {}", this.getClass().getSimpleName(), filePath, e);
+            throw e;
+        }
+    }
+
+    @Override
+    public String getFilePath() {
+        return filePath;
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.writer.close();
+    }
+}

+ 12 - 0
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/writer/FormatFileWriter.java

@@ -0,0 +1,12 @@
+package io.milvus.bulkwriter.writer;
+
+import java.io.IOException;
+import java.util.Map;
+
+public interface FormatFileWriter {
+    void appendRow(Map<String, Object> rowValues, boolean firstWrite) throws IOException;
+
+    String getFilePath();
+
+    void close() throws IOException;
+}
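
To make the contract concrete, a minimal hypothetical implementation (not part of this patch): firstWrite marks the first row written to a freshly created file, which is where a header or opening token belongs, and close() finalizes the file.

import io.milvus.bulkwriter.writer.FormatFileWriter;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;

// Hypothetical writer, used only to illustrate the interface: one plain-text line per row.
public class PlainTextFileWriter implements FormatFileWriter {
    private final BufferedWriter writer;
    private final String filePath;

    public PlainTextFileWriter(String filePathPrefix) throws IOException {
        this.filePath = filePathPrefix + ".txt";
        this.writer = new BufferedWriter(new FileWriter(filePath));
    }

    @Override
    public void appendRow(Map<String, Object> rowValues, boolean firstWrite) throws IOException {
        if (firstWrite) {
            // first row of a freshly created file: emit a header line
            writer.write("# " + String.join("\t", rowValues.keySet()));
            writer.newLine();
        }
        writer.write(rowValues.toString());
        writer.newLine();
    }

    @Override
    public String getFilePath() {
        return filePath;
    }

    @Override
    public void close() throws IOException {
        writer.close();
    }
}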

+ 76 - 0
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/writer/JSONFileWriter.java

@@ -0,0 +1,76 @@
+package io.milvus.bulkwriter.writer;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import io.milvus.param.collection.CollectionSchemaParam;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import static io.milvus.param.Constant.DYNAMIC_FIELD_NAME;
+
+public class JSONFileWriter implements FormatFileWriter {
+    private static final Logger logger = LoggerFactory.getLogger(JSONFileWriter.class);
+
+    private BufferedWriter writer;
+    private CollectionSchemaParam collectionSchema;
+    private String filePath;
+
+    public JSONFileWriter(CollectionSchemaParam collectionSchema, String filePathPrefix) throws IOException {
+        this.collectionSchema = collectionSchema;
+        initFilePath(filePathPrefix);
+        initWriter();
+    }
+
+    private void initFilePath(String filePathPrefix) {
+        this.filePath = filePathPrefix +  ".json";
+    }
+
+    private void initWriter() throws IOException {
+        this.writer = new BufferedWriter(new java.io.FileWriter(filePath));
+    }
+
+    @Override
+    public void appendRow(Map<String, Object> rowValues, boolean firstWrite) throws IOException {
+        Gson gson = new GsonBuilder().serializeNulls().create();
+        rowValues.keySet().removeIf(key -> key.equals(DYNAMIC_FIELD_NAME) && !this.collectionSchema.isEnableDynamicField());
+        rowValues.replaceAll((key, value) -> value instanceof ByteBuffer ? ((ByteBuffer)value).array() : value);
+
+        try {
+            if (firstWrite) {
+                writer.write("[\n");
+            } else {
+                writer.write(",");
+                writer.newLine();
+            }
+            String json = gson.toJson(rowValues);
+            writer.write(json);
+        } catch (IOException e) {
+            logger.error("{} appendRow error when writing to file {}", this.getClass().getSimpleName(), filePath, e);
+            throw e;
+        }
+    }
+
+    @Override
+    public String getFilePath() {
+        return filePath;
+    }
+
+    @Override
+    public void close() throws IOException {
+        appendEnd();
+        this.writer.close();
+    }
+
+    /**
+     * For JSON format data, at the end, it is necessary to append "]" to complete the data.
+     */
+    private void appendEnd() throws IOException {
+        this.writer.newLine();
+        this.writer.write("]\n");
+    }
+}

+ 239 - 0
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/writer/ParquetFileWriter.java

@@ -0,0 +1,239 @@
+package io.milvus.bulkwriter.writer;
+
+import io.milvus.bulkwriter.common.utils.ParquetUtils;
+import io.milvus.common.utils.JsonUtils;
+import io.milvus.grpc.DataType;
+import io.milvus.param.collection.CollectionSchemaParam;
+import io.milvus.param.collection.FieldType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.stream.Collectors;
+
+import static io.milvus.param.Constant.DYNAMIC_FIELD_NAME;
+
+public class ParquetFileWriter implements FormatFileWriter {
+    private static final Logger logger = LoggerFactory.getLogger(ParquetFileWriter.class);
+
+    private ParquetWriter<Group> writer;
+    private CollectionSchemaParam collectionSchema;
+    private String filePath;
+    private MessageType messageType;
+    private Map<String, FieldType> nameFieldType;
+
+    public ParquetFileWriter(CollectionSchemaParam collectionSchema, String filePathPrefix) throws IOException {
+        this.collectionSchema = collectionSchema;
+        initFilePath(filePathPrefix);
+        initNameFieldType();
+        initMessageType();
+        initWriter();
+    }
+
+    private void initFilePath(String filePathPrefix) {
+        this.filePath = filePathPrefix +  ".parquet";
+    }
+
+    private void initMessageType() {
+        // declare the messageType of the Parquet
+        this.messageType = ParquetUtils.parseCollectionSchema(collectionSchema);
+    }
+
+    private void initWriter() throws IOException {
+        int rowGroupBytes = 16 * 1024 * 1024;
+
+        // declare and define the ParquetWriter.
+        Configuration configuration = new Configuration();
+        GroupWriteSupport.setSchema(messageType, configuration);
+        GroupWriteSupport writeSupport = new GroupWriteSupport();
+        this.writer = new ParquetWriter<>(new Path(filePath),
+                org.apache.parquet.hadoop.ParquetFileWriter.Mode.CREATE,
+                writeSupport,
+                CompressionCodecName.UNCOMPRESSED,
+                rowGroupBytes,
+                2 * 1024 * 1024,
+                2 * 1024 * 1024,
+                ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
+                ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
+                ParquetWriter.DEFAULT_WRITER_VERSION,
+                configuration);
+    }
+
+    private void initNameFieldType() {
+        Map<String, FieldType> nameFieldType = collectionSchema.getFieldTypes().stream().collect(Collectors.toMap(FieldType::getName, e -> e));
+        if (collectionSchema.isEnableDynamicField()) {
+            nameFieldType.put(DYNAMIC_FIELD_NAME, FieldType.newBuilder()
+                    .withName(DYNAMIC_FIELD_NAME)
+                    .withDataType(DataType.JSON)
+                    .build());
+        }
+        this.nameFieldType = nameFieldType;
+    }
+
+    @Override
+    public void appendRow(Map<String, Object> rowValues, boolean firstWrite) throws IOException {
+        rowValues.keySet().removeIf(key -> key.equals(DYNAMIC_FIELD_NAME) && !this.collectionSchema.isEnableDynamicField());
+
+        try {
+            Group group = new SimpleGroupFactory(messageType).newGroup();
+            for (String fieldName : rowValues.keySet()) {
+                appendGroup(group, fieldName, rowValues.get(fieldName), nameFieldType.get(fieldName));
+            }
+            writer.write(group);
+        } catch (IOException e) {
+            logger.error("{} appendRow error when writing to file {}", this.getClass().getSimpleName(), filePath, e);
+            throw e;
+        }
+    }
+
+    @Override
+    public String getFilePath() {
+        return filePath;
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.writer.close();
+    }
+
+    private void appendGroup(Group group, String paramName, Object value, FieldType fieldType) {
+        DataType dataType = fieldType.getDataType();
+        switch (dataType) {
+            case Int8:
+            case Int16:
+                group.append(paramName, (Short)value);
+                break;
+            case Int32:
+                group.append(paramName, (Integer)value);
+                break;
+            case Int64:
+                group.append(paramName, (Long)value);
+                break;
+            case Float:
+                group.append(paramName, (Float)value);
+                break;
+            case Double:
+                group.append(paramName, (Double)value);
+                break;
+            case Bool:
+                group.append(paramName, (Boolean)value);
+                break;
+            case VarChar:
+            case String:
+            case JSON:
+                group.append(paramName, (String)value);
+                break;
+            case FloatVector:
+                addFloatArray(group, paramName, (List<Float>) value);
+                break;
+            case BinaryVector:
+            case Float16Vector:
+            case BFloat16Vector:
+                addBinaryVector(group, paramName, (ByteBuffer) value);
+                break;
+            case SparseFloatVector:
+                addSparseVector(group, paramName, (SortedMap<Long, Float>) value);
+                break;
+            case Array:
+                DataType elementType = fieldType.getElementType();
+                switch (elementType) {
+                    case Int8:
+                    case Int16:
+                    case Int32:
+                        addIntArray(group, paramName, (List<Integer>) value);
+                        break;
+                    case Int64:
+                        addLongArray(group, paramName, (List<Long>) value);
+                        break;
+                    case Float:
+                        addFloatArray(group, paramName, (List<Float>) value);
+                        break;
+                    case Double:
+                        addDoubleArray(group, paramName, (List<Double>) value);
+                        break;
+                    case String:
+                    case VarChar:
+                        addStringArray(group, paramName, (List<String>) value);
+                        break;
+                    case Bool:
+                        addBooleanArray(group, paramName, (List<Boolean>) value);
+                        break;
+                }
+        }
+    }
+
+    private static void addLongArray(Group group, String fieldName, List<Long> values) {
+        Group arrayGroup = group.addGroup(fieldName);
+        for (long value : values) {
+            Group addGroup = arrayGroup.addGroup(0);
+            addGroup.add(0, value);
+        }
+    }
+
+    private static void addStringArray(Group group, String fieldName, List<String> values) {
+        Group arrayGroup = group.addGroup(fieldName);
+        for (String value : values) {
+            Group addGroup = arrayGroup.addGroup(0);
+            addGroup.add(0, value);
+        }
+    }
+
+    private static void addIntArray(Group group, String fieldName, List<Integer> values) {
+        Group arrayGroup = group.addGroup(fieldName);
+        for (int value : values) {
+            Group addGroup = arrayGroup.addGroup(0);
+            addGroup.add(0, value);
+        }
+    }
+
+    private static void addFloatArray(Group group, String fieldName, List<Float> values) {
+        Group arrayGroup = group.addGroup(fieldName);
+        for (float value : values) {
+            Group addGroup = arrayGroup.addGroup(0);
+            addGroup.add(0, value);
+        }
+    }
+
+    private static void addDoubleArray(Group group, String fieldName, List<Double> values) {
+        Group arrayGroup = group.addGroup(fieldName);
+        for (double value : values) {
+            Group addGroup = arrayGroup.addGroup(0);
+            addGroup.add(0, value);
+        }
+    }
+
+    private static void addBooleanArray(Group group, String fieldName, List<Boolean> values) {
+        Group arrayGroup = group.addGroup(fieldName);
+        for (boolean value : values) {
+            Group addGroup = arrayGroup.addGroup(0);
+            addGroup.add(0, value);
+        }
+    }
+
+    private static void addBinaryVector(Group group, String fieldName, ByteBuffer byteBuffer) {
+        Group arrayGroup = group.addGroup(fieldName);
+        byte[] bytes = byteBuffer.array();
+        for (byte value : bytes) {
+            Group addGroup = arrayGroup.addGroup(0);
+            addGroup.add(0, value);
+        }
+    }
+
+    private static void addSparseVector(Group group, String fieldName, SortedMap<Long, Float> sparse) {
+        // sparse vector is parsed as JSON format string in the server side
+        String jsonString = JsonUtils.toJson(sparse);
+        group.append(fieldName, jsonString);
+    }
+}

+ 0 - 3
sdk-bulkwriter/src/test/java/io/milvus/bulkwriter/BulkWriterTest.java

@@ -238,7 +238,6 @@ public class BulkWriterTest {
             buildData(localBulkWriter, 10, schemaV2.isEnableDynamicField());
 
             System.out.printf("%s rows appends%n", localBulkWriter.getTotalRowCount());
-            System.out.printf("%s rows in buffer not flushed%n", localBulkWriter.getBufferRowCount());
             localBulkWriter.commit(false);
             List<List<String>> filePaths = localBulkWriter.getBatchFiles();
             System.out.println(filePaths);
@@ -262,7 +261,6 @@ public class BulkWriterTest {
             buildData(localBulkWriter, 10, schemaV2.isEnableDynamicField());
 
             System.out.printf("%s rows appends%n", localBulkWriter.getTotalRowCount());
-            System.out.printf("%s rows in buffer not flushed%n", localBulkWriter.getBufferRowCount());
             localBulkWriter.commit(false);
             List<List<String>> filePaths = localBulkWriter.getBatchFiles();
             System.out.println(filePaths);
@@ -287,7 +285,6 @@ public class BulkWriterTest {
             buildData(localBulkWriter, 10, schemaV2.isEnableDynamicField());
 
             System.out.printf("%s rows appends%n", localBulkWriter.getTotalRowCount());
-            System.out.printf("%s rows in buffer not flushed%n", localBulkWriter.getBufferRowCount());
             localBulkWriter.commit(false);
             List<List<String>> filePaths = localBulkWriter.getBatchFiles();
             System.out.println(filePaths);