浏览代码

Implement async methods for insert/search/query (#260)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
groot 3 年之前
父节点
当前提交
efc4aa18e3

+ 159 - 228
src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java

@@ -19,13 +19,16 @@
 
 package io.milvus.client;
 
-import com.google.protobuf.ByteString;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import io.grpc.StatusRuntimeException;
 import io.milvus.exception.ClientNotConnectedException;
 import io.milvus.exception.IllegalResponseException;
 import io.milvus.exception.ParamException;
 import io.milvus.grpc.*;
-import io.milvus.param.Constant;
+import io.milvus.param.ParamUtils;
 import io.milvus.param.R;
 import io.milvus.param.RpcStatus;
 import io.milvus.param.alias.AlterAliasParam;
@@ -42,11 +45,10 @@ import org.apache.commons.collections4.MapUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
+import javax.annotation.Nonnull;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import java.util.function.Function;
 
 public abstract class AbstractMilvusGrpcClient implements MilvusClient {
 
@@ -72,97 +74,6 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         return result;
     }
 
-    @SuppressWarnings("unchecked")
-    private FieldData genFieldData(String fieldName, DataType dataType, List<?> objects) {
-        if (objects == null) {
-            throw new ParamException("Cannot generate FieldData from null object");
-        }
-        FieldData.Builder builder = FieldData.newBuilder();
-        if (vectorDataType.contains(dataType)) {
-            if (dataType == DataType.FloatVector) {
-                List<Float> floats = new ArrayList<>();
-                // each object is List<Float>
-                for (Object object : objects) {
-                    if (object instanceof List) {
-                        List<Float> list = (List<Float>) object;
-                        floats.addAll(list);
-                    } else {
-                        throw new ParamException("The type of FloatVector must be List<Float>");
-                    }
-                }
-
-                int dim = floats.size() / objects.size();
-                FloatArray floatArray = FloatArray.newBuilder().addAllData(floats).build();
-                VectorField vectorField = VectorField.newBuilder().setDim(dim).setFloatVector(floatArray).build();
-                return builder.setFieldName(fieldName).setType(DataType.FloatVector).setVectors(vectorField).build();
-            } else if (dataType == DataType.BinaryVector) {
-                ByteBuffer totalBuf = null;
-                int dim = 0;
-                // each object is ByteBuffer
-                for (Object object : objects) {
-                    ByteBuffer buf = (ByteBuffer) object;
-                    if (totalBuf == null){
-                        totalBuf = ByteBuffer.allocate(buf.position() * objects.size());
-                        totalBuf.put(buf.array());
-                        dim = buf.position() * 8;
-                    } else {
-                        totalBuf.put(buf.array());
-                    }
-                }
-
-                assert totalBuf != null;
-                ByteString byteString = ByteString.copyFrom(totalBuf.array());
-                VectorField vectorField = VectorField.newBuilder().setDim(dim).setBinaryVector(byteString).build();
-                return builder.setFieldName(fieldName).setType(DataType.BinaryVector).setVectors(vectorField).build();
-            }
-        } else {
-            switch (dataType) {
-                case None:
-                case UNRECOGNIZED:
-                    throw new ParamException("Cannot support this dataType:" + dataType);
-                case Int64:
-                    List<Long> longs = objects.stream().map(p -> (Long) p).collect(Collectors.toList());
-                    LongArray longArray = LongArray.newBuilder().addAllData(longs).build();
-                    ScalarField scalarField1 = ScalarField.newBuilder().setLongData(longArray).build();
-                    return builder.setFieldName(fieldName).setType(dataType).setScalars(scalarField1).build();
-                case Int32:
-                case Int16:
-                case Int8:
-                    List<Integer> integers = objects.stream().map(p -> p instanceof Short ? ((Short)p).intValue() :(Integer) p).collect(Collectors.toList());
-                    IntArray intArray = IntArray.newBuilder().addAllData(integers).build();
-                    ScalarField scalarField2 = ScalarField.newBuilder().setIntData(intArray).build();
-                    return builder.setFieldName(fieldName).setType(dataType).setScalars(scalarField2).build();
-                case Bool:
-                    List<Boolean> booleans = objects.stream().map(p -> (Boolean) p).collect(Collectors.toList());
-                    BoolArray boolArray = BoolArray.newBuilder().addAllData(booleans).build();
-                    ScalarField scalarField3 = ScalarField.newBuilder().setBoolData(boolArray).build();
-                    return builder.setFieldName(fieldName).setType(dataType).setScalars(scalarField3).build();
-                case Float:
-                    List<Float> floats = objects.stream().map(p -> (Float) p).collect(Collectors.toList());
-                    FloatArray floatArray = FloatArray.newBuilder().addAllData(floats).build();
-                    ScalarField scalarField4 = ScalarField.newBuilder().setFloatData(floatArray).build();
-                    return builder.setFieldName(fieldName).setType(dataType).setScalars(scalarField4).build();
-                case Double:
-                    List<Double> doubles = objects.stream().map(p -> (Double) p).collect(Collectors.toList());
-                    DoubleArray doubleArray = DoubleArray.newBuilder().addAllData(doubles).build();
-                    ScalarField scalarField5 = ScalarField.newBuilder().setDoubleData(doubleArray).build();
-                    return builder.setFieldName(fieldName).setType(dataType).setScalars(scalarField5).build();
-                case String:
-                    List<String> strings = objects.stream().map(p -> (String) p).collect(Collectors.toList());
-                    StringArray stringArray = StringArray.newBuilder().addAllData(strings).build();
-                    ScalarField scalarField6 = ScalarField.newBuilder().setStringData(stringArray).build();
-                    return builder.setFieldName(fieldName).setType(dataType).setScalars(scalarField6).build();
-            }
-        }
-
-        return null;
-    }
-
-    private static final Set<DataType> vectorDataType = new HashSet<DataType>() {{
-        add(DataType.FloatVector);
-        add(DataType.BinaryVector);
-    }};
-
     private void waitForLoadingCollection(String collectionName, List<String> partitionNames,
                                           long waitingInterval, long timeout) throws IllegalResponseException {
         long tsBegin = System.currentTimeMillis();
@@ -178,7 +89,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
             // If waiting time exceed timeout, exist the circle
             while (true) {
                 long tsNow = System.currentTimeMillis();
-                if ((tsNow - tsBegin) >= timeout*1000) {
+                if ((tsNow - tsBegin) >= timeout * 1000) {
                     logWarning("Waiting load thread is timeout, loading process may not be finished");
                     break;
                 }
@@ -222,9 +133,9 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
             // If each partition's  inMemory percentage is 100, that means all the partitions have finished loading.
             // Otherwise, this thread will sleep a small interval and check again.
             // If waiting time exceed timeout, exist the circle
-            while(true) {
+            while (true) {
                 long tsNow = System.currentTimeMillis();
-                if ((tsNow - tsBegin) >= timeout*1000) {
+                if ((tsNow - tsBegin) >= timeout * 1000) {
                     logWarning("Waiting load thread is timeout, loading process may not be finished");
                     break;
                 }
@@ -292,7 +203,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         collectionSegIDs.forEach((collectionName, segmentIDs) -> {
             while (segmentIDs.getDataCount() > 0) {
                 long tsNow = System.currentTimeMillis();
-                if ((tsNow - tsBegin) >= timeout*1000) {
+                if ((tsNow - tsBegin) >= timeout * 1000) {
                     logWarning("Waiting flush thread is timeout, flush process may not be finished");
                     break;
                 }
@@ -301,7 +212,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                         .addAllSegmentIDs(segmentIDs.getDataList())
                         .build();
                 GetFlushStateResponse response = blockingStub().getFlushState(getFlushStateRequest);
-                if(response.getFlushed()) {
+                if (response.getFlushed()) {
                     // if all segment of this collection has been flushed, break this circle and check next collection
                     String msg = segmentIDs.getDataCount() + " segments of " + collectionName + " has been flushed.";
                     logInfo(msg);
@@ -327,7 +238,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         long tsBegin = System.currentTimeMillis();
         while (true) {
             long tsNow = System.currentTimeMillis();
-            if ((tsNow - tsBegin) >= timeout*1000) {
+            if ((tsNow - tsBegin) >= timeout * 1000) {
                 String msg = "Waiting index thread is timeout, index process may not be finished";
                 logWarning(msg);
                 return R.failed(R.Status.Success, msg);
@@ -680,11 +591,11 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
     }
 
     /**
-     * Currently we do not support this method on client since compaction is not supported on server.
-     * Now it is only for internal use of getCollectionStatistics().
+     * Flush insert buffer into storage. To make sure the buffer persisted successfully, it calls
+     * GetFlushState() to check related segments state.
      */
-//    @Override
-    private R<FlushResponse> flush(@NonNull FlushParam requestParam) {
+    @Override
+    public R<FlushResponse> flush(@NonNull FlushParam requestParam) {
         if (!clientIsReady()) {
             return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
         }
@@ -1320,26 +1231,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         logInfo(requestParam.toString());
 
         try {
-            String collectionName = requestParam.getCollectionName();
-            String partitionName = requestParam.getPartitionName();
-            List<InsertParam.Field> fields = requestParam.getFields();
-
-            //1. gen insert request
-            MsgBase msgBase = MsgBase.newBuilder().setMsgType(MsgType.Insert).build();
-            InsertRequest.Builder insertBuilder = InsertRequest.newBuilder()
-                    .setCollectionName(collectionName)
-                    .setPartitionName(partitionName)
-                    .setBase(msgBase)
-                    .setNumRows(requestParam.getRowCount());
-
-            //2. gen fieldData
-            // TODO: check field type(use DescribeCollection get schema to compare)
-            for (InsertParam.Field field : fields) {
-                insertBuilder.addFieldsData(genFieldData(field.getName(), field.getType(), field.getValues()));
-            }
-
-            //3. call insert
-            InsertRequest insertRequest = insertBuilder.build();
+            InsertRequest insertRequest = ParamUtils.ConvertInsertParam(requestParam);
             MutationResult response = blockingStub().insert(insertRequest);
 
             if (response.getStatus().getErrorCode() == ErrorCode.Success) {
@@ -1364,108 +1256,62 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
     }
 
     @Override
-    @SuppressWarnings("unchecked")
-    public R<SearchResults> search(@NonNull SearchParam requestParam) {
+    @SuppressWarnings("UnstableApiUsage")
+    public ListenableFuture<R<MutationResult>> insertAsync(InsertParam requestParam) {
         if (!clientIsReady()) {
-            return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
+            return Futures.immediateFuture(
+                    R.failed(new ClientNotConnectedException("Client rpc channel is not ready")));
         }
 
         logInfo(requestParam.toString());
 
-        try {
-            SearchRequest.Builder builder = SearchRequest.newBuilder()
-                    .setDbName("")
-                    .setCollectionName(requestParam.getCollectionName());
-            if (!requestParam.getPartitionNames().isEmpty()) {
-                requestParam.getPartitionNames().forEach(builder::addPartitionNames);
-            }
-
-            // prepare target vectors
-            // TODO: check target vector dimension(use DescribeCollection get schema to compare)
-            PlaceholderType plType = PlaceholderType.None;
-            List<?> vectors = requestParam.getVectors();
-            List<ByteString> byteStrings = new ArrayList<>();
-            for (Object vector : vectors) {
-                if (vector instanceof List) {
-                    plType = PlaceholderType.FloatVector;
-                    List<Float> list = (List<Float>) vector;
-                    ByteBuffer buf = ByteBuffer.allocate(Float.BYTES * list.size());
-                    buf.order(ByteOrder.LITTLE_ENDIAN);
-                    list.forEach(buf::putFloat);
-
-                    byte[] array = buf.array();
-                    ByteString bs = ByteString.copyFrom(array);
-                    byteStrings.add(bs);
-                } else if (vector instanceof ByteBuffer) {
-                    plType = PlaceholderType.BinaryVector;
-                    ByteBuffer buf = (ByteBuffer) vector;
-                    byte[] array = buf.array();
-                    ByteString bs = ByteString.copyFrom(array);
-                    byteStrings.add(bs);
-                } else {
-                    String msg = "Search target vector type is illegal(Only allow List<Float> or ByteBuffer)";
-                    logError(msg);
-                    return R.failed(R.Status.UnexpectedError, msg);
-                }
-            }
-
-            PlaceholderValue.Builder pldBuilder = PlaceholderValue.newBuilder()
-                    .setTag(Constant.VECTOR_TAG)
-                    .setType(plType);
-            byteStrings.forEach(pldBuilder::addValues);
-
-            PlaceholderValue plv = pldBuilder.build();
-            PlaceholderGroup placeholderGroup = PlaceholderGroup.newBuilder()
-                    .addPlaceholders(plv)
-                    .build();
+        InsertRequest insertRequest = ParamUtils.ConvertInsertParam(requestParam);
+        ListenableFuture<MutationResult> response = futureStub().insert(insertRequest);
+
+        Futures.addCallback(
+                response,
+                new FutureCallback<MutationResult>() {
+                    @Override
+                    public void onSuccess(MutationResult result) {
+                        if (result.getStatus().getErrorCode() == ErrorCode.Success) {
+                            logInfo("insertAsync successfully! Collection name:{}",
+                                    requestParam.getCollectionName());
+                        } else {
+                            logError("insertAsync failed! Collection name:{}\n{}",
+                                    requestParam.getCollectionName(), result.getStatus().getReason());
+                        }
+                    }
 
-            ByteString byteStr = placeholderGroup.toByteString();
-            builder.setPlaceholderGroup(byteStr);
+                    @Override
+                    public void onFailure(@Nonnull Throwable t) {
+                        logError("insertAsync failed:\n{}", t.getMessage());
+                    }
+                },
+                MoreExecutors.directExecutor());
 
-            // search parameters
-            builder.addSearchParams(
-                    KeyValuePair.newBuilder()
-                            .setKey(Constant.VECTOR_FIELD)
-                            .setValue(requestParam.getVectorFieldName())
-                            .build())
-                    .addSearchParams(
-                            KeyValuePair.newBuilder()
-                                    .setKey(Constant.TOP_K)
-                                    .setValue(String.valueOf(requestParam.getTopK()))
-                                    .build())
-                    .addSearchParams(
-                            KeyValuePair.newBuilder()
-                                    .setKey(Constant.METRIC_TYPE)
-                                    .setValue(requestParam.getMetricType())
-                                    .build())
-                    .addSearchParams(
-                            KeyValuePair.newBuilder()
-                                    .setKey(Constant.ROUND_DECIMAL)
-                                    .setValue(String.valueOf(requestParam.getRoundDecimal()))
-                                    .build());
-
-            if (null != requestParam.getParams() && !requestParam.getParams().isEmpty()) {
-                builder.addSearchParams(
-                        KeyValuePair.newBuilder()
-                                .setKey(Constant.PARAMS)
-                                .setValue(requestParam.getParams())
-                                .build());
-            }
+        Function<MutationResult, R<MutationResult>> transformFunc =
+                results -> {
+                    if (results.getStatus().getErrorCode() == ErrorCode.Success) {
+                        return R.success(results);
+                    } else {
+                        return R.failed(R.Status.valueOf(results.getStatus().getErrorCode().getNumber()),
+                                results.getStatus().getReason());
+                    }
+                };
 
-            if (!requestParam.getOutFields().isEmpty()) {
-                requestParam.getOutFields().forEach(builder::addOutputFields);
-            }
+        return Futures.transform(response, transformFunc::apply, MoreExecutors.directExecutor());
+    }
 
-            // always use expression since dsl is discarded
-            builder.setDslType(DslType.BoolExprV1);
-            if (requestParam.getExpr() != null && !requestParam.getExpr().isEmpty()) {
-                builder.setDsl(requestParam.getExpr());
-            }
+    @Override
+    public R<SearchResults> search(@NonNull SearchParam requestParam) {
+        if (!clientIsReady()) {
+            return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
+        }
 
-            builder.setTravelTimestamp(requestParam.getTravelTimestamp());
-            builder.setGuaranteeTimestamp(requestParam.getGuaranteeTimestamp());
+        logInfo(requestParam.toString());
 
-            SearchRequest searchRequest = builder.build();
+        try {
+            SearchRequest searchRequest = ParamUtils.ConvertSearchParam(requestParam);
             SearchResults response = this.blockingStub().search(searchRequest);
 
             //TODO: truncate distance value by round decimal
@@ -1481,12 +1327,59 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         } catch (StatusRuntimeException e) {
             logError("SearchRequest RPC failed:{}", e.getMessage());
             return R.failed(e);
-        } catch (Exception e) {
+        } catch (ParamException e) {
             logError("SearchRequest failed:\n{}", e.getMessage());
             return R.failed(e);
         }
     }
 
+    @Override
+    @SuppressWarnings("UnstableApiUsage")
+    public ListenableFuture<R<SearchResults>> searchAsync(SearchParam requestParam) {
+        if (!clientIsReady()) {
+            return Futures.immediateFuture(
+                    R.failed(new ClientNotConnectedException("Client rpc channel is not ready")));
+        }
+
+        logInfo(requestParam.toString());
+
+        SearchRequest searchRequest = ParamUtils.ConvertSearchParam(requestParam);
+        ListenableFuture<SearchResults> response = this.futureStub().search(searchRequest);
+
+        Futures.addCallback(
+                response,
+                new FutureCallback<SearchResults>() {
+                    @Override
+                    public void onSuccess(SearchResults result) {
+                        if (result.getStatus().getErrorCode() == ErrorCode.Success) {
+                            logInfo("searchAsync successfully! Collection name:{}",
+                                    requestParam.getCollectionName());
+                        } else {
+                            logError("searchAsync failed! Collection name:{}\n{}",
+                                    requestParam.getCollectionName(), result.getStatus().getReason());
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(@Nonnull Throwable t) {
+                        logError("searchAsync failed:\n{}", t.getMessage());
+                    }
+                },
+                MoreExecutors.directExecutor());
+
+        Function<SearchResults, R<SearchResults>> transformFunc =
+                results -> {
+                    if (results.getStatus().getErrorCode() == ErrorCode.Success) {
+                        return R.success(results);
+                    } else {
+                        return R.failed(R.Status.valueOf(results.getStatus().getErrorCode().getNumber()),
+                                results.getStatus().getReason());
+                    }
+                };
+
+        return Futures.transform(response, transformFunc::apply, MoreExecutors.directExecutor());
+    }
+
     @Override
     public R<QueryResults> query(@NonNull QueryParam requestParam) {
         if (!clientIsReady()) {
@@ -1496,16 +1389,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         logInfo(requestParam.toString());
 
         try {
-            QueryRequest queryRequest = QueryRequest.newBuilder()
-                    .setDbName("")
-                    .setCollectionName(requestParam.getCollectionName())
-                    .addAllPartitionNames(requestParam.getPartitionNames())
-                    .addAllOutputFields(requestParam.getOutFields())
-                    .setExpr(requestParam.getExpr())
-                    .setTravelTimestamp(requestParam.getTravelTimestamp())
-                    .setGuaranteeTimestamp(requestParam.getGuaranteeTimestamp())
-                    .build();
-
+            QueryRequest queryRequest = ParamUtils.ConvertQueryParam(requestParam);
             QueryResults response = this.blockingStub().query(queryRequest);
 
             if (response.getStatus().getErrorCode() == ErrorCode.Success) {
@@ -1526,6 +1410,53 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         }
     }
 
+    @Override
+    @SuppressWarnings("UnstableApiUsage")
+    public ListenableFuture<R<QueryResults>> queryAsync(QueryParam requestParam) {
+        if (!clientIsReady()) {
+            return Futures.immediateFuture(
+                    R.failed(new ClientNotConnectedException("Client rpc channel is not ready")));
+        }
+
+        logInfo(requestParam.toString());
+
+        QueryRequest queryRequest = ParamUtils.ConvertQueryParam(requestParam);
+        ListenableFuture<QueryResults> response = this.futureStub().query(queryRequest);
+
+        Futures.addCallback(
+                response,
+                new FutureCallback<QueryResults>() {
+                    @Override
+                    public void onSuccess(QueryResults result) {
+                        if (result.getStatus().getErrorCode() == ErrorCode.Success) {
+                            logInfo("queryAsync successfully! Collection name:{}",
+                                    requestParam.getCollectionName());
+                        } else {
+                            logError("queryAsync failed! Collection name:{}\n{}",
+                                    requestParam.getCollectionName(), result.getStatus().getReason());
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(@Nonnull Throwable t) {
+                        logError("queryAsync failed:\n{}", t.getMessage());
+                    }
+                },
+                MoreExecutors.directExecutor());
+
+        Function<QueryResults, R<QueryResults>> transformFunc =
+                results -> {
+                    if (results.getStatus().getErrorCode() == ErrorCode.Success) {
+                        return R.success(results);
+                    } else {
+                        return R.failed(R.Status.valueOf(results.getStatus().getErrorCode().getNumber()),
+                                results.getStatus().getReason());
+                    }
+                };
+
+        return Futures.transform(response, transformFunc::apply, MoreExecutors.directExecutor());
+    }
+
     @Override
     public R<CalcDistanceResults> calcDistance(@NonNull CalcDistanceParam requestParam) {
         if (!clientIsReady()) {

+ 36 - 9
src/main/java/io/milvus/client/MilvusClient.java

@@ -31,6 +31,7 @@ import io.milvus.param.dml.*;
 import io.milvus.param.index.*;
 import io.milvus.param.partition.*;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import java.util.concurrent.TimeUnit;
 
 /** The Milvus Client Interface */
@@ -124,14 +125,14 @@ public interface MilvusClient {
      */
     R<ShowCollectionsResponse> showCollections(ShowCollectionsParam requestParam);
 
-//    /**
-//     * Flushes collections.
-//     * Currently we do not support this method on client since compaction is not supported on server.
-//     *
-//     * @param requestParam {@link FlushParam}
-//     * @return {status:result code,data: FlushResponse{flush segment ids}}
-//     */
-//    R<FlushResponse> flush(FlushParam requestParam);
+    /**
+     * Flushes collections.
+     * Currently we do not support this method on client since compaction is not supported on server.
+     *
+     * @param requestParam {@link FlushParam}
+     * @return {status:result code,data: FlushResponse{flush segment ids}}
+     */
+    R<FlushResponse> flush(FlushParam requestParam);
 
     /**
      * Creates a partition in the specified collection.
@@ -258,7 +259,7 @@ public interface MilvusClient {
     R<GetIndexBuildProgressResponse> getIndexBuildProgress(GetIndexBuildProgressParam requestParam);
 
     /**
-     * Inserts entities into a specified collection. Note that you don't need to 
+     * Inserts entities into a specified collection . Note that you don't need to
      * input primary key field if auto_id is enabled.
      *
      * @param requestParam {@link InsertParam}
@@ -266,6 +267,15 @@ public interface MilvusClient {
      */
     R<MutationResult> insert(InsertParam requestParam);
 
+    /**
+     * Inserts entities into a specified collection asynchronously. Note that you don't need to
+     * input primary key field if auto_id is enabled.
+     *
+     * @param requestParam {@link InsertParam}
+     * @return a <code>ListenableFuture</code> object which holds the object {status:result code, data: MutationResult{insert results}}
+     */
+    ListenableFuture<R<MutationResult>> insertAsync(InsertParam requestParam);
+
     /**
      * Deletes entity(s) based on primary key(s) filtered by boolean expression. Current release 
      * of Milvus only supports expression in the format "pk_field in [1, 2, ...]"
@@ -283,6 +293,14 @@ public interface MilvusClient {
      */
     R<SearchResults> search(SearchParam requestParam);
 
+    /**
+     * Conducts ANN search on a vector field asynchronously. Use expression to do filtering before search.
+     *
+     * @param requestParam {@link SearchParam}
+     * @return a <code>ListenableFuture</code> object which holds the object {status:result code, data: SearchResults{topK results}}
+     */
+    ListenableFuture<R<SearchResults>> searchAsync(SearchParam requestParam);
+
     /**
      * Queries entity(s) based on scalar field(s) filtered by boolean expression. 
      * Note that the order of the returned entities cannot be guaranteed.
@@ -292,6 +310,15 @@ public interface MilvusClient {
      */
     R<QueryResults> query(QueryParam requestParam);
 
+    /**
+     * Queries entity(s) asynchronously based on scalar field(s) filtered by boolean expression.
+     * Note that the order of the returned entities cannot be guaranteed.
+     *
+     * @param requestParam {@link QueryParam}
+     * @return {status:result code,data: QueryResults{filter results}}
+     */
+    ListenableFuture<R<QueryResults>> queryAsync(QueryParam requestParam);
+
     /**
      * Calculates the distance between the specified vectors.
      *

+ 1 - 1
src/main/java/io/milvus/client/MilvusServiceClient.java

@@ -68,7 +68,7 @@ public class MilvusServiceClient extends AbstractMilvusGrpcClient {
     }
 
     private static class TimeoutInterceptor implements ClientInterceptor {
-        private long timeoutMillis;
+        private final long timeoutMillis;
 
         TimeoutInterceptor(long timeoutMillis) {
             this.timeoutMillis = timeoutMillis;

+ 1 - 2
src/main/java/io/milvus/param/ConnectParam.java

@@ -20,9 +20,8 @@
 package io.milvus.param;
 
 import io.milvus.exception.ParamException;
-
-import io.milvus.param.partition.ShowPartitionsParam;
 import lombok.NonNull;
+
 import java.util.concurrent.TimeUnit;
 
 /**

+ 253 - 0
src/main/java/io/milvus/param/ParamUtils.java

@@ -1,8 +1,22 @@
 package io.milvus.param;
 
+import com.google.protobuf.ByteString;
 import io.milvus.exception.ParamException;
+import io.milvus.grpc.*;
+import io.milvus.param.dml.InsertParam;
+import io.milvus.param.dml.QueryParam;
+import io.milvus.param.dml.SearchParam;
+import lombok.NonNull;
 import org.apache.commons.lang3.StringUtils;
 
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 /**
  * Utility functions for param classes
  */
@@ -19,4 +33,243 @@ public class ParamUtils {
             throw new ParamException(name + " cannot be null or empty");
         }
     }
+
+    /**
+     * Convert {@link InsertParam} to proto type InsertRequest.
+     *
+     * @param requestParam {@link InsertParam} object
+     * @return a <code>InsertRequest</code> object
+     */
+    public static InsertRequest ConvertInsertParam(@NonNull InsertParam requestParam) {
+        String collectionName = requestParam.getCollectionName();
+        String partitionName = requestParam.getPartitionName();
+        List<InsertParam.Field> fields = requestParam.getFields();
+
+        // gen insert request
+        MsgBase msgBase = MsgBase.newBuilder().setMsgType(MsgType.Insert).build();
+        InsertRequest.Builder insertBuilder = InsertRequest.newBuilder()
+                .setCollectionName(collectionName)
+                .setPartitionName(partitionName)
+                .setBase(msgBase)
+                .setNumRows(requestParam.getRowCount());
+
+        // gen fieldData
+        // TODO: check field type(use DescribeCollection get schema to compare)
+        for (InsertParam.Field field : fields) {
+            insertBuilder.addFieldsData(genFieldData(field.getName(), field.getType(), field.getValues()));
+        }
+
+        // gen request
+        return insertBuilder.build();
+    }
+
+    /**
+     * Convert {@link SearchParam} to proto type SearchRequest.
+     *
+     * @param requestParam {@link SearchParam} object
+     * @return a <code>SearchRequest</code> object
+     */
+    @SuppressWarnings("unchecked")
+    public static SearchRequest ConvertSearchParam(@NonNull SearchParam requestParam) throws ParamException {
+        SearchRequest.Builder builder = SearchRequest.newBuilder()
+                .setDbName("")
+                .setCollectionName(requestParam.getCollectionName());
+        if (!requestParam.getPartitionNames().isEmpty()) {
+            requestParam.getPartitionNames().forEach(builder::addPartitionNames);
+        }
+
+        // prepare target vectors
+        // TODO: check target vector dimension(use DescribeCollection get schema to compare)
+        PlaceholderType plType = PlaceholderType.None;
+        List<?> vectors = requestParam.getVectors();
+        List<ByteString> byteStrings = new ArrayList<>();
+        for (Object vector : vectors) {
+            if (vector instanceof List) {
+                plType = PlaceholderType.FloatVector;
+                List<Float> list = (List<Float>) vector;
+                ByteBuffer buf = ByteBuffer.allocate(Float.BYTES * list.size());
+                buf.order(ByteOrder.LITTLE_ENDIAN);
+                list.forEach(buf::putFloat);
+
+                byte[] array = buf.array();
+                ByteString bs = ByteString.copyFrom(array);
+                byteStrings.add(bs);
+            } else if (vector instanceof ByteBuffer) {
+                plType = PlaceholderType.BinaryVector;
+                ByteBuffer buf = (ByteBuffer) vector;
+                byte[] array = buf.array();
+                ByteString bs = ByteString.copyFrom(array);
+                byteStrings.add(bs);
+            } else {
+                String msg = "Search target vector type is illegal(Only allow List<Float> or ByteBuffer)";
+                throw new ParamException(msg);
+            }
+        }
+
+        PlaceholderValue.Builder pldBuilder = PlaceholderValue.newBuilder()
+                .setTag(Constant.VECTOR_TAG)
+                .setType(plType);
+        byteStrings.forEach(pldBuilder::addValues);
+
+        PlaceholderValue plv = pldBuilder.build();
+        PlaceholderGroup placeholderGroup = PlaceholderGroup.newBuilder()
+                .addPlaceholders(plv)
+                .build();
+
+        ByteString byteStr = placeholderGroup.toByteString();
+        builder.setPlaceholderGroup(byteStr);
+
+        // search parameters
+        builder.addSearchParams(
+                KeyValuePair.newBuilder()
+                        .setKey(Constant.VECTOR_FIELD)
+                        .setValue(requestParam.getVectorFieldName())
+                        .build())
+                .addSearchParams(
+                        KeyValuePair.newBuilder()
+                                .setKey(Constant.TOP_K)
+                                .setValue(String.valueOf(requestParam.getTopK()))
+                                .build())
+                .addSearchParams(
+                        KeyValuePair.newBuilder()
+                                .setKey(Constant.METRIC_TYPE)
+                                .setValue(requestParam.getMetricType())
+                                .build())
+                .addSearchParams(
+                        KeyValuePair.newBuilder()
+                                .setKey(Constant.ROUND_DECIMAL)
+                                .setValue(String.valueOf(requestParam.getRoundDecimal()))
+                                .build());
+
+        if (null != requestParam.getParams() && !requestParam.getParams().isEmpty()) {
+            builder.addSearchParams(
+                    KeyValuePair.newBuilder()
+                            .setKey(Constant.PARAMS)
+                            .setValue(requestParam.getParams())
+                            .build());
+        }
+
+        if (!requestParam.getOutFields().isEmpty()) {
+            requestParam.getOutFields().forEach(builder::addOutputFields);
+        }
+
+        // always use expression since dsl is discarded
+        builder.setDslType(DslType.BoolExprV1);
+        if (requestParam.getExpr() != null && !requestParam.getExpr().isEmpty()) {
+            builder.setDsl(requestParam.getExpr());
+        }
+
+        builder.setTravelTimestamp(requestParam.getTravelTimestamp());
+        builder.setGuaranteeTimestamp(requestParam.getGuaranteeTimestamp());
+
+        return builder.build();
+    }
+    /**
+     * Convert {@link QueryParam} to proto type QueryRequest.
+     *
+     * @param requestParam {@link QueryParam} object
+     * @return a <code>QueryRequest</code> object
+     */
+    public static QueryRequest ConvertQueryParam(@NonNull QueryParam requestParam) {
+        return QueryRequest.newBuilder()
+                .setCollectionName(requestParam.getCollectionName())
+                .addAllPartitionNames(requestParam.getPartitionNames())
+                .addAllOutputFields(requestParam.getOutFields())
+                .setExpr(requestParam.getExpr())
+                .setTravelTimestamp(requestParam.getTravelTimestamp())
+                .setGuaranteeTimestamp(requestParam.getGuaranteeTimestamp())
+                .build();
+    }
+
+
+    private static final Set<DataType> vectorDataType = new HashSet<DataType>() {{
+        add(DataType.FloatVector);
+        add(DataType.BinaryVector);
+    }};
+
+    @SuppressWarnings("unchecked")
+    private static FieldData genFieldData(String fieldName, DataType dataType, List<?> objects) {
+        if (objects == null) {
+            throw new ParamException("Cannot generate FieldData from null object");
+        }
+        FieldData.Builder builder = FieldData.newBuilder();
+        if (vectorDataType.contains(dataType)) {
+            if (dataType == DataType.FloatVector) {
+                List<Float> floats = new ArrayList<>();
+                // each object is List<Float>
+                for (Object object : objects) {
+                    if (object instanceof List) {
+                        List<Float> list = (List<Float>) object;
+                        floats.addAll(list);
+                    } else {
+                        throw new ParamException("The type of FloatVector must be List<Float>");
+                    }
+                }
+
+                int dim = floats.size() / objects.size();
+                FloatArray floatArray = FloatArray.newBuilder().addAllData(floats).build();
+                VectorField vectorField = VectorField.newBuilder().setDim(dim).setFloatVector(floatArray).build();
+                return builder.setFieldName(fieldName).setType(DataType.FloatVector).setVectors(vectorField).build();
+            } else if (dataType == DataType.BinaryVector) {
+                ByteBuffer totalBuf = null;
+                int dim = 0;
+                // each object is ByteBuffer
+                for (Object object : objects) {
+                    ByteBuffer buf = (ByteBuffer) object;
+                    if (totalBuf == null) {
+                        totalBuf = ByteBuffer.allocate(buf.position() * objects.size());
+                        totalBuf.put(buf.array());
+                        dim = buf.position() * 8;
+                    } else {
+                        totalBuf.put(buf.array());
+                    }
+                }
+
+                assert totalBuf != null;
+                ByteString byteString = ByteString.copyFrom(totalBuf.array());
+                VectorField vectorField = VectorField.newBuilder().setDim(dim).setBinaryVector(byteString).build();
+                return builder.setFieldName(fieldName).setType(DataType.BinaryVector).setVectors(vectorField).build();
+            }
+        } else {
+            switch (dataType) {
+                case None:
+                case UNRECOGNIZED:
+                    throw new ParamException("Cannot support this dataType:" + dataType);
+                case Int64:
+                    List<Long> longs = objects.stream().map(p -> (Long) p).collect(Collectors.toList());
+                    LongArray longArray = LongArray.newBuilder().addAllData(longs).build();
+                    ScalarField scalarField1 = ScalarField.newBuilder().setLongData(longArray).build();
+                    return builder.setFieldName(fieldName).setType(dataType).setScalars(scalarField1).build();
+                case Int32:
+                case Int16:
+                case Int8:
+                    List<Integer> integers = objects.stream().map(p -> p instanceof Short ? ((Short) p).intValue() : (Integer) p).collect(Collectors.toList());
+                    IntArray intArray = IntArray.newBuilder().addAllData(integers).build();
+                    ScalarField scalarField2 = ScalarField.newBuilder().setIntData(intArray).build();
+                    return builder.setFieldName(fieldName).setType(dataType).setScalars(scalarField2).build();
+                case Bool:
+                    List<Boolean> booleans = objects.stream().map(p -> (Boolean) p).collect(Collectors.toList());
+                    BoolArray boolArray = BoolArray.newBuilder().addAllData(booleans).build();
+                    ScalarField scalarField3 = ScalarField.newBuilder().setBoolData(boolArray).build();
+                    return builder.setFieldName(fieldName).setType(dataType).setScalars(scalarField3).build();
+                case Float:
+                    List<Float> floats = objects.stream().map(p -> (Float) p).collect(Collectors.toList());
+                    FloatArray floatArray = FloatArray.newBuilder().addAllData(floats).build();
+                    ScalarField scalarField4 = ScalarField.newBuilder().setFloatData(floatArray).build();
+                    return builder.setFieldName(fieldName).setType(dataType).setScalars(scalarField4).build();
+                case Double:
+                    List<Double> doubles = objects.stream().map(p -> (Double) p).collect(Collectors.toList());
+                    DoubleArray doubleArray = DoubleArray.newBuilder().addAllData(doubles).build();
+                    ScalarField scalarField5 = ScalarField.newBuilder().setDoubleData(doubleArray).build();
+                    return builder.setFieldName(fieldName).setType(dataType).setScalars(scalarField5).build();
+                case String:
+                    List<String> strings = objects.stream().map(p -> (String) p).collect(Collectors.toList());
+                    StringArray stringArray = StringArray.newBuilder().addAllData(strings).build();
+                    ScalarField scalarField6 = ScalarField.newBuilder().setStringData(stringArray).build();
+                    return builder.setFieldName(fieldName).setType(dataType).setScalars(scalarField6).build();
+            }
+        }
+
+        return null;
+    }
 }

+ 1 - 2
src/main/java/io/milvus/param/collection/CreateCollectionParam.java

@@ -21,10 +21,9 @@ package io.milvus.param.collection;
 
 import io.milvus.exception.ParamException;
 import io.milvus.param.ParamUtils;
-
-import io.milvus.param.alias.DropAliasParam;
 import lombok.Getter;
 import lombok.NonNull;
+
 import java.util.ArrayList;
 import java.util.List;
 

+ 0 - 1
src/main/java/io/milvus/param/control/GetCompactionPlansParam.java

@@ -1,7 +1,6 @@
 package io.milvus.param.control;
 
 import io.milvus.exception.ParamException;
-import io.milvus.param.collection.ShowCollectionsParam;
 import lombok.Getter;
 import lombok.NonNull;
 

+ 1 - 2
src/main/java/io/milvus/param/dml/CalcDistanceParam.java

@@ -21,10 +21,9 @@ package io.milvus.param.dml;
 
 import io.milvus.exception.ParamException;
 import io.milvus.param.MetricType;
-
-import io.milvus.param.control.ManualCompactionParam;
 import lombok.Getter;
 import lombok.NonNull;
+
 import java.util.List;
 
 /**

+ 1 - 2
src/main/java/io/milvus/param/index/CreateIndexParam.java

@@ -24,10 +24,9 @@ import io.milvus.param.Constant;
 import io.milvus.param.IndexType;
 import io.milvus.param.MetricType;
 import io.milvus.param.ParamUtils;
-
-import io.milvus.param.dml.SearchParam;
 import lombok.Getter;
 import lombok.NonNull;
+
 import java.util.HashMap;
 import java.util.Map;
 

+ 0 - 2
src/main/java/io/milvus/param/partition/CreatePartitionParam.java

@@ -21,8 +21,6 @@ package io.milvus.param.partition;
 
 import io.milvus.exception.ParamException;
 import io.milvus.param.ParamUtils;
-
-import io.milvus.param.index.GetIndexStateParam;
 import lombok.Getter;
 import lombok.NonNull;
 

+ 1 - 1
src/main/java/io/milvus/param/partition/ShowPartitionsParam.java

@@ -105,7 +105,7 @@ public class ShowPartitionsParam {
         public ShowPartitionsParam build() throws ParamException {
             ParamUtils.CheckNullEmptyString(collectionName, "Collection name");
 
-            if (partitionNames != null && !partitionNames.isEmpty()) {
+            if (!partitionNames.isEmpty()) {
                 for (String partitionName : partitionNames) {
                     ParamUtils.CheckNullEmptyString(partitionName, "Partition name");
                 }

+ 2 - 5
src/main/java/io/milvus/response/DescCollResponseWrapper.java

@@ -1,13 +1,10 @@
 package io.milvus.response;
 
+import io.milvus.grpc.CollectionSchema;
+import io.milvus.grpc.DescribeCollectionResponse;
 import io.milvus.grpc.FieldSchema;
 import io.milvus.grpc.KeyValuePair;
-import io.milvus.param.RpcStatus;
 import io.milvus.param.collection.FieldType;
-
-import io.milvus.grpc.CollectionSchema;
-import io.milvus.grpc.DescribeCollectionResponse;
-
 import lombok.NonNull;
 
 import java.util.ArrayList;

+ 176 - 11
src/test/java/io/milvus/client/MilvusClientDockerTest.java

@@ -19,18 +19,18 @@
 
 package io.milvus.client;
 
-import io.milvus.response.*;
+import com.google.common.util.concurrent.ListenableFuture;
 import io.milvus.grpc.*;
 import io.milvus.param.*;
 import io.milvus.param.collection.*;
 import io.milvus.param.dml.InsertParam;
 import io.milvus.param.dml.QueryParam;
 import io.milvus.param.dml.SearchParam;
-
 import io.milvus.param.index.CreateIndexParam;
 import io.milvus.param.index.DescribeIndexParam;
 import io.milvus.param.partition.GetPartitionStatisticsParam;
 import io.milvus.param.partition.ShowPartitionsParam;
+import io.milvus.response.*;
 import org.apache.commons.text.RandomStringGenerator;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -40,7 +40,11 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -153,14 +157,29 @@ public class MilvusClientDockerTest {
         return vectors;
     }
 
+    private List<List<Float>> normalizeFloatVectors(List<List<Float>> src) {
+        for (List<Float> vector : src) {
+            double total = 0.0;
+            for (Float val : vector) {
+                total = total + val * val;
+            }
+            float squre = (float) Math.sqrt(total);
+            for (int i = 0; i < vector.size(); ++i) {
+                vector.set(i, vector.get(i) / squre);
+            }
+        }
+
+        return src;
+    }
+
     private List<ByteBuffer> generateBinaryVectors(int count) {
         Random ran = new Random();
         List<ByteBuffer> vectors = new ArrayList<>();
-        int byteCount = dimension/8;
+        int byteCount = dimension / 8;
         for (int n = 0; n < count; ++n) {
             ByteBuffer vector = ByteBuffer.allocate(byteCount);
             for (int i = 0; i < byteCount; ++i) {
-                vector.put((byte)ran.nextInt(Byte.MAX_VALUE));
+                vector.put((byte) ran.nextInt(Byte.MAX_VALUE));
             }
             vectors.add(vector);
         }
@@ -237,9 +256,9 @@ public class MilvusClientDockerTest {
         List<Short> ages = new ArrayList<>();
         for (long i = 0L; i < rowCount; ++i) {
             ids.add(i);
-            genders.add(i%3 == 0 ? Boolean.TRUE : Boolean.FALSE);
-            weights.add( ((double)(i + 1) / 100));
-            ages.add((short)((i + 1)%99));
+            genders.add(i % 3 == 0 ? Boolean.TRUE : Boolean.FALSE);
+            weights.add(((double) (i + 1) / 100));
+            ages.add((short) ((i + 1) % 99));
         }
         List<List<Float>> vectors = generateFloatVectors(rowCount);
 
@@ -257,7 +276,7 @@ public class MilvusClientDockerTest {
 
         R<MutationResult> insertR = client.withTimeout(10, TimeUnit.SECONDS).insert(insertParam);
         assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
-//        System.out.println(insertR.getData());
+        System.out.println(insertR.getData());
 
         // get collection statistics
         R<GetCollectionStatisticsResponse> statR = client.getCollectionStatistics(GetCollectionStatisticsParam
@@ -349,7 +368,7 @@ public class MilvusClientDockerTest {
                 .withOutFields(outputFields)
                 .build();
 
-        R<QueryResults> queryR= client.query(queryParam);
+        R<QueryResults> queryR = client.query(queryParam);
         assertEquals(R.Status.Success.getCode(), queryR.getStatus().intValue());
 
         // verify query result
@@ -388,7 +407,7 @@ public class MilvusClientDockerTest {
             List<?> out = queryResultsWrapper.getFieldWrapper(field4Name).getFieldData();
             assertEquals(nq, out.size());
             for (Object o : out) {
-                double d = (Double)o;
+                double d = (Double) o;
                 assertTrue(compareWeights.contains(d));
             }
         }
@@ -552,4 +571,150 @@ public class MilvusClientDockerTest {
         R<RpcStatus> dropR = client.dropCollection(dropParam);
         assertEquals(R.Status.Success.getCode(), dropR.getStatus().intValue());
     }
+
+    @Test
+    public void testAsyncMethods() {
+        String randomCollectionName = generator.generate(10);
+
+        // collection schema
+        String field1Name = "long_field";
+        String field2Name = "vec_field";
+        List<FieldType> fieldsSchema = new ArrayList<>();
+        fieldsSchema.add(FieldType.newBuilder()
+                .withPrimaryKey(true)
+                .withAutoID(true)
+                .withDataType(DataType.Int64)
+                .withName(field1Name)
+                .withDescription("identity")
+                .build());
+
+        fieldsSchema.add(FieldType.newBuilder()
+                .withDataType(DataType.FloatVector)
+                .withName(field2Name)
+                .withDescription("face")
+                .withDimension(dimension)
+                .build());
+
+        // create collection
+        CreateCollectionParam createParam = CreateCollectionParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withDescription("test")
+                .withFieldTypes(fieldsSchema)
+                .build();
+
+        R<RpcStatus> createR = client.createCollection(createParam);
+        assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
+
+        // insert async
+        List<ListenableFuture<R<MutationResult>>> futureResponses = new ArrayList<>();
+        int rowCount = 1000;
+        for (long i = 0L; i < 10; ++i) {
+            List<List<Float>> vectors = normalizeFloatVectors(generateFloatVectors(rowCount));
+            List<InsertParam.Field> fieldsInsert = new ArrayList<>();
+            fieldsInsert.add(new InsertParam.Field(field2Name, DataType.FloatVector, vectors));
+
+            InsertParam insertParam = InsertParam.newBuilder()
+                    .withCollectionName(randomCollectionName)
+                    .withFields(fieldsInsert)
+                    .build();
+
+            ListenableFuture<R<MutationResult>> insertFuture = client.insertAsync(insertParam);
+            futureResponses.add(insertFuture);
+        }
+
+        // get insert result
+        List<Long> queryIDs = new ArrayList<>();
+        for (ListenableFuture<R<MutationResult>> response : futureResponses) {
+            try {
+                R<MutationResult> insertR = response.get();
+                assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+
+                MutationResultWrapper wrapper = new MutationResultWrapper(insertR.getData());
+                queryIDs.add(wrapper.getLongIDs().get(0));
+            } catch (ExecutionException | InterruptedException e) {
+                System.out.println("failed to insert:" + e.getMessage());
+                return;
+            }
+        }
+
+        // get collection statistics
+        R<GetCollectionStatisticsResponse> statR = client.getCollectionStatistics(GetCollectionStatisticsParam
+                .newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withFlush(true)
+                .build());
+        assertEquals(R.Status.Success.getCode(), statR.getStatus().intValue());
+
+        GetCollStatResponseWrapper stat = new GetCollStatResponseWrapper(statR.getData());
+        System.out.println("Collection row count: " + stat.getRowCount());
+
+        // load collection
+        R<RpcStatus> loadR = client.loadCollection(LoadCollectionParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .build());
+        assertEquals(R.Status.Success.getCode(), loadR.getStatus().intValue());
+
+        // search async
+        List<List<Float>> targetVectors = normalizeFloatVectors(generateFloatVectors(2));
+        int topK = 5;
+        SearchParam searchParam = SearchParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withMetricType(MetricType.IP)
+                .withTopK(topK)
+                .withVectors(targetVectors)
+                .withVectorFieldName(field2Name)
+                .build();
+
+        ListenableFuture<R<SearchResults>> searchFuture = client.searchAsync(searchParam);
+
+        // query async
+        String expr = field1Name + " in " + queryIDs.toString();
+        List<String> outputFields = Arrays.asList(field1Name, field2Name);
+        QueryParam queryParam = QueryParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withExpr(expr)
+                .withOutFields(outputFields)
+                .build();
+
+        ListenableFuture<R<QueryResults>> queryFuture = client.queryAsync(queryParam);
+
+        try {
+            // get search results
+            R<SearchResults> searchR = searchFuture.get();
+            assertEquals(R.Status.Success.getCode(), searchR.getStatus().intValue());
+
+            // verify search result
+            SearchResultsWrapper results = new SearchResultsWrapper(searchR.getData().getResults());
+            System.out.println("Search results:");
+            for (int i = 0; i < targetVectors.size(); ++i) {
+                List<SearchResultsWrapper.IDScore> scores = results.getIDScore(i);
+                assertEquals(topK, scores.size());
+                System.out.println(scores.toString());
+            }
+
+            // get query results
+            R<QueryResults> queryR = queryFuture.get();
+            assertEquals(R.Status.Success.getCode(), queryR.getStatus().intValue());
+
+            // verify query result
+            QueryResultsWrapper queryResultsWrapper = new QueryResultsWrapper(queryR.getData());
+            for (String fieldName : outputFields) {
+                FieldDataWrapper wrapper = queryResultsWrapper.getFieldWrapper(fieldName);
+                System.out.println("Query data of " + fieldName + ", row count: " + wrapper.getRowCount());
+                System.out.println(wrapper.getFieldData());
+                assertEquals(queryIDs.size(), wrapper.getFieldData().size());
+            }
+
+        } catch (InterruptedException | ExecutionException e) {
+            e.printStackTrace();
+        }
+
+        // drop collection
+        DropCollectionParam dropParam = DropCollectionParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .build();
+
+        R<RpcStatus> dropR = client.dropCollection(dropParam);
+        assertEquals(R.Status.Success.getCode(), dropR.getStatus().intValue());
+    }
 }

+ 62 - 22
src/test/java/io/milvus/client/MilvusServiceClientTest.java

@@ -19,8 +19,8 @@
 
 package io.milvus.client;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.ByteString;
-import io.milvus.response.*;
 import io.milvus.exception.IllegalResponseException;
 import io.milvus.exception.ParamException;
 import io.milvus.grpc.*;
@@ -33,6 +33,7 @@ import io.milvus.param.control.*;
 import io.milvus.param.dml.*;
 import io.milvus.param.index.*;
 import io.milvus.param.partition.*;
+import io.milvus.response.*;
 import io.milvus.server.MockMilvusServer;
 import io.milvus.server.MockMilvusServerImpl;
 import org.junit.jupiter.api.Test;
@@ -41,6 +42,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.jupiter.api.Assertions.*;
@@ -95,6 +97,49 @@ class MilvusServiceClientTest {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private <T, P> void testAsyncFuncByName(String funcName, T param) {
+        try {
+            Class<?> clientClass = MilvusServiceClient.class;
+            Method testFunc = clientClass.getMethod(funcName, param.getClass());
+
+            // start mock server
+            MockMilvusServer server = startServer();
+            MilvusServiceClient client = startClient();
+
+            // test return ok with correct input
+            try {
+                ListenableFuture<R<P>> respFuture = (ListenableFuture<R<P>>) testFunc.invoke(client, param);
+                R<P> response = respFuture.get();
+                assertEquals(R.Status.Success.getCode(), response.getStatus());
+            } catch (InterruptedException | ExecutionException e) {
+                e.printStackTrace();
+            }
+
+            // stop mock server
+            server.stop();
+
+            // test return error without server
+            assertThrows(ExecutionException.class, () -> {
+                ListenableFuture<R<P>> respFuture = (ListenableFuture<R<P>>) testFunc.invoke(client, param);
+                R<P> response = respFuture.get();
+                assertNotEquals(R.Status.Success.getCode(), response.getStatus());
+            });
+
+            // test return error when client channel is shutdown
+            client.close();
+            try {
+                ListenableFuture<R<P>> respFuture = (ListenableFuture<R<P>>) testFunc.invoke(client, param);
+                R<P> response = respFuture.get();
+                assertEquals(R.Status.ClientNotConnected.getCode(), response.getStatus());
+            } catch (InterruptedException | ExecutionException e) {
+                e.printStackTrace();
+            }
+        } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
+            System.out.println(e.toString());
+        }
+    }
+
     @Test
     void r() {
         String msg = "error";
@@ -647,6 +692,17 @@ class MilvusServiceClientTest {
         );
     }
 
+    @Test
+    void flush() {
+        FlushParam param = FlushParam.newBuilder()
+                .addCollectionName("collection1")
+                .withSyncFlush(Boolean.TRUE)
+                .withSyncFlushWaitingTimeout(1L)
+                .build();
+
+        testFuncByName("flush", param);
+    }
+
     @Test
     void createPartitionParam() {
         // test throw exception with illegal input
@@ -1432,6 +1488,7 @@ class MilvusServiceClientTest {
                 .build();
 
         testFuncByName("insert", param);
+        testAsyncFuncByName("insertAsync", param);
     }
 
     @Test
@@ -1632,14 +1689,9 @@ class MilvusServiceClientTest {
 
     @Test
     void search() {
-        // start mock server
-        MockMilvusServer server = startServer();
-        MilvusServiceClient client = startClient();
-
         List<String> partitions = Collections.singletonList("partition1");
         List<String> outputFields = Collections.singletonList("field2");
 
-        // test return ok with correct input
         List<List<Float>> vectors = new ArrayList<>();
         List<Float> vector1 = Arrays.asList(0.1f, 0.2f);
         vectors.add(vector1);
@@ -1658,8 +1710,7 @@ class MilvusServiceClientTest {
                 .withTravelTimestamp(1L)
                 .withGuaranteeTimestamp(1L)
                 .build();
-        R<SearchResults> resp = client.search(param);
-        assertEquals(R.Status.Success.getCode(), resp.getStatus());
+        testFuncByName("search", param);
 
         List<ByteBuffer> bVectors = new ArrayList<>();
         ByteBuffer buf = ByteBuffer.allocate(2);
@@ -1678,20 +1729,8 @@ class MilvusServiceClientTest {
                 .withExpr("dummy")
                 .build();
 
-        resp = client.search(param);
-        assertEquals(R.Status.Success.getCode(), resp.getStatus());
-
-        // stop mock server
-        server.stop();
-
-        // test return error without server
-        resp = client.search(param);
-        assertNotEquals(R.Status.Success.getCode(), resp.getStatus());
-
-        // test return error when client channel is shutdown
-        client.close();
-        resp = client.search(param);
-        assertEquals(R.Status.ClientNotConnected.getCode(), resp.getStatus());
+        testFuncByName("search", param);
+        testAsyncFuncByName("searchAsync", param);
     }
 
     @Test
@@ -1751,6 +1790,7 @@ class MilvusServiceClientTest {
                 .build();
 
         testFuncByName("query", param);
+        testAsyncFuncByName("queryAsync", param);
     }
 
     @Test