Browse Source

Implement getFlushState() (#243)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
groot 3 years ago
parent
commit
3963104639

+ 1 - 1
examples/main/io/milvus/GeneralExample.java

@@ -482,7 +482,7 @@ public class GeneralExample {
         GeneralExample example = new GeneralExample();
 
         example.dropCollection();
-        example.createCollection(10);
+        example.createCollection(2000);
         example.hasCollection();
         example.describeCollection();
         example.showCollections();

+ 41 - 20
src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java

@@ -21,7 +21,6 @@ package io.milvus.client;
 
 import com.google.protobuf.ByteString;
 import io.grpc.StatusRuntimeException;
-import io.milvus.Response.DescCollResponseWrapper;
 import io.milvus.exception.ClientNotConnectedException;
 import io.milvus.exception.IllegalResponseException;
 import io.milvus.exception.ParamException;
@@ -285,7 +284,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
 
     private void waitForFlush(FlushResponse flushResponse, long waitingInterval, long timeout) {
         // The rpc api flush() return FlushResponse, but the returned segment ids maybe not yet persisted.
-        // This method use getPersistentSegmentInfo() to check segment state.
+        // This method use getFlushState() to check segment state.
         // If all segments state become Flushed, then we say the sync flush action is finished.
         // If waiting time exceed timeout, exist the circle
         long tsBegin = System.currentTimeMillis();
@@ -298,29 +297,19 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                     break;
                 }
 
-                GetPersistentSegmentInfoRequest getSegInfoRequest = GetPersistentSegmentInfoRequest.newBuilder()
-                        .setCollectionName(collectionName)
+                GetFlushStateRequest getFlushStateRequest = GetFlushStateRequest.newBuilder()
+                        .addAllSegmentIDs(segmentIDs.getDataList())
                         .build();
-                GetPersistentSegmentInfoResponse response = blockingStub().getPersistentSegmentInfo(getSegInfoRequest);
-                List<PersistentSegmentInfo> segmentInfoArray = response.getInfosList();
-                int flushedCount = 0;
-                for (int i = 0; i < segmentIDs.getDataCount(); ++i) {
-                    for (PersistentSegmentInfo info : segmentInfoArray) {
-                        if (info.getSegmentID() == segmentIDs.getData(i) && info.getState() == SegmentState.Flushed) {
-                            flushedCount++;
-                            break;
-                        }
-                    }
-                }
-
-                // if all segment of this collection has been flushed, break this circle and check next collection
-                if (flushedCount == segmentIDs.getDataCount()) {
+                GetFlushStateResponse response = blockingStub().getFlushState(getFlushStateRequest);
+                if(response.getFlushed()) {
+                    // if all segment of this collection has been flushed, break this circle and check next collection
+                    String msg = segmentIDs.getDataCount() + " segments of " + collectionName + " has been flushed.";
+                    logInfo(msg);
                     break;
                 }
 
                 try {
-                    String msg = "Waiting flush, interval: " + waitingInterval + "ms. " + flushedCount +
-                            " of " + segmentIDs.getDataCount() + " segments flushed.";
+                    String msg = "Waiting flush for " + collectionName + ", interval: " + waitingInterval + "ms. ";
                     logInfo(msg);
                     TimeUnit.MILLISECONDS.sleep(waitingInterval);
                 } catch (InterruptedException e) {
@@ -1634,6 +1623,38 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         }
     }
 
+    @Override
+    public R<GetFlushStateResponse> getFlushState(@NonNull GetFlushStateParam requestParam) {
+        if (!clientIsReady()) {
+            return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
+        }
+
+        logInfo(requestParam.toString());
+
+        try {
+            GetFlushStateRequest getFlushStateRequest = GetFlushStateRequest.newBuilder()
+                    .addAllSegmentIDs(requestParam.getSegmentIDs())
+                    .build();
+
+            GetFlushStateResponse response = blockingStub().getFlushState(getFlushStateRequest);
+
+            if (response.getStatus().getErrorCode() == ErrorCode.Success) {
+                logInfo("GetFlushState successfully!");
+                return R.success(response);
+            } else {
+                logError("GetFlushState failed:\n{}", response.getStatus().getReason());
+                return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
+                        response.getStatus().getReason());
+            }
+        } catch (StatusRuntimeException e) {
+            logError("GetFlushState RPC failed:\n{}", e.getStatus().toString());
+            return R.failed(e);
+        } catch (Exception e) {
+            logError("GetFlushState failed:\n{}", e.getMessage());
+            return R.failed(e);
+        }
+    }
+
     @Override
     public R<GetPersistentSegmentInfoResponse> getPersistentSegmentInfo(@NonNull GetPersistentSegmentInfoParam requestParam) {
         if (!clientIsReady()) {

+ 8 - 0
src/main/java/io/milvus/client/MilvusClient.java

@@ -308,6 +308,14 @@ public interface MilvusClient {
      */
     R<GetMetricsResponse> getMetrics(GetMetricsParam requestParam);
 
+    /**
+     * Get flush state of specified segments.
+     *
+     * @param requestParam {@link GetFlushStateParam}
+     * @return {status:result code, data:GetMetricsResponse{status,metrics}}
+     */
+    R<GetFlushStateResponse> getFlushState(GetFlushStateParam requestParam);
+
     /**
      * Gets the information of persistent segments from data node, including row count,
      * persistence state(growing or flushed), etc.

+ 81 - 0
src/main/java/io/milvus/param/control/GetFlushStateParam.java

@@ -0,0 +1,81 @@
+package io.milvus.param.control;
+
+import io.milvus.exception.ParamException;
+import lombok.Getter;
+import lombok.NonNull;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Parameters for <code>getMetric</code> interface.
+ */
+@Getter
+public class GetFlushStateParam {
+    private final List<Long> segmentIDs;
+
+    private GetFlushStateParam(@NonNull Builder builder) {
+        this.segmentIDs = builder.segmentIDs;
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for <code>GetFlushStateParam</code> class.
+     */
+    public static final class Builder {
+        private final List<Long> segmentIDs = new ArrayList<>();
+
+        private Builder() {
+        }
+
+        /**
+         * Specify segments
+         *
+         * @param segmentIDs segments id list
+         * @return <code>Builder</code>
+         */
+        public Builder withSegmentIDs(@NonNull List<Long> segmentIDs) {
+            this.segmentIDs.addAll(segmentIDs);
+            return this;
+        }
+
+        /**
+         * Specify a segment
+         *
+         * @param segmentID segment id
+         * @return <code>Builder</code>
+         */
+        public Builder addSegmentID(@NonNull Long segmentID) {
+            this.segmentIDs.add(segmentID);
+            return this;
+        }
+
+        /**
+         * Verifies parameters and creates a new <code>GetFlushStateParam</code> instance.
+         *
+         * @return <code>GetFlushStateParam</code>
+         */
+        public GetFlushStateParam build() throws ParamException {
+            if (segmentIDs.isEmpty()) {
+                throw new ParamException("Segment id array cannot be empty");
+            }
+
+            return new GetFlushStateParam(this);
+        }
+    }
+
+    /**
+     * Constructs a <code>String</code> by <code>GetFlushStateParam</code> instance.
+     *
+     * @return <code>String</code>
+     */
+    @Override
+    public String toString() {
+        return "GetFlushStateParam{" +
+                "segmentIDs=" + segmentIDs.toString() +
+                '}';
+    }
+}

+ 9 - 1
src/main/proto/common.proto

@@ -6,7 +6,6 @@ option java_package = "io.milvus.grpc";
 option java_outer_classname = "CommonProto";
 option java_generate_equals_and_hash = true;
 
-
 enum ErrorCode {
     Success = 0;
     UnexpectedError = 1;
@@ -143,6 +142,7 @@ enum MsgType {
     /* DATA SERVICE */
     SegmentInfo = 600;
     SystemInfo = 601;
+    GetRecoveryInfo = 602;
 
     /* SYSTEM CONTROL */
     TimeTick = 1200;
@@ -185,3 +185,11 @@ enum CompactionState {
   Executing = 1;
   Completed = 2;
 }
+
+enum ConsistencyLevel {
+    Strong = 0;
+    Session = 1; // default in PyMilvus
+    Bounded = 2;
+    Eventually = 3;
+    Customized = 4; // Users pass their own `guarantee_timestamp`.
+}

+ 15 - 1
src/main/proto/milvus.proto

@@ -7,7 +7,6 @@ option java_multiple_files = true;
 option java_package = "io.milvus.grpc";
 option java_outer_classname = "MilvusProto";
 option java_generate_equals_and_hash = true;
-
 package milvus.proto.milvus;
 
 service MilvusService {
@@ -45,6 +44,7 @@ service MilvusService {
   rpc Query(QueryRequest) returns (QueryResults) {}
   rpc CalcDistance(CalcDistanceRequest) returns (CalcDistanceResults) {}
 
+  rpc GetFlushState(GetFlushStateRequest) returns (GetFlushStateResponse) {}
   rpc GetPersistentSegmentInfo(GetPersistentSegmentInfoRequest) returns (GetPersistentSegmentInfoResponse) {}
   rpc GetQuerySegmentInfo(GetQuerySegmentInfoRequest) returns (GetQuerySegmentInfoResponse) {}
 
@@ -96,6 +96,8 @@ message CreateCollectionRequest {
   // Once set, no modification is allowed (Optional)
   // https://github.com/milvus-io/milvus/issues/6690
   int32 shards_num = 5;
+  // The consistency level that the collection used, modification is not supported now.
+  common.ConsistencyLevel consistency_level = 6;
 }
 
 /**
@@ -175,6 +177,8 @@ message DescribeCollectionResponse {
   repeated string aliases = 9;
   // The message ID/posititon when collection is created
   repeated common.KeyDataPair start_positions = 10;
+  // The consistency level that the collection used, modification is not supported now.
+  common.ConsistencyLevel consistency_level = 11;
 }
 
 /**
@@ -406,6 +410,7 @@ message DescribeSegmentResponse {
   int64 indexID = 2;
   int64 buildID = 3;
   bool enable_index = 4;
+  int64 fieldID = 5;
 }
 
 message ShowSegmentsRequest {
@@ -768,6 +773,15 @@ message CompactionMergeInfo {
   int64 target = 2;
 }
 
+message GetFlushStateRequest {
+  repeated int64 segmentIDs = 1;
+}
+
+message GetFlushStateResponse {
+  common.Status status = 1;
+  bool flushed = 2;
+}
+
 service ProxyService {
   rpc RegisterLink(RegisterLinkRequest) returns (RegisterLinkResponse) {}
 }

+ 0 - 1
src/test/java/io/milvus/client/MilvusClientDockerTest.java

@@ -169,7 +169,6 @@ public class MilvusClientDockerTest {
     }
 
     @Test
-    @SuppressWarnings("unchecked")
     public void testFloatVectors() {
         String randomCollectionName = generator.generate(10);
 

+ 24 - 11
src/test/java/io/milvus/client/MilvusServiceClientTest.java

@@ -368,11 +368,8 @@ class MilvusServiceClientTest {
         mockServerImpl.setFlushResponse(FlushResponse.newBuilder()
                 .putCollSegIDs(collectionName, LongArray.newBuilder().addData(segmentID).build())
                 .build());
-        mockServerImpl.setGetPersistentSegmentInfoResponse(GetPersistentSegmentInfoResponse.newBuilder()
-                .addInfos(PersistentSegmentInfo.newBuilder()
-                        .setSegmentID(segmentID)
-                        .setState(SegmentState.Flushing)
-                        .build())
+        mockServerImpl.setGetFlushStateResponse(GetFlushStateResponse.newBuilder()
+                .setFlushed(false)
                 .build());
 
         new Thread(() -> {
@@ -381,13 +378,10 @@ class MilvusServiceClientTest {
             } catch (InterruptedException e) {
                 System.out.println(e.toString());
             }
-            mockServerImpl.setGetPersistentSegmentInfoResponse(GetPersistentSegmentInfoResponse.newBuilder()
-                    .addInfos(PersistentSegmentInfo.newBuilder()
-                            .setSegmentID(segmentID)
-                            .setState(SegmentState.Flushed)
-                            .build())
+            mockServerImpl.setGetFlushStateResponse(GetFlushStateResponse.newBuilder()
+                    .setFlushed(true)
                     .build());
-        },"RefreshMemState").start();
+        },"RefreshFlushState").start();
 
         R<GetCollectionStatisticsResponse> resp = client.getCollectionStatistics(param);
         assertEquals(R.Status.Success.getCode(), resp.getStatus());
@@ -1774,6 +1768,25 @@ class MilvusServiceClientTest {
         testFuncByName("getMetrics", param);
     }
 
+    @Test
+    void getFlushStateParam() {
+        // test throw exception with illegal input
+        assertThrows(ParamException.class, () -> GetFlushStateParam.newBuilder()
+                .build()
+        );
+    }
+
+    @Test
+    void getFlushState() {
+        List<Long> ids = Arrays.asList(1L, 2L);
+        GetFlushStateParam param = GetFlushStateParam.newBuilder()
+                .addSegmentID(1L)
+                .withSegmentIDs(ids)
+                .build();
+
+        testFuncByName("getFlushState", param);
+    }
+
     @Test
     void getPersistentSegmentInfoParam() {
         // test throw exception with illegal input

+ 14 - 0
src/test/java/io/milvus/server/MockMilvusServerImpl.java

@@ -54,6 +54,7 @@ public class MockMilvusServerImpl extends MilvusServiceGrpc.MilvusServiceImplBas
     private io.milvus.grpc.FlushResponse respFlush;
     private io.milvus.grpc.QueryResults respQuery;
     private io.milvus.grpc.CalcDistanceResults respCalcDistance;
+    private io.milvus.grpc.GetFlushStateResponse respGetFlushState;
     private io.milvus.grpc.GetPersistentSegmentInfoResponse respGetPersistentSegmentInfo;
     private io.milvus.grpc.GetQuerySegmentInfoResponse respGetQuerySegmentInfo;
     private io.milvus.grpc.GetMetricsResponse respGetMetrics;
@@ -443,6 +444,19 @@ public class MockMilvusServerImpl extends MilvusServiceGrpc.MilvusServiceImplBas
         respCalcDistance = resp;
     }
 
+    @Override
+    public void getFlushState(io.milvus.grpc.GetFlushStateRequest request,
+                                         io.grpc.stub.StreamObserver<io.milvus.grpc.GetFlushStateResponse> responseObserver) {
+        logger.info("getFlushState() call");
+
+        responseObserver.onNext(respGetFlushState);
+        responseObserver.onCompleted();
+    }
+
+    public void setGetFlushStateResponse(io.milvus.grpc.GetFlushStateResponse resp) {
+        respGetFlushState = resp;
+    }
+
     @Override
     public void getPersistentSegmentInfo(io.milvus.grpc.GetPersistentSegmentInfoRequest request,
                                          io.grpc.stub.StreamObserver<io.milvus.grpc.GetPersistentSegmentInfoResponse> responseObserver) {