Browse Source

Support CDC api (#1620)

Signed-off-by: huanghaoyuanhhy <haoyuan.huang@zilliz.com>
huanghaoyuanhhy 2 weeks ago
parent
commit
82df23b8b6

+ 88 - 0
examples/src/main/java/io/milvus/v2/CDCExample.java

@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.v2;
+
+import io.milvus.v2.client.ConnectConfig;
+import io.milvus.v2.client.MilvusClientV2;
+import io.milvus.v2.service.cdc.request.CrossClusterTopology;
+import io.milvus.v2.service.cdc.request.MilvusCluster;
+import io.milvus.v2.service.cdc.request.ReplicateConfiguration;
+import io.milvus.v2.service.cdc.request.UpdateReplicateConfigurationReq;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class CDCExample {
+    private static final String clusterAURI = "http://192.168.1.1:19530";
+    private static final String clusterBURI = "http://192.168.1.1:19500";
+
+    private static final String clusterAId = "cdc-test-upstream";
+    private static final String clusterBId = "cdc-test-downstream";
+
+    private static final Integer pchannelNum = 16;
+
+    private static List<String> generatePChannels(String clusterId) {
+        List<String> pchannels = new ArrayList<>(pchannelNum);
+        for (int i = 0; i < pchannelNum; i++) {
+            pchannels.add(clusterId + "-rootcoord-dml_" + i);
+        }
+        return pchannels;
+    }
+
+    public static void main(String[] args) {
+        ConnectConfig clusterA = ConnectConfig.builder()
+                .uri(clusterAURI)
+                .build();
+        MilvusClientV2 clusterAClient = new MilvusClientV2(clusterA);
+
+        ConnectConfig clusterB = ConnectConfig.builder()
+                .uri(clusterBURI)
+                .build();
+        MilvusClientV2 clusterBClient = new MilvusClientV2(clusterB);
+
+        MilvusCluster milvusClusterA = MilvusCluster.builder()
+                .clusterId(clusterAId)
+                .uri(clusterAURI)
+                .pchannels(generatePChannels(clusterAId))
+                .build();
+        MilvusCluster milvusClusterB = MilvusCluster.builder()
+                .clusterId(clusterBId)
+                .uri(clusterBURI)
+                .pchannels(generatePChannels(clusterBId))
+                .build();
+
+        CrossClusterTopology topology = CrossClusterTopology.builder()
+                .sourceClusterId(clusterAId)
+                .targetClusterId(clusterBId)
+                .build();
+
+        ReplicateConfiguration configuration = ReplicateConfiguration.builder()
+                .clusters(new ArrayList<MilvusCluster>(){{ add(milvusClusterA); add(milvusClusterB); }})
+                .crossClusterTopologies(new ArrayList<CrossClusterTopology>(){{ add(topology); }} )
+                .build();
+
+        UpdateReplicateConfigurationReq updateReq = UpdateReplicateConfigurationReq.builder()
+                .replicateConfiguration(configuration)
+                .build();
+
+        clusterAClient.updateReplicateConfiguration(updateReq);
+        clusterBClient.updateReplicateConfiguration(updateReq);
+    }
+}

+ 8 - 0
sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java

@@ -25,6 +25,9 @@ import io.milvus.orm.iterator.QueryIterator;
 import io.milvus.orm.iterator.SearchIterator;
 import io.milvus.orm.iterator.SearchIteratorV2;
 
+import io.milvus.v2.service.cdc.CDCService;
+import io.milvus.v2.service.cdc.request.*;
+import io.milvus.v2.service.cdc.response.*;
 import io.milvus.v2.service.database.DatabaseService;
 import io.milvus.v2.service.database.request.*;
 import io.milvus.v2.service.database.response.*;
@@ -74,6 +77,7 @@ public class MilvusClientV2 {
     private final RBACService rbacService = new RBACService();
     private final ResourceGroupService rgroupService = new ResourceGroupService();
     private final UtilityService utilityService = new UtilityService();
+    private final CDCService cdcService = new CDCService();
     private RpcUtils rpcUtils = new RpcUtils();
     private ConnectConfig connectConfig;
 
@@ -1053,6 +1057,10 @@ public class MilvusClientV2 {
         return rpcUtils.retry(()->utilityService.checkHealth(this.getRpcStub()));
     }
 
+    public UpdateReplicateConfigurationResp updateReplicateConfiguration(UpdateReplicateConfigurationReq request) {
+        return rpcUtils.retry(()->cdcService.updateReplicateConfiguration(this.getRpcStub(), request));
+    }
+
     /**
      * Disconnects from a Milvus server with configurable timeout
      *

+ 41 - 0
sdk-core/src/main/java/io/milvus/v2/service/cdc/CDCService.java

@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.v2.service.cdc;
+
+import io.milvus.grpc.MilvusServiceGrpc;
+import io.milvus.grpc.Status;
+import io.milvus.grpc.UpdateReplicateConfigurationRequest;
+import io.milvus.v2.service.BaseService;
+import io.milvus.v2.service.cdc.request.UpdateReplicateConfigurationReq;
+import io.milvus.v2.service.cdc.response.UpdateReplicateConfigurationResp;
+
+public class CDCService extends BaseService {
+    public UpdateReplicateConfigurationResp updateReplicateConfiguration(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, UpdateReplicateConfigurationReq requestParam) {
+        UpdateReplicateConfigurationRequest request = UpdateReplicateConfigurationRequest.newBuilder()
+                .setReplicateConfiguration(requestParam.getReplicateConfiguration().toGRPC())
+                .build();
+
+        String title = "UpdateReplicateConfiguration";
+
+        Status response = blockingStub.updateReplicateConfiguration(request);
+        rpcUtils.handleResponse(title, response);
+        return UpdateReplicateConfigurationResp.builder().build();
+    }
+}

+ 100 - 0
sdk-core/src/main/java/io/milvus/v2/service/cdc/request/CrossClusterTopology.java

@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.v2.service.cdc.request;
+
+
+import java.util.Objects;
+
+
+public class CrossClusterTopology {
+    private String sourceClusterId;
+    private String targetClusterId;
+
+    public io.milvus.grpc.CrossClusterTopology toGRPC() {
+        return io.milvus.grpc.CrossClusterTopology.newBuilder()
+                .setSourceClusterId(this.sourceClusterId)
+                .setTargetClusterId(this.targetClusterId)
+                .build();
+    }
+
+    public String getSourceClusterId() {
+        return sourceClusterId;
+    }
+
+    public void setSourceClusterId(String sourceClusterId) {
+        this.sourceClusterId = sourceClusterId;
+    }
+
+    public String getTargetClusterId() {
+        return targetClusterId;
+    }
+
+    public void setTargetClusterId(String targetClusterId) {
+        this.targetClusterId = targetClusterId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) return false;
+        CrossClusterTopology topology = (CrossClusterTopology) o;
+        return Objects.equals(sourceClusterId, topology.sourceClusterId) && Objects.equals(targetClusterId, topology.targetClusterId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(sourceClusterId, targetClusterId);
+    }
+
+    @Override
+    public String toString() {
+        return "CrossClusterTopology{" +
+                "sourceClusterId='" + sourceClusterId + '\'' +
+                ", targetClusterId='" + targetClusterId + '\'' +
+                '}';
+    }
+
+    private CrossClusterTopology(Builder builder) {
+        this.sourceClusterId = builder.sourceClusterId;
+        this.targetClusterId = builder.targetClusterId;
+    }
+
+    public static class Builder {
+        private String sourceClusterId;
+        private String targetClusterId;
+
+        public Builder sourceClusterId(String sourceClusterId) {
+            this.sourceClusterId = sourceClusterId;
+            return this;
+        }
+
+        public Builder targetClusterId(String targetClusterId) {
+            this.targetClusterId = targetClusterId;
+            return this;
+        }
+
+        public CrossClusterTopology build() {
+            return new CrossClusterTopology(this);
+        }
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+}

+ 142 - 0
sdk-core/src/main/java/io/milvus/v2/service/cdc/request/MilvusCluster.java

@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.v2.service.cdc.request;
+
+import java.util.List;
+import java.util.Objects;
+
+public class MilvusCluster {
+    private String clusterId;
+    private String uri;
+    private String token;
+    private List<String> pchannels;
+
+    public io.milvus.grpc.MilvusCluster toGRPC() {
+        io.milvus.grpc.ConnectionParam.Builder connectionParamBuilder = io.milvus.grpc.ConnectionParam.newBuilder()
+                .setUri(this.uri);
+        if (this.token != null) {
+            connectionParamBuilder.setToken(this.token);
+        }
+
+        io.milvus.grpc.MilvusCluster.Builder builder = io.milvus.grpc.MilvusCluster.newBuilder()
+                .setClusterId(this.clusterId)
+                .setConnectionParam(connectionParamBuilder);
+        if (this.pchannels != null) {
+            builder.addAllPchannels(this.pchannels);
+        }
+        return builder.build();
+    }
+
+    private MilvusCluster(Builder builder) {
+        this.clusterId = builder.clusterId;
+        this.uri = builder.uri;
+        this.token = builder.token;
+        this.pchannels = builder.pchannels;
+    }
+
+    public String getClusterId() {
+        return clusterId;
+    }
+
+    public void setClusterId(String clusterId) {
+        this.clusterId = clusterId;
+    }
+
+    public String getUri() {
+        return uri;
+    }
+
+    public void setUri(String uri) {
+        this.uri = uri;
+    }
+
+    public String getToken() {
+        return token;
+    }
+
+    public void setToken(String token) {
+        this.token = token;
+    }
+
+    public List<String> getPchannels() {
+        return pchannels;
+    }
+
+    public void setPchannels(List<String> pchannels) {
+        this.pchannels = pchannels;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) return false;
+        MilvusCluster that = (MilvusCluster) o;
+        return Objects.equals(clusterId, that.clusterId) && Objects.equals(uri, that.uri) && Objects.equals(token, that.token) && Objects.equals(pchannels, that.pchannels);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(clusterId, uri, token, pchannels);
+    }
+
+    @Override
+    public String toString() {
+        return "MilvusCluster{" +
+                "clusterId='" + clusterId + '\'' +
+                ", uri='" + uri + '\'' +
+                ", token='" + token + '\'' +
+                ", pchannels=" + pchannels +
+                '}';
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private String clusterId;
+        private String uri;
+        private String token;
+        private List<String> pchannels;
+
+        public Builder clusterId(String clusterId) {
+            this.clusterId = clusterId;
+            return this;
+        }
+
+        public Builder uri(String uri) {
+            this.uri = uri;
+            return this;
+        }
+
+        public Builder token(String token) {
+            this.token = token;
+            return this;
+        }
+
+        public Builder pchannels(List<String> pchannels) {
+            this.pchannels = pchannels;
+            return this;
+        }
+
+        public MilvusCluster build() {
+            return new MilvusCluster(this);
+        }
+    }
+}

+ 109 - 0
sdk-core/src/main/java/io/milvus/v2/service/cdc/request/ReplicateConfiguration.java

@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.v2.service.cdc.request;
+
+import java.util.List;
+import java.util.Objects;
+
+public class ReplicateConfiguration {
+    private List<MilvusCluster> clusters;
+    private List<CrossClusterTopology> crossClusterTopologies;
+
+    public io.milvus.grpc.ReplicateConfiguration toGRPC() {
+        io.milvus.grpc.ReplicateConfiguration.Builder builder = io.milvus.grpc.ReplicateConfiguration.newBuilder();
+        if (this.clusters != null) {
+            for (MilvusCluster cluster : this.clusters) {
+                builder.addClusters(cluster.toGRPC());
+            }
+        }
+
+        if (this.crossClusterTopologies != null) {
+            for (CrossClusterTopology topology : this.crossClusterTopologies) {
+                builder.addCrossClusterTopology(topology.toGRPC());
+            }
+        }
+
+        return builder.build();
+    }
+
+    private ReplicateConfiguration(Builder builder) {
+        this.clusters = builder.clusters;
+        this.crossClusterTopologies = builder.crossClusterTopologies;
+    }
+
+    public List<MilvusCluster> getClusters() {
+        return clusters;
+    }
+
+    public void setClusters(List<MilvusCluster> clusters) {
+        this.clusters = clusters;
+    }
+
+    public List<CrossClusterTopology> getCrossClusterTopologies() {
+        return crossClusterTopologies;
+    }
+
+    public void setCrossClusterTopologies(List<CrossClusterTopology> crossClusterTopologies) {
+        this.crossClusterTopologies = crossClusterTopologies;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) return false;
+        ReplicateConfiguration that = (ReplicateConfiguration) o;
+        return Objects.equals(clusters, that.clusters) && Objects.equals(crossClusterTopologies, that.crossClusterTopologies);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(clusters, crossClusterTopologies);
+    }
+
+    @Override
+    public String toString() {
+        return "ReplicateConfiguration{" +
+                "clusters=" + clusters +
+                ", crossClusterTopologies=" + crossClusterTopologies +
+                '}';
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private List<MilvusCluster> clusters;
+        private List<CrossClusterTopology> crossClusterTopologies;
+
+        public Builder clusters(List<MilvusCluster> clusters) {
+            this.clusters = clusters;
+            return this;
+        }
+
+        public Builder crossClusterTopologies(List<CrossClusterTopology> crossClusterTopologies) {
+            this.crossClusterTopologies = crossClusterTopologies;
+            return this;
+        }
+
+        public ReplicateConfiguration build() {
+            return new ReplicateConfiguration(this);
+        }
+    }
+}

+ 74 - 0
sdk-core/src/main/java/io/milvus/v2/service/cdc/request/UpdateReplicateConfigurationReq.java

@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.v2.service.cdc.request;
+
+import java.util.Objects;
+
+public class UpdateReplicateConfigurationReq {
+    private ReplicateConfiguration replicateConfiguration;
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    private UpdateReplicateConfigurationReq(Builder builder) {
+        this.replicateConfiguration = builder.replicateConfiguration;
+    }
+
+    public ReplicateConfiguration getReplicateConfiguration() {
+        return replicateConfiguration;
+    }
+
+    public void setReplicateConfiguration(ReplicateConfiguration replicateConfiguration) {
+        this.replicateConfiguration = replicateConfiguration;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) return false;
+        UpdateReplicateConfigurationReq that = (UpdateReplicateConfigurationReq) o;
+        return Objects.equals(replicateConfiguration, that.replicateConfiguration);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(replicateConfiguration);
+    }
+
+    @Override
+    public String toString() {
+        return "UpdateReplicateConfigurationReq{" +
+                "replicateConfiguration=" + replicateConfiguration +
+                '}';
+    }
+
+    public static class Builder {
+        private ReplicateConfiguration replicateConfiguration;
+
+        public Builder replicateConfiguration(ReplicateConfiguration replicateConfiguration) {
+            this.replicateConfiguration = replicateConfiguration;
+            return this;
+        }
+
+        public UpdateReplicateConfigurationReq build() {
+            return new UpdateReplicateConfigurationReq(this);
+        }
+    }
+}

+ 36 - 0
sdk-core/src/main/java/io/milvus/v2/service/cdc/response/UpdateReplicateConfigurationResp.java

@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.v2.service.cdc.response;
+
+public class UpdateReplicateConfigurationResp {
+
+    private UpdateReplicateConfigurationResp(Builder builder) {
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        public UpdateReplicateConfigurationResp build() {
+            return new UpdateReplicateConfigurationResp(this);
+        }
+    }
+}