浏览代码

Add FlushAll api (#454)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
yihao.dai 2 年之前
父节点
当前提交
10253aba23

+ 63 - 0
src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java

@@ -241,6 +241,40 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         });
     }
 
+    private void waitForFlushAll(FlushAllResponse flushAllResponse, long waitingInterval, long timeout) {
+        // The rpc api flushAll() returns a FlushAllResponse, but the returned flushAllTs may not yet be persisted.
+        // This method uses getFlushAllState() to check the flushAll state.
+        // If getFlushAllState() returns Flushed, then we can say that the flushAll action is finished.
+        // If the waiting time exceeds the timeout, exit the loop.
+        long tsBegin = System.currentTimeMillis();
+        long flushAllTs = flushAllResponse.getFlushAllTs();
+        while (true) {
+            long tsNow = System.currentTimeMillis();
+            if ((tsNow - tsBegin) >= timeout * 1000) {
+                logWarning("waitForFlushAll timeout");
+                break;
+            }
+
+            GetFlushAllStateRequest getFlushAllStateRequest = GetFlushAllStateRequest.newBuilder()
+                    .setFlushAllTs(flushAllTs)
+                    .build();
+            GetFlushAllStateResponse response = blockingStub().getFlushAllState(getFlushAllStateRequest);
+            if (response.getFlushed()) {
+                logDebug("waitForFlushAll done, all flushed!");
+                break;
+            }
+
+            try {
+                String msg = "waitForFlushAll, interval: " + waitingInterval + "ms.";
+                logDebug(msg);
+                TimeUnit.MILLISECONDS.sleep(waitingInterval);
+            } catch (InterruptedException e) {
+                logWarning("waitForFlushAll interrupted");
+                break;
+            }
+        }
+    }
+
     private R<Boolean> waitForIndex(String collectionName, String indexName, String fieldName,
                                     long waitingInterval, long timeout) {
         // This method use getIndexState() to check index state.
@@ -648,6 +682,35 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         }
     }
 
+    /**
+     * Flush all collections. All insertions, deletions, and upserts before `flushAll` will be synced.
+     */
+    @Override
+    public R<FlushAllResponse> flushAll(boolean syncFlushAll, long syncFlushAllWaitingInterval, long syncFlushAllTimeout) {
+        if (!clientIsReady()) {
+            return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
+        }
+
+        logInfo("start flushAll...");
+        try {
+            FlushAllRequest flushAllRequest = FlushAllRequest.newBuilder().build();
+            FlushAllResponse response = blockingStub().flushAll(flushAllRequest);
+
+            if (syncFlushAll) {
+                waitForFlushAll(response, syncFlushAllWaitingInterval, syncFlushAllTimeout);
+            }
+
+            logDebug("flushAll successfully!");
+            return R.success(response);
+        } catch (StatusRuntimeException e) {
+            logError("flushAll RPC failed!");
+            return R.failed(e);
+        } catch (Exception e) {
+            logError("flushAll failed!");
+            return R.failed(e);
+        }
+    }
+
     @Override
     public R<RpcStatus> createPartition(@NonNull CreatePartitionParam requestParam) {
         if (!clientIsReady()) {

+ 10 - 0
src/main/java/io/milvus/client/MilvusClient.java

@@ -138,6 +138,16 @@ public interface MilvusClient {
      */
     R<FlushResponse> flush(FlushParam requestParam);
 
+    /**
+     * Flush all collections. All insertions, deletions, and upserts before `flushAll` will be synced.
+     *
+     * @param syncFlushAll {flushAll synchronously or asynchronously}
+     * @param syncFlushAllWaitingInterval {wait intervel when flushAll synchronously}
+     * @param syncFlushAllTimeout {timeout when flushAll synchronously}
+     * @return {status:result code,data: FlushAllResponse{flushAllTs}}
+     */
+    R<FlushAllResponse> flushAll(boolean syncFlushAll, long syncFlushAllWaitingInterval, long syncFlushAllTimeout);
+
     /**
      * Creates a partition in the specified collection.
      *

+ 8 - 0
src/main/java/io/milvus/client/MilvusMultiServiceClient.java

@@ -169,6 +169,14 @@ public class MilvusMultiServiceClient implements MilvusClient {
         return handleResponse(response);
     }
 
+    @Override
+    public R<FlushAllResponse> flushAll(boolean syncFlushAll, long syncFlushAllWaitingInterval, long syncFlushAllTimeout) {
+        List<R<FlushAllResponse>> response = this.clusterFactory.getAvailableServerSettings().parallelStream()
+                .map(serverSetting -> serverSetting.getClient().flushAll(syncFlushAll, syncFlushAllWaitingInterval, syncFlushAllTimeout))
+                .collect(Collectors.toList());
+        return handleResponse(response);
+    }
+
     @Override
     public R<RpcStatus> createPartition(CreatePartitionParam requestParam) {
         List<R<RpcStatus>> response = this.clusterFactory.getAvailableServerSettings().stream()

+ 1 - 1
src/main/milvus-proto

@@ -1 +1 @@
-Subproject commit d8cfcb4d86bd0720258351b4b4641778e30cb62e
+Subproject commit aa8a661302177b80869d7c84b9d3a3679b452043