Browse Source

Add flush/compact/getCompactionState interface for V2 (#1139)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
groot 8 months ago
parent
commit
4593838cbe

+ 39 - 0
src/main/java/io/milvus/v2/client/MilvusClientV2.java

@@ -772,6 +772,45 @@ public class MilvusClientV2 {
         return retry(()->utilityService.describeAlias(this.getRpcStub(), request));
     }
 
+    /**
+     * trigger a flush action in server side
+     *
+     * @param request flush request
+     */
+    public void flush(FlushReq request) {
+        FlushResp response = retry(()->utilityService.flush(this.getRpcStub(), request));
+
+        // The BlockingStub.flush() api returns immediately after the datanode set all growing segments to be "sealed".
+        // The flush state becomes "Completed" after the datanode uploading them to S3 asynchronously.
+        // Here we wait the flush action to be "Completed".
+        MilvusServiceGrpc.MilvusServiceBlockingStub tempBlockingStub =
+                MilvusServiceGrpc.newBlockingStub(channel).withWaitForReady();
+        if (request.getWaitFlushedTimeoutMs() > 0L) {
+            tempBlockingStub = tempBlockingStub.withDeadlineAfter(request.getWaitFlushedTimeoutMs(), TimeUnit.MILLISECONDS);
+        }
+        utilityService.waitFlush(tempBlockingStub, response.getCollectionSegmentIDs(), response.getCollectionFlushTs());
+    }
+
+    /**
+     * trigger an asynchronous compaction in server side
+     *
+     * @param request compact request
+     * @return CompactResp
+     */
+    public CompactResp compact(CompactReq request) {
+        return retry(()->utilityService.compact(this.getRpcStub(), request));
+    }
+
+    /**
+     * get a compact task state by its ID
+     *
+     * @param request get compact state request
+     * @return GetCompactStateResp
+     */
+    public GetCompactionStateResp getCompactionState(GetCompactionStateReq request) {
+        return retry(()->utilityService.getCompactionState(this.getRpcStub(), request));
+    }
+
     /**
      * Get server version
      *

+ 35 - 0
src/main/java/io/milvus/v2/common/CompactionState.java

@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.v2.common;
+
+import lombok.Getter;
+
+@Getter
+public enum CompactionState {
+    UndefiedState(0),
+    Executing(1),
+    Completed(2);
+
+    private final int code;
+    CompactionState(int code) {
+        this.code = code;
+    }
+    ;
+}

+ 19 - 0
src/main/java/io/milvus/v2/common/IndexBuildState.java

@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package io.milvus.v2.common;
 
 public enum IndexBuildState {

+ 4 - 1
src/main/java/io/milvus/v2/service/collection/response/ListCollectionsResp.java

@@ -19,13 +19,16 @@
 
 package io.milvus.v2.service.collection.response;
 
+import lombok.Builder;
 import lombok.Data;
 import lombok.experimental.SuperBuilder;
 
+import java.util.ArrayList;
 import java.util.List;
 
 @Data
 @SuperBuilder
 public class ListCollectionsResp {
-    private List<String> collectionNames;
+    @Builder.Default
+    private List<String> collectionNames = new ArrayList<>();
 }

+ 86 - 13
src/main/java/io/milvus/v2/service/utility/UtilityService.java

@@ -19,24 +19,97 @@
 
 package io.milvus.v2.service.utility;
 
-import io.milvus.grpc.FlushResponse;
-import io.milvus.grpc.MilvusServiceGrpc;
-import io.milvus.param.R;
-import io.milvus.param.RpcStatus;
+import io.milvus.grpc.*;
+import io.milvus.v2.common.CompactionState;
 import io.milvus.v2.service.BaseService;
 import io.milvus.v2.service.utility.request.*;
-import io.milvus.v2.service.utility.response.DescribeAliasResp;
-import io.milvus.v2.service.utility.response.ListAliasResp;
+import io.milvus.v2.service.utility.response.*;
+
+import java.util.*;
 
 public class UtilityService extends BaseService {
-    public R<RpcStatus> flush(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, FlushReq request) {
-        String title = String.format("Flush collection %s", request.getCollectionName());
-        io.milvus.grpc.FlushRequest flushRequest = io.milvus.grpc.FlushRequest.newBuilder()
-                .addCollectionNames(request.getCollectionName())
+    public FlushResp flush(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, FlushReq request) {
+        List<String> collectionNames = request.getCollectionNames();
+        String title = String.format("Flush collections %s", collectionNames);
+        if (collectionNames.isEmpty()) {
+            return null; // maybe do flushAll in future
+        }
+
+        FlushRequest flushRequest = io.milvus.grpc.FlushRequest.newBuilder()
+                .addAllCollectionNames(collectionNames)
+                .build();
+        FlushResponse response = blockingStub.flush(flushRequest);
+        rpcUtils.handleResponse(title, response.getStatus());
+
+        Map<String, io.milvus.grpc.LongArray> rpcCollSegIDs = response.getCollSegIDsMap();
+        Map<String, List<Long>> collectionSegmentIDs = new HashMap<>();
+        rpcCollSegIDs.forEach((key, value)->{
+            collectionSegmentIDs.put(key, value.getDataList());
+        });
+        Map<String, Long> collectionFlushTs = response.getCollFlushTsMap();
+        return FlushResp.builder()
+                .collectionSegmentIDs(collectionSegmentIDs)
+                .collectionFlushTs(collectionFlushTs)
+                .build();
+    }
+
+    // this method is internal use, not expose to user
+    public Void waitFlush(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
+                          Map<String, List<Long>> collectionSegmentIDs,
+                          Map<String, Long> collectionFlushTs) {
+        collectionSegmentIDs.forEach((collectionName, segmentIDs)->{
+            if (collectionFlushTs.containsKey(collectionName)) {
+                Long flushTs = collectionFlushTs.get(collectionName);
+                boolean flushed = false;
+                while (!flushed) {
+                    GetFlushStateResponse flushResponse = blockingStub.getFlushState(GetFlushStateRequest.newBuilder()
+                            .addAllSegmentIDs(segmentIDs)
+                            .setFlushTs(flushTs)
+                            .build());
+
+                    flushed = flushResponse.getFlushed();
+                }
+            }
+        });
+
+        return null;
+    }
+
+    public CompactResp compact(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, CompactReq request) {
+        String title = String.format("Compact collection %s", request.getCollectionName());
+
+        DescribeCollectionResponse descResponse = blockingStub.describeCollection(DescribeCollectionRequest.newBuilder()
+                .setCollectionName(request.getCollectionName())
+                .build());
+        rpcUtils.handleResponse(title, descResponse.getStatus());
+
+        io.milvus.grpc.ManualCompactionRequest compactRequest = io.milvus.grpc.ManualCompactionRequest.newBuilder()
+                .setCollectionID(descResponse.getCollectionID())
+                .setMajorCompaction(request.getIsClustering())
+                .build();
+        io.milvus.grpc.ManualCompactionResponse response = blockingStub.manualCompaction(compactRequest);
+        rpcUtils.handleResponse(title, response.getStatus());
+
+        return CompactResp.builder()
+                .compactionID(response.getCompactionID())
+                .build();
+    }
+
+    public GetCompactionStateResp getCompactionState(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
+                                                     GetCompactionStateReq request) {
+        String title = "Get compaction state";
+        io.milvus.grpc.GetCompactionStateRequest getRequest = io.milvus.grpc.GetCompactionStateRequest.newBuilder()
+                .setCompactionID(request.getCompactionID())
+                .build();
+        io.milvus.grpc.GetCompactionStateResponse response = blockingStub.getCompactionState(getRequest);
+        rpcUtils.handleResponse(title, response.getStatus());
+
+        return GetCompactionStateResp.builder()
+                .state(CompactionState.valueOf(response.getState().name()))
+                .executingPlanNo(response.getExecutingPlanNo())
+                .timeoutPlanNo(response.getTimeoutPlanNo())
+                .completedPlanNo(response.getCompletedPlanNo())
                 .build();
-        FlushResponse status = blockingStub.flush(flushRequest);
-        rpcUtils.handleResponse(title, status.getStatus());
-        return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
     }
 
     public Void createAlias(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, CreateAliasReq request) {

+ 33 - 0
src/main/java/io/milvus/v2/service/utility/request/CompactReq.java

@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.v2.service.utility.request;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.experimental.SuperBuilder;
+
+@Data
+@SuperBuilder
+public class CompactReq {
+    private String collectionName;
+
+    @Builder.Default
+    private Boolean isClustering = Boolean.FALSE;
+}

+ 9 - 1
src/main/java/io/milvus/v2/service/utility/request/FlushReq.java

@@ -19,11 +19,19 @@
 
 package io.milvus.v2.service.utility.request;
 
+import lombok.Builder;
 import lombok.Data;
 import lombok.experimental.SuperBuilder;
 
+import java.util.ArrayList;
+import java.util.List;
+
 @Data
 @SuperBuilder
 public class FlushReq {
-    private String collectionName;
+    @Builder.Default
+    private List<String> collectionNames = new ArrayList<>();
+
+    @Builder.Default
+    private Long waitFlushedTimeoutMs = 0L; // 0 - waiting util flush task is done
 }

+ 29 - 0
src/main/java/io/milvus/v2/service/utility/request/GetCompactionStateReq.java

@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.v2.service.utility.request;
+
+import lombok.Data;
+import lombok.experimental.SuperBuilder;
+
+@Data
+@SuperBuilder
+public class GetCompactionStateReq {
+    private Long compactionID;
+}

+ 31 - 0
src/main/java/io/milvus/v2/service/utility/response/CompactResp.java

@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.v2.service.utility.response;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.experimental.SuperBuilder;
+
+@Data
+@SuperBuilder
+public class CompactResp {
+    @Builder.Default
+    private Long compactionID = 0L;
+}

+ 36 - 0
src/main/java/io/milvus/v2/service/utility/response/FlushResp.java

@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.v2.service.utility.response;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.experimental.SuperBuilder;
+import org.apache.commons.collections.map.HashedMap;
+
+import java.util.*;
+
+@Data
+@SuperBuilder
+public class FlushResp {
+    @Builder.Default
+    Map<String, List<Long>> collectionSegmentIDs = new HashedMap();
+    @Builder.Default
+    Map<String, Long> collectionFlushTs = new HashedMap();
+}

+ 41 - 0
src/main/java/io/milvus/v2/service/utility/response/GetCompactionStateResp.java

@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.v2.service.utility.response;
+
+import io.milvus.v2.common.CompactionState;
+import lombok.Builder;
+import lombok.Data;
+import lombok.experimental.SuperBuilder;
+
+@Data
+@SuperBuilder
+public class GetCompactionStateResp {
+    @Builder.Default
+    private CompactionState state = CompactionState.UndefiedState;
+
+    @Builder.Default
+    private Long executingPlanNo = 0L;
+
+    @Builder.Default
+    private Long timeoutPlanNo = 0L;
+
+    @Builder.Default
+    private Long completedPlanNo = 0L;
+}

+ 1 - 2
src/main/java/io/milvus/v2/service/vector/VectorService.java

@@ -35,7 +35,6 @@ import io.milvus.v2.service.index.IndexService;
 import io.milvus.v2.service.vector.request.*;
 import io.milvus.v2.service.vector.response.*;
 import io.milvus.v2.utils.DataUtils;
-import io.milvus.v2.utils.RpcUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,7 +70,7 @@ public class VectorService extends BaseService {
             }
             DescribeCollectionRequest describeCollectionRequest = builder.build();
             DescribeCollectionResponse response = blockingStub.describeCollection(describeCollectionRequest);
-            new RpcUtils().handleResponse(msg, response.getStatus());
+            rpcUtils.handleResponse(msg, response.getStatus());
             info = response;
             cacheCollectionInfo.put(key, info);
         }

+ 19 - 16
src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java

@@ -36,23 +36,14 @@ import io.milvus.v2.common.DataType;
 import io.milvus.v2.common.IndexParam;
 import io.milvus.v2.exception.MilvusClientException;
 import io.milvus.v2.service.collection.request.*;
-import io.milvus.v2.service.collection.response.DescribeCollectionResp;
-import io.milvus.v2.service.collection.response.ListCollectionsResp;
-import io.milvus.v2.service.database.request.AlterDatabaseReq;
-import io.milvus.v2.service.database.request.CreateDatabaseReq;
-import io.milvus.v2.service.database.request.DescribeDatabaseReq;
-import io.milvus.v2.service.database.request.DropDatabaseReq;
-import io.milvus.v2.service.database.response.DescribeDatabaseResp;
-import io.milvus.v2.service.database.response.ListDatabasesResp;
-import io.milvus.v2.service.index.request.AlterIndexReq;
-import io.milvus.v2.service.index.request.CreateIndexReq;
-import io.milvus.v2.service.index.request.DescribeIndexReq;
-import io.milvus.v2.service.index.request.DropIndexReq;
-import io.milvus.v2.service.index.response.DescribeIndexResp;
+import io.milvus.v2.service.collection.response.*;
+import io.milvus.v2.service.database.request.*;
+import io.milvus.v2.service.database.response.*;
+import io.milvus.v2.service.index.request.*;
+import io.milvus.v2.service.index.response.*;
 import io.milvus.v2.service.partition.request.*;
-import io.milvus.v2.service.utility.request.AlterAliasReq;
-import io.milvus.v2.service.utility.request.CreateAliasReq;
-import io.milvus.v2.service.utility.request.DropAliasReq;
+import io.milvus.v2.service.utility.request.*;
+import io.milvus.v2.service.utility.response.*;
 import io.milvus.v2.service.vector.request.*;
 import io.milvus.v2.service.vector.request.data.*;
 import io.milvus.v2.service.vector.request.ranker.*;
@@ -456,6 +447,18 @@ class MilvusClientV2DockerTest {
                 .build());
         Assertions.assertEquals(count, insertResp.getInsertCnt());
 
+        // flush
+        client.flush(FlushReq.builder()
+                .collectionNames(Collections.singletonList(randomCollectionName))
+                .build());
+
+        // compact
+        CompactResp compactResp = client.compact(CompactReq.builder()
+                .collectionName(randomCollectionName)
+                .build());
+        // there is a segment is flushed by the flush() interface, there could be a compaction task created
+        Assertions.assertTrue(compactResp.getCompactionID() == -1L || compactResp.getCompactionID() > 0L);
+
         // create partition, upsert one row to the partition
         String partitionName = "PPP";
         client.createPartition(CreatePartitionReq.builder()