소스 검색

Add RefreshLoad/GetPartitionStats interface (#1245)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
groot 4 달 전
부모
커밋
624cccad90

+ 13 - 8
examples/main/java/io/milvus/v2/BulkWriterExample.java

@@ -540,8 +540,6 @@ public class BulkWriterExample {
                 System.out.printf("The job %s is running, state:%s progress:%s%n", jobId, state, progress);
             }
         }
-
-        System.out.println("Collection row number: " + getCollectionStatistics());
     }
 
     private void callCloudImport(List<List<String>> batchFiles, String collectionName, String partitionName) throws InterruptedException {
@@ -589,8 +587,6 @@ public class BulkWriterExample {
                 System.out.printf("The job %s is running, state:%s progress:%s%n", jobId, importProgressState, progress);
             }
         }
-
-        System.out.println("Collection row number: " + getCollectionStatistics());
     }
 
     /**
@@ -695,16 +691,25 @@ public class BulkWriterExample {
                 .build());
 
         milvusClient.createIndex(CreateIndexReq.builder()
+                .collectionName(ALL_TYPES_COLLECTION_NAME)
                 .indexParams(indexes)
                 .build());
+
+        milvusClient.loadCollection(LoadCollectionReq.builder()
+                .collectionName(ALL_TYPES_COLLECTION_NAME)
+                .build());
     }
 
     private void loadCollection() {
-        System.out.println("Loading Collection...");
+        System.out.println("Refresh load collection...");
         checkMilvusClientIfExist();
-        milvusClient.loadCollection(LoadCollectionReq.builder()
+        // RefreshLoad is a new interface from v2.5.3,
+        // mainly used when there are new segments generated by bulkinsert request,
+        // force the new segments to be loaded into memory.
+        milvusClient.refreshLoad(RefreshLoadReq.builder()
                 .collectionName(ALL_TYPES_COLLECTION_NAME)
                 .build());
+        System.out.println("Collection row number: " + getCollectionRowCount());
     }
 
     private List<QueryResp.QueryResult> query(String expr, List<String> outputFields) {
@@ -719,8 +724,8 @@ public class BulkWriterExample {
         return response.getQueryResults();
     }
 
-    private Long getCollectionStatistics() {
-        System.out.println("========== getCollectionStatistics() ==========");
+    private Long getCollectionRowCount() {
+        System.out.println("========== getCollectionRowCount() ==========");
         checkMilvusClientIfExist();
 
         // Get row count, set ConsistencyLevel.STRONG to sync the data to query node so that data is visible

+ 30 - 9
src/main/java/io/milvus/v2/client/MilvusClientV2.java

@@ -42,6 +42,7 @@ import io.milvus.v2.service.index.request.*;
 import io.milvus.v2.service.index.response.*;
 import io.milvus.v2.service.partition.PartitionService;
 import io.milvus.v2.service.partition.request.*;
+import io.milvus.v2.service.partition.response.*;
 import io.milvus.v2.service.rbac.RBACService;
 import io.milvus.v2.service.rbac.request.*;
 import io.milvus.v2.service.rbac.response.*;
@@ -306,7 +307,7 @@ public class MilvusClientV2 {
     }
     /**
      * Alter database with key value pair. (Available from Milvus v2.4.4)
-     * Deprecated, replaced by alterDatabaseProperties, to keep consistence with other SDKs
+     * Deprecated, replaced by alterDatabaseProperties from v2.4.10, to keep consistence with other SDKs
      * @param request alter database request
      */
     @Deprecated
@@ -317,14 +318,14 @@ public class MilvusClientV2 {
                 .build());
     }
     /**
-     * Alter a database's properties.
+     * Alter a database's properties. (Available from Milvus v2.4.10)
      * @param request alter database properties request
      */
     public void alterDatabaseProperties(AlterDatabasePropertiesReq request) {
         retry(()-> databaseService.alterDatabaseProperties(this.getRpcStub(), request));
     }
     /**
-     * drop a database's properties (Available from Milvus v2.5.2)
+     * drop a database's properties. (Available from Milvus v2.4.10)
      * @param request alter database properties request
      */
     public void dropDatabaseProperties(DropDatabasePropertiesReq request) {
@@ -373,7 +374,7 @@ public class MilvusClientV2 {
     }
     /**
      * Alter a collection in Milvus.
-     * Deprecated, replaced by alterCollectionProperties, to keep consistence with other SDKs
+     * Deprecated, replaced by alterCollectionProperties from v2.4.10, to keep consistence with other SDKs
      *
      * @param request alter collection request
      */
@@ -386,7 +387,7 @@ public class MilvusClientV2 {
                 .build());
     }
     /**
-     * Alter a collection's properties.
+     * Alter a collection's properties. (Available from Milvus v2.4.10)
      *
      * @param request alter collection properties request
      */
