Browse Source

Implement getReplicas() interface (#289)

Signed-off-by: groot <yihua.mo@zilliz.com>
groot 3 years ago
parent
commit
16d83106b0

+ 41 - 2
src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java

@@ -1296,7 +1296,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                     .withCollectionName(requestParam.getCollectionName())
                     .withCollectionName(requestParam.getCollectionName())
                     .build());
                     .build());
             if (descResp.getStatus() != R.Status.Success.getCode()) {
             if (descResp.getStatus() != R.Status.Success.getCode()) {
-                logInfo("Failed to describe collection: {}", requestParam.getCollectionName());
+                logError("Failed to describe collection: {}", requestParam.getCollectionName());
                 return R.failed(R.Status.valueOf(descResp.getStatus()), descResp.getMessage());
                 return R.failed(R.Status.valueOf(descResp.getStatus()), descResp.getMessage());
             }
             }
 
 
@@ -1718,6 +1718,45 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         }
         }
     }
     }
 
 
+    @Override
+    public R<GetReplicasResponse> getReplicas(GetReplicasParam requestParam) {
+        if (!clientIsReady()) {
+            return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
+        }
+
+        logInfo(requestParam.toString());
+
+        try {
+            R<DescribeCollectionResponse> descResp = describeCollection(DescribeCollectionParam.newBuilder()
+                    .withCollectionName(requestParam.getCollectionName())
+                    .build());
+            if (descResp.getStatus() != R.Status.Success.getCode()) {
+                logError("Failed to describe collection: {}", requestParam.getCollectionName());
+                return R.failed(R.Status.valueOf(descResp.getStatus()), descResp.getMessage());
+            }
+
+            GetReplicasRequest getReplicasRequest = GetReplicasRequest.newBuilder()
+                    .setCollectionID(descResp.getData().getCollectionID())
+                    .setWithShardNodes(requestParam.isWithShardNodes())
+                    .build();
+
+            GetReplicasResponse response = blockingStub().getReplicas(getReplicasRequest);
+
+            if (response.getStatus().getErrorCode() == ErrorCode.Success) {
+                logInfo("GetReplicasRequest successfully!");
+                return R.success(response);
+            } else {
+                return failedStatus("GetReplicasRequest", response.getStatus());
+            }
+        } catch (StatusRuntimeException e) {
+            logError("GetReplicasRequest RPC failed:\n{}", e.getStatus().toString());
+            return R.failed(e);
+        } catch (Exception e) {
+            logError("GetReplicasRequest failed:\n{}", e.getMessage());
+            return R.failed(e);
+        }
+    }
+
     @Override
     @Override
     public R<RpcStatus> loadBalance(LoadBalanceParam requestParam) {
     public R<RpcStatus> loadBalance(LoadBalanceParam requestParam) {
         if (!clientIsReady()) {
         if (!clientIsReady()) {
@@ -1793,7 +1832,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                     .withCollectionName(requestParam.getCollectionName())
                     .withCollectionName(requestParam.getCollectionName())
                     .build());
                     .build());
             if (descResp.getStatus() != R.Status.Success.getCode()) {
             if (descResp.getStatus() != R.Status.Success.getCode()) {
-                logInfo("ManualCompactionRequest successfully!");
+                logError("Failed to describe collection: {}", requestParam.getCollectionName());
                 return R.failed(R.Status.valueOf(descResp.getStatus()), descResp.getMessage());
                 return R.failed(R.Status.valueOf(descResp.getStatus()), descResp.getMessage());
             }
             }
 
 

+ 8 - 0
src/main/java/io/milvus/client/MilvusClient.java

@@ -385,6 +385,14 @@ public interface MilvusClient {
      */
      */
     R<GetQuerySegmentInfoResponse> getQuerySegmentInfo(GetQuerySegmentInfoParam requestParam);
     R<GetQuerySegmentInfoResponse> getQuerySegmentInfo(GetQuerySegmentInfoParam requestParam);
 
 
+    /**
+     * Returns the collection's replica information
+     *
+     * @param requestParam {@link GetReplicasParam}
+     * @return {status:result code, data:GetReplicasResponse{status,info}}
+     */
+    R<GetReplicasResponse> getReplicas(GetReplicasParam requestParam);
+
     /**
     /**
      * Moves segment from a query node to another to keep the load balanced.
      * Moves segment from a query node to another to keep the load balanced.
      *
      *

+ 5 - 0
src/main/java/io/milvus/client/MilvusMultiServiceClient.java

@@ -347,6 +347,11 @@ public class MilvusMultiServiceClient implements MilvusClient {
         return this.clusterFactory.getMaster().getClient().getQuerySegmentInfo(requestParam);
         return this.clusterFactory.getMaster().getClient().getQuerySegmentInfo(requestParam);
     }
     }
 
 
+    @Override
+    public R<GetReplicasResponse> getReplicas(GetReplicasParam requestParam) {
+        return this.clusterFactory.getMaster().getClient().getReplicas(requestParam);
+    }
+
     @Override
     @Override
     public R<RpcStatus> loadBalance(LoadBalanceParam requestParam) {
     public R<RpcStatus> loadBalance(LoadBalanceParam requestParam) {
         List<R<RpcStatus>> response = this.clusterFactory.getAvailableServerSettings().parallelStream()
         List<R<RpcStatus>> response = this.clusterFactory.getAvailableServerSettings().parallelStream()

+ 68 - 0
src/main/java/io/milvus/param/control/GetReplicasParam.java

@@ -0,0 +1,68 @@
+package io.milvus.param.control;
+
+import io.milvus.exception.ParamException;
+import io.milvus.param.ParamUtils;
+import lombok.Getter;
+import lombok.NonNull;
+
+/**
+ * Parameters for <code>getReplicas</code> interface.
+ */
+@Getter
+public class GetReplicasParam {
+    private final String collectionName;
+    private boolean withShardNodes;
+
+    private GetReplicasParam(@NonNull Builder builder) {
+        this.collectionName = builder.collectionName;
+        this.withShardNodes = true;
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link GetReplicasParam} class.
+     */
+    public static final class Builder {
+        private String collectionName;
+
+        private Builder() {
+        }
+
+        /**
+         * Sets the collection name. Collection name cannot be empty or null.
+         *
+         * @param collectionName collection name
+         * @return <code>Builder</code>
+         */
+        public Builder withCollectionName(@NonNull String collectionName) {
+            this.collectionName = collectionName;
+            return this;
+        }
+
+        /**
+         * Verifies parameters and creates a new {@link GetReplicasParam} instance.
+         *
+         * @return {@link GetReplicasParam}
+         */
+        public GetReplicasParam build() throws ParamException {
+            ParamUtils.CheckNullEmptyString(collectionName, "Collection name");
+
+            return new GetReplicasParam(this);
+        }
+    }
+
+    /**
+     * Constructs a <code>String</code> by {@link GetReplicasParam} instance.
+     *
+     * @return <code>String</code>
+     */
+    @Override
+    public String toString() {
+        return "GetReplicasParam{" +
+                "collectionName='" + collectionName + '\'' +
+                '}';
+    }
+}

+ 19 - 0
src/test/java/io/milvus/client/MilvusServiceClientTest.java

@@ -2057,6 +2057,25 @@ class MilvusServiceClientTest {
         testFuncByName("getQuerySegmentInfo", param);
         testFuncByName("getQuerySegmentInfo", param);
     }
     }
 
 
+    @Test
+    void getReplicasParam() {
+        // test throw exception with illegal input
+        assertThrows(ParamException.class, () -> GetQuerySegmentInfoParam
+                .newBuilder()
+                .withCollectionName("")
+                .build()
+        );
+    }
+
+    @Test
+    void getReplicas() {
+        GetReplicasParam param = GetReplicasParam.newBuilder()
+                .withCollectionName("collection1")
+                .build();
+
+        testFuncByName("getReplicas", param);
+    }
+
     @Test
     @Test
     void loadBalanceParam() {
     void loadBalanceParam() {
         // test throw exception with illegal input
         // test throw exception with illegal input

+ 14 - 0
src/test/java/io/milvus/server/MockMilvusServerImpl.java

@@ -60,6 +60,7 @@ public class MockMilvusServerImpl extends MilvusServiceGrpc.MilvusServiceImplBas
     private io.milvus.grpc.GetFlushStateResponse respGetFlushState;
     private io.milvus.grpc.GetFlushStateResponse respGetFlushState;
     private io.milvus.grpc.GetPersistentSegmentInfoResponse respGetPersistentSegmentInfo;
     private io.milvus.grpc.GetPersistentSegmentInfoResponse respGetPersistentSegmentInfo;
     private io.milvus.grpc.GetQuerySegmentInfoResponse respGetQuerySegmentInfo;
     private io.milvus.grpc.GetQuerySegmentInfoResponse respGetQuerySegmentInfo;
+    private io.milvus.grpc.GetReplicasResponse respGetReplicas;
     private io.milvus.grpc.GetMetricsResponse respGetMetrics;
     private io.milvus.grpc.GetMetricsResponse respGetMetrics;
 
 
     private io.milvus.grpc.Status respLoadBalance;
     private io.milvus.grpc.Status respLoadBalance;
@@ -513,6 +514,19 @@ public class MockMilvusServerImpl extends MilvusServiceGrpc.MilvusServiceImplBas
         respGetQuerySegmentInfo = resp;
         respGetQuerySegmentInfo = resp;
     }
     }
 
 
+    @Override
+    public void getReplicas(io.milvus.grpc.GetReplicasRequest request,
+                                    io.grpc.stub.StreamObserver<io.milvus.grpc.GetReplicasResponse> responseObserver) {
+        logger.info("getReplicas() call");
+
+        responseObserver.onNext(respGetReplicas);
+        responseObserver.onCompleted();
+    }
+
+    public void setGetReplicasResponse(io.milvus.grpc.GetReplicasResponse resp) {
+        respGetReplicas = resp;
+    }
+
     @Override
     @Override
     public void getMetrics(io.milvus.grpc.GetMetricsRequest request,
     public void getMetrics(io.milvus.grpc.GetMetricsRequest request,
                            io.grpc.stub.StreamObserver<io.milvus.grpc.GetMetricsResponse> responseObserver) {
                            io.grpc.stub.StreamObserver<io.milvus.grpc.GetMetricsResponse> responseObserver) {