浏览代码

add createIndex/insert/flush/compact Async

Signed-off-by: youny626 <zzhu@fandm.edu>
youny626 5 年之前
父节点
当前提交
52fa2b7f7a

+ 87 - 0
src/main/java/io/milvus/client/MilvusClient.java

@@ -135,6 +135,27 @@ public interface MilvusClient {
    */
   Response createIndex(Index index);
 
+  /**
+   * Creates index specified by <code>index</code> asynchronously
+   *
+   * @param index the <code>Index</code> object
+   *     <pre>
+   * example usage:
+   * <code>
+   * Index index = new Index.Builder(collectionName, IndexType.IVF_SQ8)
+   *                        .withParamsInJson("{\"nlist\": 16384}")
+   *                        .build();
+   * </code>
+   * </pre>
+   *
+   * @return a <code>ListenableFuture</code> object which holds the <code>Response</code>
+   * @see Index
+   * @see IndexType
+   * @see Response
+   * @see ListenableFuture
+   */
+  ListenableFuture<Response> createIndexAsync(Index index);
+
   /**
    * Creates a partition specified by <code>collectionName</code> and <code>tag</code>
    *
@@ -186,6 +207,29 @@ public interface MilvusClient {
    */
   InsertResponse insert(InsertParam insertParam);
 
+  /**
+   * Inserts data specified by <code>insertParam</code> asynchronously
+   *
+   * @param insertParam the <code>InsertParam</code> object
+   *     <pre>
+   * example usage:
+   * <code>
+   * InsertParam insertParam = new InsertParam.Builder(collectionName)
+   *                                          .withFloatVectors(floatVectors)
+   *                                          .withVectorIds(vectorIds)
+   *                                          .withPartitionTag(tag)
+   *                                          .build();
+   * </code>
+   * </pre>
+   *
+   * @return a <code>ListenableFuture</code> object which holds the <code>InsertResponse</code>
+   * @see InsertParam
+   * @see InsertResponse
+   * @see Response
+   * @see ListenableFuture
+   */
+  ListenableFuture<InsertResponse> insertAsync(InsertParam insertParam);
+
   /**
    * Searches vectors specified by <code>searchParam</code>
    *
@@ -336,6 +380,7 @@ public interface MilvusClient {
    * Drops collection index
    *
    * @param collectionName collection to drop index of
+   * @return <code>Response</code>
    * @see Response
    */
   Response dropIndex(String collectionName);
@@ -346,6 +391,7 @@ public interface MilvusClient {
    * segment can be uniquely identified by its partition tag or segment name respectively.
    *
    * @param collectionName collection to show info from
+   * @return <code>ShowCollectionInfoResponse</code>
    * @see ShowCollectionInfoResponse
    * @see CollectionInfo
    * @see CollectionInfo.PartitionInfo
@@ -359,6 +405,7 @@ public interface MilvusClient {
    *
    * @param collectionName collection to get vector from
    * @param id vector id
+   * @return <code>GetVectorByIdResponse</code>
    * @see GetVectorByIdResponse
    * @see Response
    */
@@ -369,6 +416,7 @@ public interface MilvusClient {
    *
    * @param collectionName collection to get vector ids from
    * @param segmentName segment name
+   * @return <code>GetVectorIdsResponse</code>
    * @see GetVectorIdsResponse
    * @see Response
    */
@@ -379,6 +427,7 @@ public interface MilvusClient {
    *
    * @param collectionName collection to delete ids from
    * @param ids a <code>List</code> of vector ids to delete
+   * @return <code>Response</code>
    * @see Response
    */
   Response deleteByIds(String collectionName, List<Long> ids);
@@ -388,6 +437,7 @@ public interface MilvusClient {
    *
    * @param collectionName collection to delete id from
    * @param id vector id to delete
+   * @return <code>Response</code>
    * @see Response
    */
   Response deleteById(String collectionName, Long id);
@@ -397,26 +447,63 @@ public interface MilvusClient {
    * after <code>flush</code> returned
    *
    * @param collectionNames a <code>List</code> of collections to flush
+   * @return <code>Response</code>
    * @see Response
    */
   Response flush(List<String> collectionNames);
 
+  /**
+   * Flushes data in a list collections asynchronously. Newly inserted or modifications on data will
+   * be visible after <code>flush</code> returned
+   *
+   * @param collectionNames a <code>List</code> of collections to flush
+   * @return a <code>ListenableFuture</code> object which holds the <code>Response</code>
+   * @see Response
+   * @see ListenableFuture
+   */
+  ListenableFuture<Response> flushAsync(List<String> collectionNames);
+
   /**
    * Flushes data in a collection. Newly inserted or modifications on data will be visible after
    * <code>flush</code> returned
    *
    * @param collectionName name of collection to flush
+   * @return <code>Response</code>
    * @see Response
    */
   Response flush(String collectionName);
 
+  /**
+   * Flushes data in a collection asynchronously. Newly inserted or modifications on data will be
+   * visible after <code>flush</code> returned
+   *
+   * @param collectionName name of collection to flush
+   * @return a <code>ListenableFuture</code> object which holds the <code>Response</code>
+   * @see Response
+   * @see ListenableFuture
+   */
+  ListenableFuture<Response> flushAsync(String collectionName);
+
   /**
    * Compacts the collection, erasing deleted data from disk and rebuild index in background (if the
    * data size after compaction is still larger than indexFileSize). Data was only soft-deleted
    * until you call compact.
    *
    * @param collectionName name of collection to compact
+   * @return <code>Response</code>
    * @see Response
    */
   Response compact(String collectionName);
+
+  /**
+   * Compacts the collection asynchronously, erasing deleted data from disk and rebuild index in
+   * background (if the data size after compaction is still larger than indexFileSize). Data was
+   * only soft-deleted until you call compact.
+   *
+   * @param collectionName name of collection to compact
+   * @return a <code>ListenableFuture</code> object which holds the <code>Response</code>
+   * @see Response
+   * @see ListenableFuture
+   */
+  ListenableFuture<Response> compactAsync(String collectionName);
 }

+ 324 - 140
src/main/java/io/milvus/client/MilvusGrpcClient.java

@@ -19,22 +19,25 @@
 
 package io.milvus.client;
 
-import com.google.common.util.concurrent.*;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.protobuf.ByteString;
 import io.grpc.ConnectivityState;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import io.grpc.StatusRuntimeException;
-import io.milvus.grpc.TopKQueryResult;
+import io.milvus.grpc.*;
 import org.apache.commons.collections4.ListUtils;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 import java.nio.Buffer;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -48,8 +51,8 @@ public class MilvusGrpcClient implements MilvusClient {
   private static final String ANSI_BRIGHT_PURPLE = "\u001B[95m";
   private final String extraParamKey = "params";
   private ManagedChannel channel = null;
-  private io.milvus.grpc.MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub = null;
-  private io.milvus.grpc.MilvusServiceGrpc.MilvusServiceFutureStub futureStub = null;
+  private MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub = null;
+  private MilvusServiceGrpc.MilvusServiceFutureStub futureStub = null;
 
   ////////////////////// Constructor //////////////////////
   public MilvusGrpcClient() {
@@ -104,8 +107,8 @@ public class MilvusGrpcClient implements MilvusClient {
         timeout -= checkFrequency;
       }
 
-      blockingStub = io.milvus.grpc.MilvusServiceGrpc.newBlockingStub(channel);
-      futureStub = io.milvus.grpc.MilvusServiceGrpc.newFutureStub(channel);
+      blockingStub = MilvusServiceGrpc.newBlockingStub(channel);
+      futureStub = MilvusServiceGrpc.newFutureStub(channel);
 
     } catch (Exception e) {
       if (!(e instanceof ConnectFailedException)) {
@@ -158,20 +161,20 @@ public class MilvusGrpcClient implements MilvusClient {
       return new Response(Response.Status.CLIENT_NOT_CONNECTED);
     }
 
-    io.milvus.grpc.TableSchema request =
-        io.milvus.grpc.TableSchema.newBuilder()
+    TableSchema request =
+        TableSchema.newBuilder()
             .setTableName(collectionMapping.getCollectionName())
             .setDimension(collectionMapping.getDimension())
             .setIndexFileSize(collectionMapping.getIndexFileSize())
             .setMetricType(collectionMapping.getMetricType().getVal())
             .build();
 
-    io.milvus.grpc.Status response;
+    Status response;
 
     try {
       response = blockingStub.createTable(request);
 
-      if (response.getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
+      if (response.getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("Created collection successfully!\n{0}", collectionMapping.toString());
         return new Response(Response.Status.SUCCESS);
       } else if (response.getReason().contentEquals("Collection already exists")) {
@@ -199,14 +202,13 @@ public class MilvusGrpcClient implements MilvusClient {
       return new HasCollectionResponse(new Response(Response.Status.CLIENT_NOT_CONNECTED), false);
     }
 
-    io.milvus.grpc.TableName request =
-        io.milvus.grpc.TableName.newBuilder().setTableName(collectionName).build();
-    io.milvus.grpc.BoolReply response;
+    TableName request = TableName.newBuilder().setTableName(collectionName).build();
+    BoolReply response;
 
     try {
       response = blockingStub.hasTable(request);
 
-      if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
+      if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("hasCollection `{0}` = {1}", collectionName, response.getBoolReply());
         return new HasCollectionResponse(
             new Response(Response.Status.SUCCESS), response.getBoolReply());
@@ -233,14 +235,13 @@ public class MilvusGrpcClient implements MilvusClient {
       return new Response(Response.Status.CLIENT_NOT_CONNECTED);
     }
 
-    io.milvus.grpc.TableName request =
-        io.milvus.grpc.TableName.newBuilder().setTableName(collectionName).build();
-    io.milvus.grpc.Status response;
+    TableName request = TableName.newBuilder().setTableName(collectionName).build();
+    Status response;
 
     try {
       response = blockingStub.dropTable(request);
 
-      if (response.getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
+      if (response.getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("Dropped collection `{0}` successfully!", collectionName);
         return new Response(Response.Status.SUCCESS);
       } else {
@@ -262,24 +263,21 @@ public class MilvusGrpcClient implements MilvusClient {
       return new Response(Response.Status.CLIENT_NOT_CONNECTED);
     }
 
-    io.milvus.grpc.KeyValuePair extraParam =
-        io.milvus.grpc.KeyValuePair.newBuilder()
-            .setKey(extraParamKey)
-            .setValue(index.getParamsInJson())
-            .build();
-    io.milvus.grpc.IndexParam request =
-        io.milvus.grpc.IndexParam.newBuilder()
+    KeyValuePair extraParam =
+        KeyValuePair.newBuilder().setKey(extraParamKey).setValue(index.getParamsInJson()).build();
+    IndexParam request =
+        IndexParam.newBuilder()
             .setTableName(index.getCollectionName())
             .setIndexType(index.getIndexType().getVal())
             .addExtraParams(extraParam)
             .build();
 
-    io.milvus.grpc.Status response;
+    Status response;
 
     try {
       response = blockingStub.createIndex(request);
 
-      if (response.getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
+      if (response.getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("Created index successfully!\n{0}", index.toString());
         return new Response(Response.Status.SUCCESS);
       } else {
@@ -293,6 +291,50 @@ public class MilvusGrpcClient implements MilvusClient {
     }
   }
 
+  @Override
+  public ListenableFuture<Response> createIndexAsync(@Nonnull Index index) {
+
+    if (!channelIsReadyOrIdle()) {
+      logWarning("You are not connected to Milvus server");
+      return Futures.immediateFuture(new Response(Response.Status.CLIENT_NOT_CONNECTED));
+    }
+
+    KeyValuePair extraParam =
+        KeyValuePair.newBuilder().setKey(extraParamKey).setValue(index.getParamsInJson()).build();
+    IndexParam request =
+        IndexParam.newBuilder()
+            .setTableName(index.getCollectionName())
+            .setIndexType(index.getIndexType().getVal())
+            .addExtraParams(extraParam)
+            .build();
+
+    ListenableFuture<Status> response;
+
+    response = futureStub.createIndex(request);
+
+    Futures.addCallback(
+        response,
+        new FutureCallback<Status>() {
+          @Override
+          public void onSuccess(Status result) {
+            if (result.getErrorCode() == ErrorCode.SUCCESS) {
+              logInfo("Created index successfully!\n{0}", index.toString());
+            } else {
+              logSevere("CreateIndexAsync failed:\n{0}\n{1}", index.toString(), result.toString());
+            }
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            logSevere("CreateIndexAsync failed:\n{0}", t.getMessage());
+          }
+        },
+        MoreExecutors.directExecutor());
+
+    return Futures.transform(
+        response, transformStatusToResponseFunc::apply, MoreExecutors.directExecutor());
+  }
+
   @Override
   public Response createPartition(String collectionName, String tag) {
 
@@ -301,15 +343,15 @@ public class MilvusGrpcClient implements MilvusClient {
       return new Response(Response.Status.CLIENT_NOT_CONNECTED);
     }
 
-    io.milvus.grpc.PartitionParam request =
-        io.milvus.grpc.PartitionParam.newBuilder().setTableName(collectionName).setTag(tag).build();
+    PartitionParam request =
+        PartitionParam.newBuilder().setTableName(collectionName).setTag(tag).build();
 
-    io.milvus.grpc.Status response;
+    Status response;
 
     try {
       response = blockingStub.createPartition(request);
 
-      if (response.getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
+      if (response.getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("Created partition `{0}` in collection `{1}` successfully!", tag, collectionName);
         return new Response(Response.Status.SUCCESS);
       } else {
@@ -334,14 +376,13 @@ public class MilvusGrpcClient implements MilvusClient {
           new Response(Response.Status.CLIENT_NOT_CONNECTED), new ArrayList<>());
     }
 
-    io.milvus.grpc.TableName request =
-        io.milvus.grpc.TableName.newBuilder().setTableName(collectionName).build();
-    io.milvus.grpc.PartitionList response;
+    TableName request = TableName.newBuilder().setTableName(collectionName).build();
+    PartitionList response;
 
     try {
       response = blockingStub.showPartitions(request);
 
-      if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
+      if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
         logInfo(
             "Current partitions of collection {0}: {1}",
             collectionName, response.getPartitionTagArrayList());
@@ -370,14 +411,14 @@ public class MilvusGrpcClient implements MilvusClient {
       return new Response(Response.Status.CLIENT_NOT_CONNECTED);
     }
 
-    io.milvus.grpc.PartitionParam request =
-        io.milvus.grpc.PartitionParam.newBuilder().setTableName(collectionName).setTag(tag).build();
-    io.milvus.grpc.Status response;
+    PartitionParam request =
+        PartitionParam.newBuilder().setTableName(collectionName).setTag(tag).build();
+    Status response;
 
     try {
       response = blockingStub.dropPartition(request);
 
-      if (response.getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
+      if (response.getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("Dropped partition `{1}` in collection `{1}` successfully!", tag, collectionName);
         return new Response(Response.Status.SUCCESS);
       } else {
@@ -402,7 +443,7 @@ public class MilvusGrpcClient implements MilvusClient {
           new Response(Response.Status.CLIENT_NOT_CONNECTED), new ArrayList<>());
     }
 
-    List<io.milvus.grpc.RowRecord> rowRecordList =
+    List<RowRecord> rowRecordList =
         buildRowRecordList(insertParam.getFloatVectors(), insertParam.getBinaryVectors());
 
     io.milvus.grpc.InsertParam request =
@@ -412,12 +453,12 @@ public class MilvusGrpcClient implements MilvusClient {
             .addAllRowIdArray(insertParam.getVectorIds())
             .setPartitionTag(insertParam.getPartitionTag())
             .build();
-    io.milvus.grpc.VectorIds response;
+    VectorIds response;
 
     try {
       response = blockingStub.insert(request);
 
-      if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
+      if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
         logInfo(
             "Inserted {0} vectors to collection `{1}` successfully!",
             response.getVectorIdArrayCount(), insertParam.getCollectionName());
@@ -438,6 +479,69 @@ public class MilvusGrpcClient implements MilvusClient {
     }
   }
 
+  @Override
+  public ListenableFuture<InsertResponse> insertAsync(@Nonnull InsertParam insertParam) {
+
+    if (!channelIsReadyOrIdle()) {
+      logWarning("You are not connected to Milvus server");
+      return Futures.immediateFuture(
+          new InsertResponse(
+              new Response(Response.Status.CLIENT_NOT_CONNECTED), new ArrayList<>()));
+    }
+
+    List<RowRecord> rowRecordList =
+        buildRowRecordList(insertParam.getFloatVectors(), insertParam.getBinaryVectors());
+
+    io.milvus.grpc.InsertParam request =
+        io.milvus.grpc.InsertParam.newBuilder()
+            .setTableName(insertParam.getCollectionName())
+            .addAllRowRecordArray(rowRecordList)
+            .addAllRowIdArray(insertParam.getVectorIds())
+            .setPartitionTag(insertParam.getPartitionTag())
+            .build();
+
+    ListenableFuture<VectorIds> response;
+
+    response = futureStub.insert(request);
+
+    Futures.addCallback(
+        response,
+        new FutureCallback<VectorIds>() {
+          @Override
+          public void onSuccess(VectorIds result) {
+            if (result.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
+              logInfo(
+                  "Inserted {0} vectors to collection `{1}` successfully!",
+                  result.getVectorIdArrayCount(), insertParam.getCollectionName());
+            } else {
+              logSevere("InsertAsync failed:\n{0}", result.getStatus().toString());
+            }
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            logSevere("InsertAsync failed:\n{0}", t.getMessage());
+          }
+        },
+        MoreExecutors.directExecutor());
+
+    Function<VectorIds, InsertResponse> transformFunc =
+        vectorIds -> {
+          if (vectorIds.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
+            return new InsertResponse(
+                new Response(Response.Status.SUCCESS), vectorIds.getVectorIdArrayList());
+          } else {
+            return new InsertResponse(
+                new Response(
+                    Response.Status.valueOf(vectorIds.getStatus().getErrorCodeValue()),
+                    vectorIds.getStatus().getReason()),
+                new ArrayList<>());
+          }
+        };
+
+    return Futures.transform(response, transformFunc::apply, MoreExecutors.directExecutor());
+  }
+
   @Override
   public SearchResponse search(@Nonnull SearchParam searchParam) {
 
@@ -448,11 +552,11 @@ public class MilvusGrpcClient implements MilvusClient {
       return searchResponse;
     }
 
-    List<io.milvus.grpc.RowRecord> rowRecordList =
+    List<RowRecord> rowRecordList =
         buildRowRecordList(searchParam.getFloatVectors(), searchParam.getBinaryVectors());
 
-    io.milvus.grpc.KeyValuePair extraParam =
-        io.milvus.grpc.KeyValuePair.newBuilder()
+    KeyValuePair extraParam =
+        KeyValuePair.newBuilder()
             .setKey(extraParamKey)
             .setValue(searchParam.getParamsInJson())
             .build();
@@ -466,12 +570,12 @@ public class MilvusGrpcClient implements MilvusClient {
             .addExtraParams(extraParam)
             .build();
 
-    io.milvus.grpc.TopKQueryResult response;
+    TopKQueryResult response;
 
     try {
       response = blockingStub.search(request);
 
-      if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
+      if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
         SearchResponse searchResponse = buildSearchResponse(response);
         searchResponse.setResponse(new Response(Response.Status.SUCCESS));
         logInfo(
@@ -505,11 +609,11 @@ public class MilvusGrpcClient implements MilvusClient {
       return Futures.immediateFuture(searchResponse);
     }
 
-    List<io.milvus.grpc.RowRecord> rowRecordList =
+    List<RowRecord> rowRecordList =
         buildRowRecordList(searchParam.getFloatVectors(), searchParam.getBinaryVectors());
 
-    io.milvus.grpc.KeyValuePair extraParam =
-        io.milvus.grpc.KeyValuePair.newBuilder()
+    KeyValuePair extraParam =
+        KeyValuePair.newBuilder()
             .setKey(extraParamKey)
             .setValue(searchParam.getParamsInJson())
             .build();
@@ -523,16 +627,16 @@ public class MilvusGrpcClient implements MilvusClient {
             .addExtraParams(extraParam)
             .build();
 
-    ListenableFuture<io.milvus.grpc.TopKQueryResult> response;
+    ListenableFuture<TopKQueryResult> response;
 
     response = futureStub.search(request);
 
     Futures.addCallback(
         response,
-        new FutureCallback<io.milvus.grpc.TopKQueryResult>() {
+        new FutureCallback<TopKQueryResult>() {
           @Override
-          public void onSuccess(io.milvus.grpc.TopKQueryResult result) {
-            if (result.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
+          public void onSuccess(TopKQueryResult result) {
+            if (result.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
               logInfo(
                   "SearchAsync completed successfully! Returned results for {0} queries",
                   result.getRowNum());
@@ -548,27 +652,23 @@ public class MilvusGrpcClient implements MilvusClient {
         },
         MoreExecutors.directExecutor());
 
-    com.google.common.base.Function<io.milvus.grpc.TopKQueryResult, SearchResponse> transformFunc =
-        new com.google.common.base.Function<io.milvus.grpc.TopKQueryResult, SearchResponse>() {
-          @Override
-          public SearchResponse apply(io.milvus.grpc.TopKQueryResult topKQueryResult) {
-
-            if (topKQueryResult.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
-              SearchResponse searchResponse = buildSearchResponse(topKQueryResult);
-              searchResponse.setResponse(new Response(Response.Status.SUCCESS));
-              return searchResponse;
-            } else {
-              SearchResponse searchResponse = new SearchResponse();
-              searchResponse.setResponse(
-                  new Response(
-                      Response.Status.valueOf(topKQueryResult.getStatus().getErrorCodeValue()),
-                      topKQueryResult.getStatus().getReason()));
-              return searchResponse;
-            }
+    Function<TopKQueryResult, SearchResponse> transformFunc =
+        topKQueryResult -> {
+          if (topKQueryResult.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
+            SearchResponse searchResponse = buildSearchResponse(topKQueryResult);
+            searchResponse.setResponse(new Response(Response.Status.SUCCESS));
+            return searchResponse;
+          } else {
+            SearchResponse searchResponse = new SearchResponse();
+            searchResponse.setResponse(
+                new Response(
+                    Response.Status.valueOf(topKQueryResult.getStatus().getErrorCodeValue()),
+                    topKQueryResult.getStatus().getReason()));
+            return searchResponse;
           }
         };
 
-    return Futures.transform(response, transformFunc, MoreExecutors.directExecutor());
+    return Futures.transform(response, transformFunc::apply, MoreExecutors.directExecutor());
   }
 
   @Override
@@ -582,11 +682,11 @@ public class MilvusGrpcClient implements MilvusClient {
       return searchResponse;
     }
 
-    List<io.milvus.grpc.RowRecord> rowRecordList =
+    List<RowRecord> rowRecordList =
         buildRowRecordList(searchParam.getFloatVectors(), searchParam.getBinaryVectors());
 
-    io.milvus.grpc.KeyValuePair extraParam =
-        io.milvus.grpc.KeyValuePair.newBuilder()
+    KeyValuePair extraParam =
+        KeyValuePair.newBuilder()
             .setKey(extraParamKey)
             .setValue(searchParam.getParamsInJson())
             .build();
@@ -600,18 +700,18 @@ public class MilvusGrpcClient implements MilvusClient {
             .addExtraParams(extraParam)
             .build();
 
-    io.milvus.grpc.SearchInFilesParam request =
-        io.milvus.grpc.SearchInFilesParam.newBuilder()
+    SearchInFilesParam request =
+        SearchInFilesParam.newBuilder()
             .addAllFileIdArray(fileIds)
             .setSearchParam(constructSearchParam)
             .build();
 
-    io.milvus.grpc.TopKQueryResult response;
+    TopKQueryResult response;
 
     try {
       response = blockingStub.searchInFiles(request);
 
-      if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
+      if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
         SearchResponse searchResponse = buildSearchResponse(response);
         searchResponse.setResponse(new Response(Response.Status.SUCCESS));
         logInfo(
@@ -645,14 +745,13 @@ public class MilvusGrpcClient implements MilvusClient {
           new Response(Response.Status.CLIENT_NOT_CONNECTED), null);
     }
 
-    io.milvus.grpc.TableName request =
-        io.milvus.grpc.TableName.newBuilder().setTableName(collectionName).build();
-    io.milvus.grpc.TableSchema response;
+    TableName request = TableName.newBuilder().setTableName(collectionName).build();
+    TableSchema response;
 
     try {
       response = blockingStub.describeTable(request);
 
-      if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
+      if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
         CollectionMapping collectionMapping =
             new CollectionMapping.Builder(response.getTableName(), response.getDimension())
                 .withIndexFileSize(response.getIndexFileSize())
@@ -687,13 +786,13 @@ public class MilvusGrpcClient implements MilvusClient {
           new Response(Response.Status.CLIENT_NOT_CONNECTED), new ArrayList<>());
     }
 
-    io.milvus.grpc.Command request = io.milvus.grpc.Command.newBuilder().setCmd("").build();
-    io.milvus.grpc.TableNameList response;
+    Command request = Command.newBuilder().setCmd("").build();
+    TableNameList response;
 
     try {
       response = blockingStub.showTables(request);
 
-      if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
+      if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
         List<String> collectionNames = response.getTableNamesList();
         logInfo("Current collections: {0}", collectionNames.toString());
         return new ShowCollectionsResponse(new Response(Response.Status.SUCCESS), collectionNames);
@@ -721,14 +820,13 @@ public class MilvusGrpcClient implements MilvusClient {
           new Response(Response.Status.CLIENT_NOT_CONNECTED), 0);
     }
 
-    io.milvus.grpc.TableName request =
-        io.milvus.grpc.TableName.newBuilder().setTableName(collectionName).build();
-    io.milvus.grpc.TableRowCount response;
+    TableName request = TableName.newBuilder().setTableName(collectionName).build();
+    TableRowCount response;
 
     try {
       response = blockingStub.countTable(request);
 
-      if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
+      if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
         long collectionRowCount = response.getTableRowCount();
         logInfo("Collection `{0}` has {1} rows", collectionName, collectionRowCount);
         return new GetCollectionRowCountResponse(
@@ -767,13 +865,13 @@ public class MilvusGrpcClient implements MilvusClient {
       return new Response(Response.Status.CLIENT_NOT_CONNECTED);
     }
 
-    io.milvus.grpc.Command request = io.milvus.grpc.Command.newBuilder().setCmd(command).build();
-    io.milvus.grpc.StringReply response;
+    Command request = Command.newBuilder().setCmd(command).build();
+    StringReply response;
 
     try {
       response = blockingStub.cmd(request);
 
-      if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
+      if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("Command `{0}`: {1}", command, response.getStringReply());
         return new Response(Response.Status.SUCCESS, response.getStringReply());
       } else {
@@ -796,14 +894,13 @@ public class MilvusGrpcClient implements MilvusClient {
       return new Response(Response.Status.CLIENT_NOT_CONNECTED);
     }
 
-    io.milvus.grpc.TableName request =
-        io.milvus.grpc.TableName.newBuilder().setTableName(collectionName).build();
-    io.milvus.grpc.Status response;
+    TableName request = TableName.newBuilder().setTableName(collectionName).build();
+    Status response;
 
     try {
       response = blockingStub.preloadTable(request);
 
-      if (response.getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
+      if (response.getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("Preloaded collection `{0}` successfully!", collectionName);
         return new Response(Response.Status.SUCCESS);
       } else {
@@ -825,16 +922,15 @@ public class MilvusGrpcClient implements MilvusClient {
       return new DescribeIndexResponse(new Response(Response.Status.CLIENT_NOT_CONNECTED), null);
     }
 
-    io.milvus.grpc.TableName request =
-        io.milvus.grpc.TableName.newBuilder().setTableName(collectionName).build();
-    io.milvus.grpc.IndexParam response;
+    TableName request = TableName.newBuilder().setTableName(collectionName).build();
+    IndexParam response;
 
     try {
       response = blockingStub.describeIndex(request);
 
-      if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
+      if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
         String extraParam = "";
-        for (io.milvus.grpc.KeyValuePair kv : response.getExtraParamsList()) {
+        for (KeyValuePair kv : response.getExtraParamsList()) {
           if (kv.getKey().contentEquals(extraParamKey)) {
             extraParam = kv.getValue();
           }
@@ -870,14 +966,13 @@ public class MilvusGrpcClient implements MilvusClient {
       return new Response(Response.Status.CLIENT_NOT_CONNECTED);
     }
 
-    io.milvus.grpc.TableName request =
-        io.milvus.grpc.TableName.newBuilder().setTableName(collectionName).build();
-    io.milvus.grpc.Status response;
+    TableName request = TableName.newBuilder().setTableName(collectionName).build();
+    Status response;
 
     try {
       response = blockingStub.dropIndex(request);
 
-      if (response.getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
+      if (response.getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("Dropped index for collection `{0}` successfully!", collectionName);
         return new Response(Response.Status.SUCCESS);
       } else {
@@ -900,22 +995,21 @@ public class MilvusGrpcClient implements MilvusClient {
           new Response(Response.Status.CLIENT_NOT_CONNECTED), null);
     }
 
-    io.milvus.grpc.TableName request =
-        io.milvus.grpc.TableName.newBuilder().setTableName(collectionName).build();
-    io.milvus.grpc.TableInfo response;
+    TableName request = TableName.newBuilder().setTableName(collectionName).build();
+    TableInfo response;
 
     try {
       response = blockingStub.showTableInfo(request);
 
-      if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
+      if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
 
         List<CollectionInfo.PartitionInfo> partitionInfos = new ArrayList<>();
 
-        for (io.milvus.grpc.PartitionStat partitionStat : response.getPartitionsStatList()) {
+        for (PartitionStat partitionStat : response.getPartitionsStatList()) {
 
           List<CollectionInfo.PartitionInfo.SegmentInfo> segmentInfos = new ArrayList<>();
 
-          for (io.milvus.grpc.SegmentStat segmentStat : partitionStat.getSegmentsStatList()) {
+          for (SegmentStat segmentStat : partitionStat.getSegmentsStatList()) {
 
             CollectionInfo.PartitionInfo.SegmentInfo segmentInfo =
                 new CollectionInfo.PartitionInfo.SegmentInfo(
@@ -963,14 +1057,14 @@ public class MilvusGrpcClient implements MilvusClient {
           new Response(Response.Status.CLIENT_NOT_CONNECTED), new ArrayList<>(), null);
     }
 
-    io.milvus.grpc.VectorIdentity request =
-        io.milvus.grpc.VectorIdentity.newBuilder().setTableName(collectionName).setId(id).build();
-    io.milvus.grpc.VectorData response;
+    VectorIdentity request =
+        VectorIdentity.newBuilder().setTableName(collectionName).setId(id).build();
+    VectorData response;
 
     try {
       response = blockingStub.getVectorByID(request);
 
-      if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
+      if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
 
         logInfo(
             "getVectorById for id={0} in collection `{1}` returned successfully!",
@@ -1005,17 +1099,17 @@ public class MilvusGrpcClient implements MilvusClient {
           new Response(Response.Status.CLIENT_NOT_CONNECTED), new ArrayList<>());
     }
 
-    io.milvus.grpc.GetVectorIDsParam request =
-        io.milvus.grpc.GetVectorIDsParam.newBuilder()
+    GetVectorIDsParam request =
+        GetVectorIDsParam.newBuilder()
             .setTableName(collectionName)
             .setSegmentName(segmentName)
             .build();
-    io.milvus.grpc.VectorIds response;
+    VectorIds response;
 
     try {
       response = blockingStub.getVectorIDs(request);
 
-      if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
+      if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
 
         logInfo(
             "getVectorIds in collection `{0}`, segment `{1}` returned successfully!",
@@ -1046,17 +1140,14 @@ public class MilvusGrpcClient implements MilvusClient {
       return new Response(Response.Status.CLIENT_NOT_CONNECTED);
     }
 
-    io.milvus.grpc.DeleteByIDParam request =
-        io.milvus.grpc.DeleteByIDParam.newBuilder()
-            .setTableName(collectionName)
-            .addAllIdArray(ids)
-            .build();
-    io.milvus.grpc.Status response;
+    DeleteByIDParam request =
+        DeleteByIDParam.newBuilder().setTableName(collectionName).addAllIdArray(ids).build();
+    Status response;
 
     try {
       response = blockingStub.deleteByID(request);
 
-      if (response.getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
+      if (response.getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("deleteByIds in collection `{0}` completed successfully!", collectionName);
         return new Response(Response.Status.SUCCESS);
       } else {
@@ -1089,14 +1180,13 @@ public class MilvusGrpcClient implements MilvusClient {
       return new Response(Response.Status.CLIENT_NOT_CONNECTED);
     }
 
-    io.milvus.grpc.FlushParam request =
-        io.milvus.grpc.FlushParam.newBuilder().addAllTableNameArray(collectionNames).build();
-    io.milvus.grpc.Status response;
+    FlushParam request = FlushParam.newBuilder().addAllTableNameArray(collectionNames).build();
+    Status response;
 
     try {
       response = blockingStub.flush(request);
 
-      if (response.getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
+      if (response.getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("Flushed collection {0} successfully!", collectionNames);
         return new Response(Response.Status.SUCCESS);
       } else {
@@ -1110,6 +1200,43 @@ public class MilvusGrpcClient implements MilvusClient {
     }
   }
 
+  @Override
+  public ListenableFuture<Response> flushAsync(@Nonnull List<String> collectionNames) {
+
+    if (!channelIsReadyOrIdle()) {
+      logWarning("You are not connected to Milvus server");
+      return Futures.immediateFuture(new Response(Response.Status.CLIENT_NOT_CONNECTED));
+    }
+
+    FlushParam request = FlushParam.newBuilder().addAllTableNameArray(collectionNames).build();
+
+    ListenableFuture<Status> response;
+
+    response = futureStub.flush(request);
+
+    Futures.addCallback(
+        response,
+        new FutureCallback<Status>() {
+          @Override
+          public void onSuccess(Status result) {
+            if (result.getErrorCode() == ErrorCode.SUCCESS) {
+              logInfo("Flushed collection {0} successfully!", collectionNames);
+            } else {
+              logSevere("Flush collection {0} failed:\n{1}", collectionNames, result.toString());
+            }
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            logSevere("FlushAsync failed:\n{0}", t.getMessage());
+          }
+        },
+        MoreExecutors.directExecutor());
+
+    return Futures.transform(
+        response, transformStatusToResponseFunc::apply, MoreExecutors.directExecutor());
+  }
+
   @Override
   public Response flush(String collectionName) {
     List<String> list =
@@ -1121,6 +1248,17 @@ public class MilvusGrpcClient implements MilvusClient {
     return flush(list);
   }
 
+  @Override
+  public ListenableFuture<Response> flushAsync(String collectionName) {
+    List<String> list =
+        new ArrayList<String>() {
+          {
+            add(collectionName);
+          }
+        };
+    return flushAsync(list);
+  }
+
   @Override
   public Response compact(String collectionName) {
     if (!channelIsReadyOrIdle()) {
@@ -1128,14 +1266,13 @@ public class MilvusGrpcClient implements MilvusClient {
       return new Response(Response.Status.CLIENT_NOT_CONNECTED);
     }
 
-    io.milvus.grpc.TableName request =
-        io.milvus.grpc.TableName.newBuilder().setTableName(collectionName).build();
-    io.milvus.grpc.Status response;
+    TableName request = TableName.newBuilder().setTableName(collectionName).build();
+    Status response;
 
     try {
       response = blockingStub.compact(request);
 
-      if (response.getErrorCode() == io.milvus.grpc.ErrorCode.SUCCESS) {
+      if (response.getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("Compacted collection `{0}` successfully!", collectionName);
         return new Response(Response.Status.SUCCESS);
       } else {
@@ -1149,16 +1286,63 @@ public class MilvusGrpcClient implements MilvusClient {
     }
   }
 
+  @Override
+  public ListenableFuture<Response> compactAsync(@Nonnull String collectionName) {
+
+    if (!channelIsReadyOrIdle()) {
+      logWarning("You are not connected to Milvus server");
+      return Futures.immediateFuture(new Response(Response.Status.CLIENT_NOT_CONNECTED));
+    }
+
+    TableName request = TableName.newBuilder().setTableName(collectionName).build();
+
+    ListenableFuture<Status> response;
+
+    response = futureStub.compact(request);
+
+    Futures.addCallback(
+        response,
+        new FutureCallback<Status>() {
+          @Override
+          public void onSuccess(Status result) {
+            if (result.getErrorCode() == ErrorCode.SUCCESS) {
+              logInfo("Compacted collection `{0}` successfully!", collectionName);
+            } else {
+              logSevere("Compact collection `{0}` failed:\n{1}", collectionName, result.toString());
+            }
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            logSevere("CompactAsync failed:\n{0}", t.getMessage());
+          }
+        },
+        MoreExecutors.directExecutor());
+
+    return Futures.transform(
+        response, transformStatusToResponseFunc::apply, MoreExecutors.directExecutor());
+  }
+
   ///////////////////// Util Functions/////////////////////
-  private List<io.milvus.grpc.RowRecord> buildRowRecordList(
+  Function<Status, Response> transformStatusToResponseFunc =
+      status -> {
+        if (status.getErrorCode() == ErrorCode.SUCCESS) {
+          return new Response(Response.Status.SUCCESS);
+        } else {
+          return new Response(
+              Response.Status.valueOf(status.getErrorCodeValue()), status.getReason());
+        }
+      };
+
+  private List<RowRecord> buildRowRecordList(
       @Nonnull List<List<Float>> floatVectors, @Nonnull List<ByteBuffer> binaryVectors) {
-    List<io.milvus.grpc.RowRecord> rowRecordList = new ArrayList<>();
+    List<RowRecord> rowRecordList = new ArrayList<>();
 
     int largerSize = Math.max(floatVectors.size(), binaryVectors.size());
 
     for (int i = 0; i < largerSize; ++i) {
 
-      io.milvus.grpc.RowRecord.Builder rowRecordBuilder = io.milvus.grpc.RowRecord.newBuilder();
+      RowRecord.Builder rowRecordBuilder = RowRecord.newBuilder();
 
       if (i < floatVectors.size()) {
         rowRecordBuilder.addAllFloatData(floatVectors.get(i));
@@ -1174,7 +1358,7 @@ public class MilvusGrpcClient implements MilvusClient {
     return rowRecordList;
   }
 
-  private SearchResponse buildSearchResponse(io.milvus.grpc.TopKQueryResult topKQueryResult) {
+  private SearchResponse buildSearchResponse(TopKQueryResult topKQueryResult) {
 
     final int numQueries = (int) topKQueryResult.getRowNum();
     final int topK =

+ 77 - 0
src/test/java/io/milvus/client/MilvusGrpcClientTest.java

@@ -289,6 +289,21 @@ class MilvusClientTest {
     assertTrue(createIndexResponse.ok());
   }
 
+  @org.junit.jupiter.api.Test
+  void createIndexAsync() throws ExecutionException, InterruptedException {
+    insert();
+    assertTrue(client.flush(randomCollectionName).ok());
+
+    Index index =
+        new Index.Builder(randomCollectionName, IndexType.IVF_SQ8)
+            .withParamsInJson("{\"nlist\": 19384}")
+            .build();
+
+    ListenableFuture<Response> createIndexResponseFuture = client.createIndexAsync(index);
+    Response createIndexResponse = createIndexResponseFuture.get();
+    assertTrue(createIndexResponse.ok());
+  }
+
   @org.junit.jupiter.api.Test
   void insert() {
     List<List<Float>> vectors = generateFloatVectors(size, dimension);
@@ -299,6 +314,17 @@ class MilvusClientTest {
     assertEquals(size, insertResponse.getVectorIds().size());
   }
 
+  @org.junit.jupiter.api.Test
+  void insertAsync() throws ExecutionException, InterruptedException {
+    List<List<Float>> vectors = generateFloatVectors(size, dimension);
+    InsertParam insertParam =
+        new InsertParam.Builder(randomCollectionName).withFloatVectors(vectors).build();
+    ListenableFuture<InsertResponse> insertResponseFuture = client.insertAsync(insertParam);
+    InsertResponse insertResponse = insertResponseFuture.get();
+    assertTrue(insertResponse.ok());
+    assertEquals(size, insertResponse.getVectorIds().size());
+  }
+
   @org.junit.jupiter.api.Test
   void insertBinary() {
     final long binaryDimension = 10000;
@@ -641,6 +667,11 @@ class MilvusClientTest {
     assertTrue(client.flush(randomCollectionName).ok());
   }
 
+  @org.junit.jupiter.api.Test
+  void flushAsync() throws ExecutionException, InterruptedException {
+    assertTrue(client.flushAsync(randomCollectionName).get().ok());
+  }
+
   @org.junit.jupiter.api.Test
   void compact() {
     List<List<Float>> vectors = generateFloatVectors(size, dimension);
@@ -686,4 +717,50 @@ class MilvusClientTest {
 
     assertTrue(currentSegmentSize < previousSegmentSize);
   }
+
+  @org.junit.jupiter.api.Test
+  void compactAsync() throws ExecutionException, InterruptedException {
+    List<List<Float>> vectors = generateFloatVectors(size, dimension);
+    InsertParam insertParam =
+        new InsertParam.Builder(randomCollectionName).withFloatVectors(vectors).build();
+    InsertResponse insertResponse = client.insert(insertParam);
+    assertTrue(insertResponse.ok());
+    List<Long> vectorIds = insertResponse.getVectorIds();
+    assertEquals(size, vectorIds.size());
+
+    assertTrue(client.flush(randomCollectionName).ok());
+
+    ShowCollectionInfoResponse showCollectionInfoResponse =
+        client.showCollectionInfo(randomCollectionName);
+    assertTrue(showCollectionInfoResponse.getCollectionInfo().isPresent());
+    CollectionInfo.PartitionInfo.SegmentInfo segmentInfo =
+        showCollectionInfoResponse
+            .getCollectionInfo()
+            .get()
+            .getPartitionInfos()
+            .get(0)
+            .getSegmentInfos()
+            .get(0);
+    long previousSegmentSize = segmentInfo.getDataSize();
+
+    assertTrue(client.deleteByIds(randomCollectionName, vectorIds.subList(0, 100)).ok());
+
+    assertTrue(client.flush(randomCollectionName).ok());
+
+    assertTrue(client.compactAsync(randomCollectionName).get().ok());
+
+    showCollectionInfoResponse = client.showCollectionInfo(randomCollectionName);
+    assertTrue(showCollectionInfoResponse.getCollectionInfo().isPresent());
+    segmentInfo =
+        showCollectionInfoResponse
+            .getCollectionInfo()
+            .get()
+            .getPartitionInfos()
+            .get(0)
+            .getSegmentInfos()
+            .get(0);
+    long currentSegmentSize = segmentInfo.getDataSize();
+
+    assertTrue(currentSegmentSize < previousSegmentSize);
+  }
 }