@@ -394,7 +395,7 @@ public class MilvusClientV2 {
         retry(()-> collectionService.alterCollectionProperties(this.getRpcStub(), request));
     }
     /**
-     * drop a collection's properties.
+     * drop a collection's properties. (Available from Milvus v2.4.10)
      * @param request drop collection properties request
      */
     public void dropCollectionProperties(DropCollectionPropertiesReq request) {
@@ -443,6 +444,16 @@ public class MilvusClientV2 {
     public void loadCollection(LoadCollectionReq request) {
         retry(()-> collectionService.loadCollection(this.getRpcStub(), request));
     }
+    /**
+     * Refresh loads a collection. Mainly used when there are new segments generated by bulkinsert request.
+     * Force the new segments to be loaded into memory.
+     * Note: this interface will ignore the LoadCollectionReq.refresh flag
+     *
+     * @param request refresh load collection request
+     */
+    public void refreshLoad(RefreshLoadReq request) {
+        retry(()-> collectionService.refreshLoad(this.getRpcStub(), request));
+    }
     /**
      * Releases a collection from memory in Milvus.
      *
@@ -480,7 +491,7 @@ public class MilvusClientV2 {
     }
     /**
      * Alter an index in Milvus.
-     * Deprecated, replaced by alterIndexProperties, to keep consistence with other SDKs
+     * Deprecated, replaced by alterIndexProperties from v2.4.10, to keep consistence with other SDKs
      *
      * @param request alter index request
      */
@@ -494,7 +505,7 @@ public class MilvusClientV2 {
                 .build());
     }
     /**
-     * Alter an index's properties.
+     * Alter an index's properties. (Available from Milvus v2.4.10)
      *
      * @param request alter index request
      */
@@ -502,7 +513,7 @@ public class MilvusClientV2 {
         retry(()->indexService.alterIndexProperties(this.getRpcStub(), request));
     }
     /**
-     * drop an index's properties.
+     * drop an index's properties. (Available from Milvus v2.4.10)
      * @param request drop index properties request
      */
     public void dropIndexProperties(DropIndexPropertiesReq request) {
@@ -653,6 +664,16 @@ public class MilvusClientV2 {
         return retry(()->partitionService.listPartitions(this.getRpcStub(), request));
     }
 
+    /**
+     * get a partition stats in Milvus.
+     *
+     * @param request get partition stats request
+     * @return GetPartitionStatsResp
+     */
+    public GetPartitionStatsResp getPartitionStats(GetPartitionStatsReq request) {
+        return retry(()-> partitionService.getPartitionStats(this.getRpcStub(), request));
+    }
+
     /**
      * Loads partitions in a collection in Milvus.
      *

+ 20 - 4
src/main/java/io/milvus/v2/service/collection/CollectionService.java

@@ -253,7 +253,22 @@ public class CollectionService extends BaseService {
         Status status = blockingStub.loadCollection(loadCollectionRequest);
         rpcUtils.handleResponse(title, status);
         if (request.getAsync()) {
-            WaitForLoadCollection(blockingStub, request);
+            WaitForLoadCollection(blockingStub, request.getCollectionName(), request.getTimeout());
+        }
+
+        return null;
+    }
+
+    public Void refreshLoad(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, RefreshLoadReq request) {
+        String title = String.format("RefreshLoadRequest collectionName:%s", request.getCollectionName());
+        LoadCollectionRequest loadCollectionRequest = LoadCollectionRequest.newBuilder()
+                .setCollectionName(request.getCollectionName())
+                .setRefresh(true)
+                .build();
+        Status status = blockingStub.loadCollection(loadCollectionRequest);
+        rpcUtils.handleResponse(title, status);
+        if (request.getAsync()) {
+            WaitForLoadCollection(blockingStub, request.getCollectionName(), request.getTimeout());
         }
 
         return null;
@@ -330,16 +345,17 @@ public class CollectionService extends BaseService {
         }
     }
 
-    private void WaitForLoadCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, LoadCollectionReq request) {
+    private void WaitForLoadCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
+                                       String collectionName, long timeoutMs) {
         boolean isLoaded = false;
         long startTime = System.currentTimeMillis(); // Capture start time/ Timeout in milliseconds (60 seconds)
 
         while (!isLoaded) {
             // Call the getLoadState method
-            isLoaded = getLoadState(blockingStub, GetLoadStateReq.builder().collectionName(request.getCollectionName()).build());
+            isLoaded = getLoadState(blockingStub, GetLoadStateReq.builder().collectionName(collectionName).build());
             if (!isLoaded) {
                 // Check if timeout is exceeded
-                if (System.currentTimeMillis() - startTime > request.getTimeout()) {
+                if (System.currentTimeMillis() - startTime > timeoutMs) {
                     throw new MilvusClientException(ErrorCode.SERVER_ERROR, "Load collection timeout");
                 }
                 // Wait for a certain period before checking again

+ 34 - 0
src/main/java/io/milvus/v2/service/collection/request/RefreshLoadReq.java

@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.v2.service.collection.request;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.experimental.SuperBuilder;
+
+@Data
+@SuperBuilder
+public class RefreshLoadReq {
+    private String collectionName;
+    @Builder.Default
+    private Boolean async = Boolean.TRUE;
+    @Builder.Default
+    private Long timeout = 60000L;
+}

+ 17 - 3
src/main/java/io/milvus/v2/service/partition/PartitionService.java

@@ -19,11 +19,10 @@
 
 package io.milvus.v2.service.partition;
 
-import io.milvus.grpc.CreatePartitionRequest;
-import io.milvus.grpc.MilvusServiceGrpc;
-import io.milvus.grpc.Status;
+import io.milvus.grpc.*;
 import io.milvus.v2.service.BaseService;
 import io.milvus.v2.service.partition.request.*;
+import io.milvus.v2.service.partition.response.*;
 
 import java.util.List;
 
@@ -79,6 +78,21 @@ public class PartitionService extends BaseService {
         return showPartitionsResponse.getPartitionNamesList();
     }
 
+    public GetPartitionStatsResp getPartitionStats(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, GetPartitionStatsReq request) {
+        String title = String.format("GetCollectionStatisticsRequest collectionName:%s", request.getCollectionName());
+        GetPartitionStatisticsRequest getPartitionStatisticsRequest = GetPartitionStatisticsRequest.newBuilder()
+                .setCollectionName(request.getCollectionName())
+                .setPartitionName(request.getPartitionName())
+                .build();
+        GetPartitionStatisticsResponse response = blockingStub.getPartitionStatistics(getPartitionStatisticsRequest);
+
+        rpcUtils.handleResponse(title, response.getStatus());
+        GetPartitionStatsResp getPartitionStatsResp = GetPartitionStatsResp.builder()
+                .numOfEntities(response.getStatsList().stream().filter(stat -> stat.getKey().equals("row_count")).map(stat -> Long.parseLong(stat.getValue())).findFirst().get())
+                .build();
+        return getPartitionStatsResp;
+    }
+
     public Void loadPartitions(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, LoadPartitionsReq request) {
         String title = String.format("Load partitions %s in collection %s", request.getPartitionNames(), request.getCollectionName());
 

+ 30 - 0
src/main/java/io/milvus/v2/service/partition/request/GetPartitionStatsReq.java

@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.v2.service.partition.request;
+
+import lombok.Data;
+import lombok.experimental.SuperBuilder;
+
+@Data
+@SuperBuilder
+public class GetPartitionStatsReq {
+    private String collectionName;
+    private String partitionName;
+}

+ 30 - 0
src/main/java/io/milvus/v2/service/partition/response/GetPartitionStatsResp.java

@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package io.milvus.v2.service.partition.response;
+
+import lombok.Data;
+import lombok.experimental.SuperBuilder;
+
+@Data
+@SuperBuilder
+public class GetPartitionStatsResp {
+    private Long numOfEntities;
+}

+ 4 - 1
src/main/java/io/milvus/v2/service/utility/UtilityService.java

@@ -21,6 +21,8 @@ package io.milvus.v2.service.utility;
 
 import io.milvus.grpc.*;
 import io.milvus.v2.common.CompactionState;
+import io.milvus.v2.exception.ErrorCode;
+import io.milvus.v2.exception.MilvusClientException;
 import io.milvus.v2.service.BaseService;
 import io.milvus.v2.service.utility.request.*;
 import io.milvus.v2.service.utility.response.*;
@@ -32,7 +34,8 @@ public class UtilityService extends BaseService {
         List<String> collectionNames = request.getCollectionNames();
         String title = String.format("Flush collections %s", collectionNames);
         if (collectionNames.isEmpty()) {
-            return null; // maybe do flushAll in future
+            // consistent with python sdk behavior, throw an error if collection names list is null or empty
+            throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "Collection name list can not be null or empty");
         }
 
         FlushRequest flushRequest = io.milvus.grpc.FlushRequest.newBuilder()