Browse Source

Add RefreshLoad/GetPartitionStats interface (#1244)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
groot 4 months ago
parent
commit
f8b26cb9b8

+ 10 - 3
examples/main/java/io/milvus/v2/BulkWriterExample.java

@@ -612,7 +612,7 @@ public class BulkWriterExample {
         } else {
             milvusClient.createCollection(requestCreate);
         }
-//        milvusClient.loadCollection(LoadCollectionReq.builder().collectionName(collectionName).build());
+
         System.out.printf("Collection %s created%n", collectionName);
     }
 
@@ -694,12 +694,19 @@ public class BulkWriterExample {
                 .collectionName(ALL_TYPES_COLLECTION_NAME)
                 .indexParams(indexes)
                 .build());
+
+        milvusClient.loadCollection(LoadCollectionReq.builder()
+                .collectionName(ALL_TYPES_COLLECTION_NAME)
+                .build());
     }
 
     private void loadCollection() {
-        System.out.println("Loading Collection...");
+        System.out.println("Refresh load collection...");
         checkMilvusClientIfExist();
-        milvusClient.loadCollection(LoadCollectionReq.builder()
+        // RefreshLoad is a new interface from v2.5.3,
+        // mainly used when there are new segments generated by bulkinsert request,
+        // force the new segments to be loaded into memory.
+        milvusClient.refreshLoad(RefreshLoadReq.builder()
                 .collectionName(ALL_TYPES_COLLECTION_NAME)
                 .build());
         System.out.println("Collection row number: " + getCollectionRowCount());

+ 30 - 9
sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java

@@ -42,6 +42,7 @@ import io.milvus.v2.service.index.request.*;
 import io.milvus.v2.service.index.response.*;
 import io.milvus.v2.service.partition.PartitionService;
 import io.milvus.v2.service.partition.request.*;
+import io.milvus.v2.service.partition.response.*;
 import io.milvus.v2.service.rbac.RBACService;
 import io.milvus.v2.service.rbac.request.*;
 import io.milvus.v2.service.rbac.response.*;
@@ -306,7 +307,7 @@ public class MilvusClientV2 {
     }
     /**
      * Alter database with key value pair. (Available from Milvus v2.4.4)
-     * Deprecated, replaced by alterDatabaseProperties from v2.5.2, to keep consistence with other SDKs
+     * Deprecated, replaced by alterDatabaseProperties from v2.5.3, to keep consistence with other SDKs
      * @param request alter database request
      */
     @Deprecated
@@ -317,14 +318,14 @@ public class MilvusClientV2 {
                 .build());
     }
     /**
-     * Alter a database's properties (Available from Milvus v2.5.2)
+     * Alter a database's properties (Available from Milvus v2.5.3)
      * @param request alter database properties request
      */
     public void alterDatabaseProperties(AlterDatabasePropertiesReq request) {
         retry(()-> databaseService.alterDatabaseProperties(this.getRpcStub(), request));
     }
     /**
-     * drop a database's properties (Available from Milvus v2.5.2)
+     * drop a database's properties (Available from Milvus v2.5.3)
      * @param request alter database properties request
      */
     public void dropDatabaseProperties(DropDatabasePropertiesReq request) {
@@ -374,7 +375,7 @@ public class MilvusClientV2 {
     }
     /**
      * Alter a collection in Milvus.
-     * Deprecated, replaced by alterCollectionProperties from v2.5.2, to keep consistence with other SDKs
+     * Deprecated, replaced by alterCollectionProperties from v2.5.3, to keep consistence with other SDKs
      *
      * @param request alter collection request
      */
@@ -387,7 +388,7 @@ public class MilvusClientV2 {
                 .build());
     }
     /**
-     * Alter a collection's properties (Available from Milvus v2.5.2).
+     * Alter a collection's properties (Available from Milvus v2.5.3).
      *
      * @param request alter collection properties request
      */
@@ -395,7 +396,7 @@ public class MilvusClientV2 {
         retry(()-> collectionService.alterCollectionProperties(this.getRpcStub(), request));
     }
     /**
-     * drop a collection's properties (Available from Milvus v2.5.2)
+     * drop a collection's properties (Available from Milvus v2.5.3)
      * @param request drop collection properties request
      */
     public void dropCollectionProperties(DropCollectionPropertiesReq request) {
@@ -444,6 +445,16 @@ public class MilvusClientV2 {
     public void loadCollection(LoadCollectionReq request) {
         retry(()-> collectionService.loadCollection(this.getRpcStub(), request));
     }
+    /**
+     * Refresh loads a collection. Mainly used when there are new segments generated by bulkinsert request.
+     * Force the new segments to be loaded into memory.
+     * Note: this interface will ignore the LoadCollectionReq.refresh flag
+     *
+     * @param request refresh load collection request
+     */
+    public void refreshLoad(RefreshLoadReq request) {
+        retry(()-> collectionService.refreshLoad(this.getRpcStub(), request));
+    }
     /**
      * Releases a collection from memory in Milvus.
      *
@@ -481,7 +492,7 @@ public class MilvusClientV2 {
     }
     /**
      * Alter an index in Milvus.
-     * Deprecated, replaced by alterIndexProperties from v2.5.2, to keep consistence with other SDKs
+     * Deprecated, replaced by alterIndexProperties from v2.5.3, to keep consistence with other SDKs
      *
      * @param request alter index request
      */
@@ -495,7 +506,7 @@ public class MilvusClientV2 {
                 .build());
     }
     /**
-     * Alter an index's properties (Available from Milvus v2.5.2)
+     * Alter an index's properties (Available from Milvus v2.5.3)
      *
      * @param request alter index request
      */
@@ -503,7 +514,7 @@ public class MilvusClientV2 {
         retry(()->indexService.alterIndexProperties(this.getRpcStub(), request));
     }
     /**
-     * drop an index's properties (Available from Milvus v2.5.2)
+     * drop an index's properties (Available from Milvus v2.5.3)
      * @param request drop index properties request
      */
     public void dropIndexProperties(DropIndexPropertiesReq request) {
@@ -654,6 +665,16 @@ public class MilvusClientV2 {
         return retry(()->partitionService.listPartitions(this.getRpcStub(), request));
     }
 
+    /**
+     * get a partition stats in Milvus.
+     *
+     * @param request get partition stats request
+     * @return GetPartitionStatsResp
+     */
+    public GetPartitionStatsResp getPartitionStats(GetPartitionStatsReq request) {
+        return retry(()-> partitionService.getPartitionStats(this.getRpcStub(), request));
+    }
+
     /**
      * Loads partitions in a collection in Milvus.
      *

+ 20 - 4
sdk-core/src/main/java/io/milvus/v2/service/collection/CollectionService.java

@@ -262,7 +262,22 @@ public class CollectionService extends BaseService {
         Status status = blockingStub.loadCollection(loadCollectionRequest);
         rpcUtils.handleResponse(title, status);
         if (request.getAsync()) {
-            WaitForLoadCollection(blockingStub, request);
+            WaitForLoadCollection(blockingStub, request.getCollectionName(), request.getTimeout());
+        }
+
+        return null;
+    }
+
+    public Void refreshLoad(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, RefreshLoadReq request) {
+        String title = String.format("RefreshLoadRequest collectionName:%s", request.getCollectionName());
+        LoadCollectionRequest loadCollectionRequest = LoadCollectionRequest.newBuilder()
+                .setCollectionName(request.getCollectionName())
+                .setRefresh(true)
+                .build();
+        Status status = blockingStub.loadCollection(loadCollectionRequest);
+        rpcUtils.handleResponse(title, status);
+        if (request.getAsync()) {
+            WaitForLoadCollection(blockingStub, request.getCollectionName(), request.getTimeout());
         }
 
         return null;
@@ -339,16 +354,17 @@ public class CollectionService extends BaseService {
         }
     }
 
-    private void WaitForLoadCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, LoadCollectionReq request) {
+    private void WaitForLoadCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
+                                       String collectionName, long timeoutMs) {
         boolean isLoaded = false;
         long startTime = System.currentTimeMillis(); // Capture start time/ Timeout in milliseconds (60 seconds)
 
         while (!isLoaded) {
             // Call the getLoadState method
-            isLoaded = getLoadState(blockingStub, GetLoadStateReq.builder().collectionName(request.getCollectionName()).build());
+            isLoaded = getLoadState(blockingStub, GetLoadStateReq.builder().collectionName(collectionName).build());
             if (!isLoaded) {
                 // Check if timeout is exceeded
-                if (System.currentTimeMillis() - startTime > request.getTimeout()) {
+                if (System.currentTimeMillis() - startTime > timeoutMs) {
                     throw new MilvusClientException(ErrorCode.SERVER_ERROR, "Load collection timeout");
                 }
                 // Wait for a certain period before checking again

+ 34 - 0
sdk-core/src/main/java/io/milvus/v2/service/collection/request/RefreshLoadReq.java

@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.v2.service.collection.request;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.experimental.SuperBuilder;
+
+@Data
+@SuperBuilder
+public class RefreshLoadReq {
+    private String collectionName;
+    @Builder.Default
+    private Boolean async = Boolean.TRUE;
+    @Builder.Default
+    private Long timeout = 60000L;
+}

+ 17 - 3
sdk-core/src/main/java/io/milvus/v2/service/partition/PartitionService.java

@@ -19,11 +19,10 @@
 
 package io.milvus.v2.service.partition;
 
-import io.milvus.grpc.CreatePartitionRequest;
-import io.milvus.grpc.MilvusServiceGrpc;
-import io.milvus.grpc.Status;
+import io.milvus.grpc.*;
 import io.milvus.v2.service.BaseService;
 import io.milvus.v2.service.partition.request.*;
+import io.milvus.v2.service.partition.response.*;
 
 import java.util.List;
 
@@ -79,6 +78,21 @@ public class PartitionService extends BaseService {
         return showPartitionsResponse.getPartitionNamesList();
     }
 
+    public GetPartitionStatsResp getPartitionStats(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, GetPartitionStatsReq request) {
+        String title = String.format("GetCollectionStatisticsRequest collectionName:%s", request.getCollectionName());
+        GetPartitionStatisticsRequest getPartitionStatisticsRequest = GetPartitionStatisticsRequest.newBuilder()
+                .setCollectionName(request.getCollectionName())
+                .setPartitionName(request.getPartitionName())
+                .build();
+        GetPartitionStatisticsResponse response = blockingStub.getPartitionStatistics(getPartitionStatisticsRequest);
+
+        rpcUtils.handleResponse(title, response.getStatus());
+        GetPartitionStatsResp getPartitionStatsResp = GetPartitionStatsResp.builder()
+                .numOfEntities(response.getStatsList().stream().filter(stat -> stat.getKey().equals("row_count")).map(stat -> Long.parseLong(stat.getValue())).findFirst().get())
+                .build();
+        return getPartitionStatsResp;
+    }
+
     public Void loadPartitions(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, LoadPartitionsReq request) {
         String title = String.format("Load partitions %s in collection %s", request.getPartitionNames(), request.getCollectionName());
 

+ 30 - 0
sdk-core/src/main/java/io/milvus/v2/service/partition/request/GetPartitionStatsReq.java

@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.v2.service.partition.request;
+
+import lombok.Data;
+import lombok.experimental.SuperBuilder;
+
+@Data
+@SuperBuilder
+public class GetPartitionStatsReq {
+    private String collectionName;
+    private String partitionName;
+}

+ 30 - 0
sdk-core/src/main/java/io/milvus/v2/service/partition/response/GetPartitionStatsResp.java

@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package io.milvus.v2.service.partition.response;
+
+import lombok.Data;
+import lombok.experimental.SuperBuilder;
+
+@Data
+@SuperBuilder
+public class GetPartitionStatsResp {
+    private Long numOfEntities;
+}

+ 4 - 1
sdk-core/src/main/java/io/milvus/v2/service/utility/UtilityService.java

@@ -21,6 +21,8 @@ package io.milvus.v2.service.utility;
 
 import io.milvus.grpc.*;
 import io.milvus.v2.common.CompactionState;
+import io.milvus.v2.exception.ErrorCode;
+import io.milvus.v2.exception.MilvusClientException;
 import io.milvus.v2.service.BaseService;
 import io.milvus.v2.service.utility.request.*;
 import io.milvus.v2.service.utility.response.*;
@@ -32,7 +34,8 @@ public class UtilityService extends BaseService {
         List<String> collectionNames = request.getCollectionNames();
         String title = String.format("Flush collections %s", collectionNames);
         if (collectionNames.isEmpty()) {
-            return null; // maybe do flushAll in future
+            // consistent with python sdk behavior, throw an error if collection names list is null or empty
+            throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "Collection name list can not be null or empty");
         }
 
         FlushRequest flushRequest = io.milvus.grpc.FlushRequest.newBuilder()