Browse Source

multi server failover (#272)

Signed-off-by: Gao <linhgao@ebay.com>
linhgao 3 years ago
parent
commit
c07b37beae

+ 45 - 0
docker-compose.yml

@@ -40,10 +40,55 @@ services:
       - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus
     ports:
       - "19530:19530"
+      - "9091:9091"
     depends_on:
       - "etcd"
       - "minio"
 
+  etcdslave:
+    container_name: milvus-javasdk-test-etcd-slave
+    image: quay.io/coreos/etcd:v3.5.0
+    volumes:
+      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd-slave:/etcd
+    command: etcd -listen-peer-urls=http://127.0.0.1:2380 -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:4001 -initial-advertise-peer-urls=http://127.0.0.1:2380 --initial-cluster default=http://127.0.0.1:2380 --data-dir /etcd
+    ports:
+      - "2381:2379"
+      - "2382:2380"
+      - "4002:4001"
+
+  minioslave:
+    container_name: milvus-javasdk-test-minio-slave
+    image: minio/minio:RELEASE.2020-12-03T00-03-10Z
+    ports:
+      - "9001:9000"
+    environment:
+      MINIO_ACCESS_KEY: minioadmin
+      MINIO_SECRET_KEY: minioadmin
+    volumes:
+      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio-slave:/minio_data
+    command: minio server /minio_data
+    healthcheck:
+      test: [ "CMD", "curl", "-f", "http://localhost:9000/minio/health/live" ]
+      interval: 30s
+      timeout: 20s
+      retries: 3
+
+  standaloneslave:
+    container_name: milvus-javasdk-test-slave-standalone
+    image: milvusdb/milvus:v2.0.0
+    command: ["milvus", "run", "standalone"]
+    environment:
+      ETCD_ENDPOINTS: etcdslave:2379
+      MINIO_ADDRESS: minioslave:9000
+    volumes:
+      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus-slave:/var/lib/milvus
+    ports:
+      - "19531:19530"
+      - "9092:9091"
+    depends_on:
+      - "etcdslave"
+      - "minioslave"
+
 networks:
   default:
     name: test-milvus

+ 6 - 1
pom.xml

@@ -25,7 +25,7 @@
 
     <groupId>io.milvus</groupId>
     <artifactId>milvus-sdk-java</artifactId>
-    <version>2.0.4</version>
+    <version>2.0.5</version>
     <packaging>jar</packaging>
 
     <name>io.milvus:milvus-sdk-java</name>
@@ -165,6 +165,11 @@
             <version>1.18.22</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>okhttp</artifactId>
+            <version>3.14.9</version>
+        </dependency>
     </dependencies>
 
     <profiles>

+ 372 - 0
src/main/java/io/milvus/client/MilvusMultiServiceClient.java

@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.client;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import io.milvus.connection.ClusterFactory;
+import io.milvus.connection.ServerSetting;
+import io.milvus.grpc.*;
+import io.milvus.param.*;
+import io.milvus.param.alias.AlterAliasParam;
+import io.milvus.param.alias.CreateAliasParam;
+import io.milvus.param.alias.DropAliasParam;
+import io.milvus.param.collection.*;
+import io.milvus.param.control.*;
+import io.milvus.param.dml.*;
+import io.milvus.param.index.*;
+import io.milvus.param.partition.*;
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class MilvusMultiServiceClient implements MilvusClient {
+
+    private final ClusterFactory clusterFactory;
+
+    /**
+     * Sets connect param for multi milvus clusters.
+     * @param multiConnectParam multi server connect param
+     */
+    public MilvusMultiServiceClient(@NonNull MultiConnectParam multiConnectParam) {
+
+        List<ServerSetting> serverSettings = multiConnectParam.getHosts().stream()
+                .map(host -> {
+
+                    MilvusClient milvusClient = buildMilvusClient(host, multiConnectParam.getConnectTimeoutMs(),
+                            multiConnectParam.getKeepAliveTimeMs(), multiConnectParam.getKeepAliveTimeoutMs(),
+                            multiConnectParam.isKeepAliveWithoutCalls(), multiConnectParam.getIdleTimeoutMs());
+
+                    return ServerSetting.newBuilder()
+                            .withHost(host)
+                            .withMilvusClient(milvusClient).build();
+
+                }).collect(Collectors.toList());
+
+        boolean keepMonitor = serverSettings.size() > 1;
+
+        this.clusterFactory = ClusterFactory.newBuilder()
+                .withServerSetting(serverSettings)
+                .keepMonitor(keepMonitor)
+                .withQueryNodeSingleSearch(multiConnectParam.getQueryNodeSingleSearch())
+                .build();
+    }
+
+    private MilvusClient buildMilvusClient(ServerAddress host, long connectTimeoutMsm, long keepAliveTimeMs,
+                                           long keepAliveTimeoutMs, boolean keepAliveWithoutCalls, long idleTimeoutMs) {
+        ConnectParam clusterConnectParam = ConnectParam.newBuilder()
+                .withHost(host.getHost())
+                .withPort(host.getPort())
+                .withConnectTimeout(connectTimeoutMsm, TimeUnit.MILLISECONDS)
+                .withKeepAliveTime(keepAliveTimeMs, TimeUnit.MILLISECONDS)
+                .withKeepAliveTimeout(keepAliveTimeoutMs, TimeUnit.MILLISECONDS)
+                .keepAliveWithoutCalls(keepAliveWithoutCalls)
+                .withIdleTimeout(idleTimeoutMs, TimeUnit.MILLISECONDS)
+                .build();
+        return new MilvusServiceClient(clusterConnectParam);
+    }
+
+
+    @Override
+    public MilvusClient withTimeout(long timeout, TimeUnit timeoutUnit) {
+        return clusterFactory.getMaster().getClient().withTimeout(timeout, timeoutUnit);
+    }
+
+    @Override
+    public void close(long maxWaitSeconds) throws InterruptedException {
+        this.clusterFactory.getAvailableServerSettings().parallelStream()
+                .forEach(serverSetting -> serverSetting.getClient().close());
+        this.clusterFactory.close();
+    }
+
+    @Override
+    public R<Boolean> hasCollection(HasCollectionParam requestParam) {
+        return this.clusterFactory.getMaster().getClient().hasCollection(requestParam);
+    }
+
+    @Override
+    public R<RpcStatus> createCollection(CreateCollectionParam requestParam) {
+        List<R<RpcStatus>> response = this.clusterFactory.getAvailableServerSettings().stream()
+                .map(serverSetting -> serverSetting.getClient().createCollection(requestParam))
+                .collect(Collectors.toList());
+        return handleResponse(response);
+    }
+
+    @Override
+    public R<RpcStatus> dropCollection(DropCollectionParam requestParam) {
+        List<R<RpcStatus>> response = this.clusterFactory.getAvailableServerSettings().stream()
+                .map(serverSetting -> serverSetting.getClient().dropCollection(requestParam))
+                .collect(Collectors.toList());
+        return handleResponse(response);
+    }
+
+    @Override
+    public R<RpcStatus> loadCollection(LoadCollectionParam requestParam) {
+        List<R<RpcStatus>> response = this.clusterFactory.getAvailableServerSettings().parallelStream()
+                .map(serverSetting -> serverSetting.getClient().loadCollection(requestParam))
+                .collect(Collectors.toList());
+        return handleResponse(response);
+    }
+
+    @Override
+    public R<RpcStatus> releaseCollection(ReleaseCollectionParam requestParam) {
+        List<R<RpcStatus>> response = this.clusterFactory.getAvailableServerSettings().stream()
+                .map(serverSetting -> serverSetting.getClient().releaseCollection(requestParam))
+                .collect(Collectors.toList());
+        return handleResponse(response);
+    }
+
+    @Override
+    public R<DescribeCollectionResponse> describeCollection(DescribeCollectionParam requestParam) {
+        return this.clusterFactory.getMaster().getClient().describeCollection(requestParam);
+    }
+
+    @Override
+    public R<GetCollectionStatisticsResponse> getCollectionStatistics(GetCollectionStatisticsParam requestParam) {
+        return this.clusterFactory.getMaster().getClient().getCollectionStatistics(requestParam);
+    }
+
+    @Override
+    public R<ShowCollectionsResponse> showCollections(ShowCollectionsParam requestParam) {
+        return this.clusterFactory.getMaster().getClient().showCollections(requestParam);
+    }
+
+    @Override
+    public R<FlushResponse> flush(FlushParam requestParam) {
+        List<R<FlushResponse>> response = this.clusterFactory.getAvailableServerSettings().parallelStream()
+                .map(serverSetting -> serverSetting.getClient().flush(requestParam))
+                .collect(Collectors.toList());
+        return handleResponse(response);
+    }
+
+    @Override
+    public R<RpcStatus> createPartition(CreatePartitionParam requestParam) {
+        List<R<RpcStatus>> response = this.clusterFactory.getAvailableServerSettings().stream()
+                .map(serverSetting -> serverSetting.getClient().createPartition(requestParam))
+                .collect(Collectors.toList());
+        return handleResponse(response);
+    }
+
+    @Override
+    public R<RpcStatus> dropPartition(DropPartitionParam requestParam) {
+        List<R<RpcStatus>> response = this.clusterFactory.getAvailableServerSettings().stream()
+                .map(serverSetting -> serverSetting.getClient().dropPartition(requestParam))
+                .collect(Collectors.toList());
+        return handleResponse(response);
+    }
+
+    @Override
+    public R<Boolean> hasPartition(HasPartitionParam requestParam) {
+        return this.clusterFactory.getMaster().getClient().hasPartition(requestParam);
+    }
+
+    @Override
+    public R<RpcStatus> loadPartitions(LoadPartitionsParam requestParam) {
+        List<R<RpcStatus>> response = this.clusterFactory.getAvailableServerSettings().parallelStream()
+                .map(serverSetting -> serverSetting.getClient().loadPartitions(requestParam))
+                .collect(Collectors.toList());
+        return handleResponse(response);
+    }
+
+    @Override
+    public R<RpcStatus> releasePartitions(ReleasePartitionsParam requestParam) {
+        List<R<RpcStatus>> response = this.clusterFactory.getAvailableServerSettings().stream()
+                .map(serverSetting -> serverSetting.getClient().releasePartitions(requestParam))
+                .collect(Collectors.toList());
+        return handleResponse(response);
+    }
+
+    @Override
+    public R<GetPartitionStatisticsResponse> getPartitionStatistics(GetPartitionStatisticsParam requestParam) {
+        return this.clusterFactory.getMaster().getClient().getPartitionStatistics(requestParam);
+    }
+
+    @Override
+    public R<ShowPartitionsResponse> showPartitions(ShowPartitionsParam requestParam) {
+        return this.clusterFactory.getMaster().getClient().showPartitions(requestParam);
+    }
+
+    @Override
+    public R<RpcStatus> createAlias(CreateAliasParam requestParam) {
+        List<R<RpcStatus>> response = this.clusterFactory.getAvailableServerSettings().stream()
+                .map(serverSetting -> serverSetting.getClient().createAlias(requestParam))
+                .collect(Collectors.toList());
+        return handleResponse(response);
+    }
+
+    @Override
+    public R<RpcStatus> dropAlias(DropAliasParam requestParam) {
+        List<R<RpcStatus>> response = this.clusterFactory.getAvailableServerSettings().stream()
+                .map(serverSetting -> serverSetting.getClient().dropAlias(requestParam))
+                .collect(Collectors.toList());
+        return handleResponse(response);
+    }
+
+    @Override
+    public R<RpcStatus> alterAlias(AlterAliasParam requestParam) {
+        List<R<RpcStatus>> response = this.clusterFactory.getAvailableServerSettings().stream()
+                .map(serverSetting -> serverSetting.getClient().alterAlias(requestParam))
+                .collect(Collectors.toList());
+        return handleResponse(response);
+    }
+
+    @Override
+    public R<RpcStatus> createIndex(CreateIndexParam requestParam) {
+        List<R<RpcStatus>> response = this.clusterFactory.getAvailableServerSettings().parallelStream()
+                .map(serverSetting -> serverSetting.getClient().createIndex(requestParam))
+                .collect(Collectors.toList());
+        return handleResponse(response);
+    }
+
+    @Override
+    public R<RpcStatus> dropIndex(DropIndexParam requestParam) {
+        List<R<RpcStatus>> response = this.clusterFactory.getAvailableServerSettings().stream()
+                .map(serverSetting -> serverSetting.getClient().dropIndex(requestParam))
+                .collect(Collectors.toList());
+        return handleResponse(response);
+    }
+
+    @Override
+    public R<DescribeIndexResponse> describeIndex(DescribeIndexParam requestParam) {
+        return this.clusterFactory.getMaster().getClient().describeIndex(requestParam);
+    }
+
+    @Override
+    public R<GetIndexStateResponse> getIndexState(GetIndexStateParam requestParam) {
+        return this.clusterFactory.getMaster().getClient().getIndexState(requestParam);
+    }
+
+    @Override
+    public R<GetIndexBuildProgressResponse> getIndexBuildProgress(GetIndexBuildProgressParam requestParam) {
+        return this.clusterFactory.getMaster().getClient().getIndexBuildProgress(requestParam);
+    }
+
+    @Override
+    public R<MutationResult> insert(InsertParam requestParam) {
+        List<R<MutationResult>> response = this.clusterFactory.getAvailableServerSettings().parallelStream()
+                .map(serverSetting -> serverSetting.getClient().insert(requestParam))
+                .collect(Collectors.toList());
+        return handleResponse(response);
+    }
+
+    @Override
+    public ListenableFuture<R<MutationResult>> insertAsync(InsertParam requestParam) {
+        List<ListenableFuture<R<MutationResult>>> response = this.clusterFactory.getAvailableServerSettings().parallelStream()
+                .map(serverSetting -> serverSetting.getClient().insertAsync(requestParam))
+                .collect(Collectors.toList());
+        return response.get(0);
+    }
+
+    @Override
+    public R<MutationResult> delete(DeleteParam requestParam) {
+        List<R<MutationResult>> response = this.clusterFactory.getAvailableServerSettings().stream()
+                .map(serverSetting -> serverSetting.getClient().delete(requestParam))
+                .collect(Collectors.toList());
+        return handleResponse(response);
+    }
+
+    @Override
+    public R<SearchResults> search(SearchParam requestParam) {
+        return this.clusterFactory.getMaster().getClient().search(requestParam);
+    }
+
+    @Override
+    public ListenableFuture<R<SearchResults>> searchAsync(SearchParam requestParam) {
+        return this.clusterFactory.getMaster().getClient().searchAsync(requestParam);
+    }
+
+    @Override
+    public R<QueryResults> query(QueryParam requestParam) {
+        return this.clusterFactory.getMaster().getClient().query(requestParam);
+    }
+
+    @Override
+    public ListenableFuture<R<QueryResults>> queryAsync(QueryParam requestParam) {
+        return this.clusterFactory.getMaster().getClient().queryAsync(requestParam);
+    }
+
+    @Override
+    public R<CalcDistanceResults> calcDistance(CalcDistanceParam requestParam) {
+        return this.clusterFactory.getMaster().getClient().calcDistance(requestParam);
+    }
+
+    @Override
+    public R<GetMetricsResponse> getMetrics(GetMetricsParam requestParam) {
+        return this.clusterFactory.getMaster().getClient().getMetrics(requestParam);
+    }
+
+    @Override
+    public R<GetFlushStateResponse> getFlushState(GetFlushStateParam requestParam) {
+        return this.clusterFactory.getMaster().getClient().getFlushState(requestParam);
+    }
+
+    @Override
+    public R<GetPersistentSegmentInfoResponse> getPersistentSegmentInfo(GetPersistentSegmentInfoParam requestParam) {
+        return this.clusterFactory.getMaster().getClient().getPersistentSegmentInfo(requestParam);
+    }
+
+    @Override
+    public R<GetQuerySegmentInfoResponse> getQuerySegmentInfo(GetQuerySegmentInfoParam requestParam) {
+        return this.clusterFactory.getMaster().getClient().getQuerySegmentInfo(requestParam);
+    }
+
+    @Override
+    public R<RpcStatus> loadBalance(LoadBalanceParam requestParam) {
+        List<R<RpcStatus>> response = this.clusterFactory.getAvailableServerSettings().parallelStream()
+                .map(serverSetting -> serverSetting.getClient().loadBalance(requestParam))
+                .collect(Collectors.toList());
+        return handleResponse(response);
+    }
+
+    @Override
+    public R<GetCompactionStateResponse> getCompactionState(GetCompactionStateParam requestParam) {
+        return this.clusterFactory.getMaster().getClient().getCompactionState(requestParam);
+    }
+
+    @Override
+    public R<ManualCompactionResponse> manualCompaction(ManualCompactionParam requestParam) {
+        return null;
+    }
+
+    @Override
+    public R<GetCompactionPlansResponse> getCompactionStateWithPlans(GetCompactionPlansParam requestParam) {
+        return this.clusterFactory.getMaster().getClient().getCompactionStateWithPlans(requestParam);
+    }
+
+    private <T> R<T> handleResponse(List<R<T>> response) {
+        if (CollectionUtils.isNotEmpty(response)) {
+            R<T> rSuccess = null;
+            for (R<T> singleRes : response) {
+                if (R.Status.Success.getCode() == singleRes.getStatus()) {
+                    rSuccess = singleRes;
+                } else {
+                    return singleRes;
+                }
+            }
+            if (null != rSuccess) {
+                return rSuccess;
+            }
+        }
+        return R.failed(R.Status.Unknown, "Response is empty.");
+    }
+}
+

+ 139 - 0
src/main/java/io/milvus/connection/ClusterFactory.java

@@ -0,0 +1,139 @@
+package io.milvus.connection;
+
+import io.milvus.exception.ParamException;
+import io.milvus.param.QueryNodeSingleSearch;
+import io.milvus.param.ServerAddress;
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Factory with managing multi cluster.
+ */
+public class ClusterFactory {
+
+    private final List<ServerSetting> serverSettings;
+
+    private ServerSetting master;
+
+    private List<ServerSetting> availableServerSettings;
+
+    private ServerMonitor monitor;
+
+    private ClusterFactory(@NonNull Builder builder) {
+        this.serverSettings = builder.serverSettings;
+        this.master = this.getDefaultServer();
+        this.availableServerSettings = builder.serverSettings;
+        if (builder.keepMonitor) {
+            monitor = new ServerMonitor(this, builder.queryNodeSingleSearch);
+            monitor.start();
+        }
+    }
+
+    public ServerSetting getDefaultServer() {
+        return serverSettings.get(0);
+    }
+
+    public boolean masterIsRunning() {
+        List<ServerAddress> serverAddresses = availableServerSettings.stream()
+                .map(ServerSetting::getServerAddress)
+                .collect(Collectors.toList());
+
+        return serverAddresses.contains(master.getServerAddress());
+    }
+
+    public void masterChange(ServerSetting serverSetting) {
+        this.master = serverSetting;
+    }
+
+    public void availableServerChange(List<ServerSetting> serverSettings) {
+        this.availableServerSettings = serverSettings;
+    }
+
+    public ServerSetting electMaster() {
+        return CollectionUtils.isNotEmpty(availableServerSettings) ? availableServerSettings.get(0) : getDefaultServer();
+    }
+
+    public void close() {
+        if (null != monitor) {
+            monitor.close();
+        }
+    }
+
+    public List<ServerSetting> getServerSettings() {
+        return serverSettings;
+    }
+
+    public ServerSetting getMaster() {
+        return master;
+    }
+
+    public List<ServerSetting> getAvailableServerSettings() {
+        return availableServerSettings;
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link ClusterFactory}
+     */
+    public static class Builder {
+        private List<ServerSetting> serverSettings;
+        private boolean keepMonitor = false;
+        private QueryNodeSingleSearch queryNodeSingleSearch;
+
+        private Builder() {
+        }
+
+        /**
+         * Sets server setting list
+         *
+         * @param serverSettings ServerSetting
+         * @return <code>Builder</code>
+         */
+        public Builder withServerSetting(@NonNull List<ServerSetting> serverSettings) {
+            this.serverSettings = serverSettings;
+            return this;
+        }
+
+        /**
+         * Enables the keep-monitor function for server
+         *
+         * @param enable true keep-monitor
+         * @return <code>Builder</code>
+         */
+        public Builder keepMonitor(boolean enable) {
+            this.keepMonitor = enable;
+            return this;
+        }
+
+        /**
+         * Sets single search for query node listener.
+         *
+         * @param queryNodeSingleSearch query node single search for listener
+         * @return <code>Builder</code>
+         */
+        public Builder withQueryNodeSingleSearch(QueryNodeSingleSearch queryNodeSingleSearch) {
+            this.queryNodeSingleSearch = queryNodeSingleSearch;
+            return this;
+        }
+
+        /**
+         * Verifies parameters and creates a new {@link ClusterFactory} instance.
+         *
+         * @return {@link ClusterFactory}
+         */
+        public ClusterFactory build() throws ParamException {
+
+            if (CollectionUtils.isEmpty(serverSettings)) {
+                throw new ParamException("Server settings is empty!");
+            }
+
+            return new ClusterFactory(this);
+        }
+    }
+}

+ 73 - 0
src/main/java/io/milvus/connection/ClusterListener.java

@@ -0,0 +1,73 @@
+package io.milvus.connection;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Send heartbeat for a Milvus cluster healthy check.
+ */
+public class ClusterListener implements Listener {
+
+    private static final Logger logger = LoggerFactory.getLogger(ClusterListener.class);
+
+    private static final String HEALTH_PATH = "http://%s:%d/healthz";
+
+    private static final int HTTP_CODE_200 = 200;
+    private static final String RESPONSE_OK = "OK";
+
+    private static final OkHttpClient OK_HTTP_CLIENT = new OkHttpClient.Builder()
+            .connectTimeout(5, TimeUnit.SECONDS)
+            .readTimeout(5, TimeUnit.SECONDS)
+            .writeTimeout(5, TimeUnit.SECONDS)
+            .build();
+
+    @Override
+    public Boolean heartBeat(ServerSetting serverSetting) {
+        String url = String.format(HEALTH_PATH, serverSetting.getServerAddress().getHost(),
+                serverSetting.getServerAddress().getHealthPort());
+
+        boolean isRunning = false;
+        try {
+            Response response = get(url);
+            isRunning = checkResponse(response);
+            if (isRunning) {
+                logger.debug("Host [{}] heartbeat Success of Milvus Cluster Listener.",
+                        serverSetting.getServerAddress().getHost());
+            }
+        } catch (Exception e) {
+            logger.error("Host [{}] heartbeat Error of Milvus Cluster Listener.",
+                    serverSetting.getServerAddress().getHost());
+        }
+        return isRunning;
+    }
+
+    private Boolean checkResponse(Response response) throws IOException {
+        if (HTTP_CODE_200 == response.code()) {
+            assert response.body() != null;
+            String responseBody = response.body().string();
+            return RESPONSE_OK.equalsIgnoreCase(responseBody);
+        }
+        return false;
+    }
+
+    private Response get(String url) throws IOException {
+        if (StringUtils.isEmpty(url)) {
+            throw new IllegalArgumentException("OkHttp GET error: url cannot be null.");
+        }
+
+        Request.Builder requestBuilder = new Request.Builder();
+
+        Request request = requestBuilder
+                .url(url)
+                .get()
+                .build();
+        return OK_HTTP_CLIENT.newCall(request).execute();
+    }
+}

+ 10 - 0
src/main/java/io/milvus/connection/Listener.java

@@ -0,0 +1,10 @@
+package io.milvus.connection;
+
+/**
+ * Interface of multi server listener.
+ */
+public interface Listener {
+
+    Boolean heartBeat(ServerSetting serverSetting);
+
+}

+ 64 - 0
src/main/java/io/milvus/connection/QueryNodeListener.java

@@ -0,0 +1,64 @@
+package io.milvus.connection;
+
+import io.milvus.grpc.SearchResults;
+import io.milvus.param.QueryNodeSingleSearch;
+import io.milvus.param.R;
+import io.milvus.param.dml.SearchParam;
+import io.milvus.response.SearchResultsWrapper;
+import org.apache.commons.collections4.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Send heartbeat for query nodes healthy check.
+ */
+public class QueryNodeListener implements Listener {
+
+    private static final Logger logger = LoggerFactory.getLogger(QueryNodeListener.class);
+
+    private static final int HEARTBEAT_TIMEOUT_MILLS = 4000;
+
+    private final SearchParam searchParam;
+
+    public QueryNodeListener(QueryNodeSingleSearch singleSearch) {
+        searchParam = SearchParam.newBuilder()
+                .withCollectionName(singleSearch.getCollectionName())
+                .withVectors(singleSearch.getVectors())
+                .withVectorFieldName(singleSearch.getVectorFieldName())
+                .withParams(singleSearch.getParams())
+                .withMetricType(singleSearch.getMetricType())
+                .withTopK(5)
+                .withRoundDecimal(-1)
+                .withGuaranteeTimestamp(1L)
+                .build();
+    }
+
+    @Override
+    public Boolean heartBeat(ServerSetting serverSetting) {
+
+        boolean isRunning = false;
+
+        try {
+            R<SearchResults> response = serverSetting.getClient()
+                    .withTimeout(4, TimeUnit.SECONDS)
+                    .search(searchParam);
+
+            if (response.getStatus() == R.Status.Success.getCode()) {
+                SearchResultsWrapper wrapperSearch = new SearchResultsWrapper(response.getData().getResults());
+                List<SearchResultsWrapper.IDScore> idScores = wrapperSearch.getIDScore(0);
+                if (CollectionUtils.isNotEmpty(idScores)) {
+                    logger.debug("Host [{}] heartbeat Success of Milvus QueryNode Listener.",
+                            serverSetting.getServerAddress().getHost());
+                    isRunning = true;
+                }
+            }
+        } catch (Exception e) {
+            logger.error("Host [{}] heartbeat Error of Milvus QueryNode Listener.",
+                    serverSetting.getServerAddress().getHost(), e);
+        }
+        return isRunning;
+    }
+}

+ 101 - 0
src/main/java/io/milvus/connection/ServerMonitor.java

@@ -0,0 +1,101 @@
+package io.milvus.connection;
+
+import io.milvus.param.QueryNodeSingleSearch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Monitor with scheduling to check server healthy state.
+ */
+public class ServerMonitor {
+
+    private static final Logger logger = LoggerFactory.getLogger(ServerMonitor.class);
+
+    private static final long heartbeatInterval = 10 * 1000;
+
+    private Long lastHeartbeat;
+
+    private final List<Listener> listeners;
+
+    private final ClusterFactory clusterFactory;
+
+    private final Thread monitorThread;
+    private volatile boolean isRunning;
+
+    public ServerMonitor(ClusterFactory clusterFactory, QueryNodeSingleSearch queryNodeSingleSearch) {
+        if (null != queryNodeSingleSearch) {
+            this.listeners = Arrays.asList(new ClusterListener(), new QueryNodeListener(queryNodeSingleSearch));
+        } else {
+            this.listeners = Collections.singletonList(new ClusterListener());
+        }
+        this.clusterFactory = clusterFactory;
+
+        ServerMonitorRunnable monitor = new ServerMonitorRunnable();
+        this.monitorThread = new Thread(monitor, "Milvus-server-monitor");
+        this.monitorThread.setDaemon(true);
+        this.isRunning = true;
+    }
+
+    public void start() {
+        logger.info("Milvus Server Monitor start.");
+        monitorThread.start();
+    }
+
+    public void close() {
+        isRunning = false;
+        logger.info("Milvus Server Monitor close.");
+        monitorThread.interrupt();
+    }
+
+    private class ServerMonitorRunnable implements Runnable {
+        public void run() {
+            while (isRunning) {
+                long startTime = System.currentTimeMillis();
+
+                if (null == lastHeartbeat || startTime - lastHeartbeat > heartbeatInterval) {
+
+                    lastHeartbeat = startTime;
+
+                    try {
+                        List<ServerSetting> availableServer = getAvailableServer();
+                        clusterFactory.availableServerChange(availableServer);
+                    } catch (Exception e) {
+                        logger.error("Milvus Server Heartbeat error, monitor will stop.", e);
+                    }
+
+                    if (!clusterFactory.masterIsRunning()) {
+                        ServerSetting master = clusterFactory.electMaster();
+
+                        logger.warn("Milvus Server Heartbeat. Master is Not Running, Re-Elect [{}] to master.",
+                                master.getServerAddress().getHost());
+
+                        clusterFactory.masterChange(master);
+                    } else {
+                        logger.debug("Milvus Server Heartbeat. Master is Running.");
+                    }
+                }
+            }
+
+        }
+
+        private List<ServerSetting> getAvailableServer() {
+            return clusterFactory.getServerSettings().stream()
+                    .filter(this::checkServerState).collect(Collectors.toList());
+        }
+
+        private boolean checkServerState(ServerSetting serverSetting) {
+            for (Listener listener : listeners) {
+                boolean isRunning = listener.heartBeat(serverSetting);
+                if (!isRunning) {
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+}

+ 83 - 0
src/main/java/io/milvus/connection/ServerSetting.java

@@ -0,0 +1,83 @@
+package io.milvus.connection;
+
+import io.milvus.client.MilvusClient;
+import io.milvus.exception.ParamException;
+import io.milvus.param.ConnectParam;
+import io.milvus.param.ParamUtils;
+import io.milvus.param.ServerAddress;
+import lombok.NonNull;
+
+/**
+ * Defined address and Milvus clients for each server.
+ */
+public class ServerSetting {
+    private final ServerAddress serverAddress;
+    private final MilvusClient client;
+
+    public ServerSetting(@NonNull Builder builder) {
+        this.serverAddress = builder.serverAddress;
+        this.client = builder.milvusClient;
+    }
+
+    public ServerAddress getServerAddress() {
+        return serverAddress;
+    }
+
+    public MilvusClient getClient() {
+        return client;
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private ServerAddress serverAddress;
+        private MilvusClient milvusClient;
+
+        private Builder() {
+        }
+
+
+        /**
+         * Sets the server address
+         *
+         * @param serverAddress ServerAddress host,port/server
+         * @return <code>Builder</code>
+         */
+        public Builder withHost(@NonNull ServerAddress serverAddress) {
+            this.serverAddress = serverAddress;
+            return this;
+        }
+
+        /**
+         * Sets the server client for a cluster
+         *
+         * @param milvusClient MilvusClient
+         * @return <code>Builder</code>
+         */
+        public Builder withMilvusClient(MilvusClient milvusClient) {
+            this.milvusClient = milvusClient;
+            return this;
+        }
+
+        /**
+         * Verifies parameters and creates a new {@link ConnectParam} instance.
+         *
+         * @return {@link ConnectParam}
+         */
+        public ServerSetting build() throws ParamException {
+            ParamUtils.CheckNullEmptyString(serverAddress.getHost(), "Host name");
+
+            if (serverAddress.getPort() < 0 || serverAddress.getPort() > 0xFFFF) {
+                throw new ParamException("Port is out of range!");
+            }
+
+            if (milvusClient == null) {
+                throw new ParamException("Milvus client can not be empty");
+            }
+
+            return new ServerSetting(this);
+        }
+    }
+}

+ 210 - 0
src/main/java/io/milvus/param/MultiConnectParam.java

@@ -0,0 +1,210 @@
+package io.milvus.param;
+
+import io.milvus.exception.ParamException;
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Parameters for client connection of multi server.
+ */
+public class MultiConnectParam {
+    private final List<ServerAddress> hosts;
+    private final QueryNodeSingleSearch queryNodeSingleSearch;
+    private final long connectTimeoutMs;
+    private final long keepAliveTimeMs;
+    private final long keepAliveTimeoutMs;
+    private final boolean keepAliveWithoutCalls;
+    private final long idleTimeoutMs;
+
+    private MultiConnectParam(@NonNull Builder builder) {
+        this.hosts = builder.hosts;
+        this.queryNodeSingleSearch = builder.queryNodeSingleSearch;
+        this.connectTimeoutMs = builder.connectTimeoutMs;
+        this.keepAliveTimeMs = builder.keepAliveTimeMs;
+        this.keepAliveTimeoutMs = builder.keepAliveTimeoutMs;
+        this.keepAliveWithoutCalls = builder.keepAliveWithoutCalls;
+        this.idleTimeoutMs = builder.idleTimeoutMs;
+    }
+
+    public List<ServerAddress> getHosts() {
+        return hosts;
+    }
+
+    public QueryNodeSingleSearch getQueryNodeSingleSearch() {
+        return queryNodeSingleSearch;
+    }
+
+    public long getConnectTimeoutMs() {
+        return connectTimeoutMs;
+    }
+
+    public long getKeepAliveTimeMs() {
+        return keepAliveTimeMs;
+    }
+
+    public long getKeepAliveTimeoutMs() {
+        return keepAliveTimeoutMs;
+    }
+
+    public boolean isKeepAliveWithoutCalls() {
+        return keepAliveWithoutCalls;
+    }
+
+    public long getIdleTimeoutMs() {
+        return idleTimeoutMs;
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link MultiConnectParam}
+     */
+    public static class Builder {
+        private List<ServerAddress> hosts;
+        private QueryNodeSingleSearch queryNodeSingleSearch;
+        private long connectTimeoutMs = 10000;
+        private long keepAliveTimeMs = Long.MAX_VALUE; // Disabling keep alive
+        private long keepAliveTimeoutMs = 20000;
+        private boolean keepAliveWithoutCalls = false;
+        private long idleTimeoutMs = TimeUnit.MILLISECONDS.convert(24, TimeUnit.HOURS);
+
+        private Builder() {
+        }
+
+        /**
+         * Sets the addresses.
+         *
+         * @param hosts hosts serverAddresses
+         * @return <code>Builder</code>
+         */
+        public Builder withHosts(@NonNull List<ServerAddress> hosts) {
+            this.hosts = hosts;
+            return this;
+        }
+
+        /**
+         * Sets single search for query node listener.
+         *
+         * @param queryNodeSingleSearch query node single search for listener
+         * @return <code>Builder</code>
+         */
+        public Builder withQueryNodeSingleSearch(@NonNull QueryNodeSingleSearch queryNodeSingleSearch) {
+            this.queryNodeSingleSearch = queryNodeSingleSearch;
+            return this;
+        }
+
+        /**
+         * Sets the connection timeout value of client channel. The timeout value must be greater than zero.
+         *
+         * @param connectTimeout timeout value
+         * @param timeUnit timeout unit
+         * @return <code>Builder</code>
+         */
+        public Builder withConnectTimeout(long connectTimeout, @NonNull TimeUnit timeUnit) {
+            this.connectTimeoutMs = timeUnit.toMillis(connectTimeout);
+            return this;
+        }
+
+        /**
+         * Sets the keep-alive time value of client channel. The keep-alive value must be greater than zero.
+         *
+         * @param keepAliveTime keep-alive value
+         * @param timeUnit keep-alive unit
+         * @return <code>Builder</code>
+         */
+        public Builder withKeepAliveTime(long keepAliveTime, @NonNull TimeUnit timeUnit) {
+            this.keepAliveTimeMs = timeUnit.toMillis(keepAliveTime);
+            return this;
+        }
+
+        /**
+         * Sets the keep-alive timeout value of client channel. The timeout value must be greater than zero.
+         *
+         * @param keepAliveTimeout timeout value
+         * @param timeUnit timeout unit
+         * @return <code>Builder</code>
+         */
+        public Builder withKeepAliveTimeout(long keepAliveTimeout, @NonNull TimeUnit timeUnit) {
+            this.keepAliveTimeoutMs = timeUnit.toNanos(keepAliveTimeout);
+            return this;
+        }
+
+        /**
+         * Enables the keep-alive function for client channel.
+         *
+         * @param enable true keep-alive
+         * @return <code>Builder</code>
+         */
+        public Builder keepAliveWithoutCalls(boolean enable) {
+            keepAliveWithoutCalls = enable;
+            return this;
+        }
+
+        /**
+         * Sets the idle timeout value of client channel. The timeout value must be larger than zero.
+         *
+         * @param idleTimeout timeout value
+         * @param timeUnit timeout unit
+         * @return <code>Builder</code>
+         */
+        public Builder withIdleTimeout(long idleTimeout, @NonNull TimeUnit timeUnit) {
+            this.idleTimeoutMs = timeUnit.toMillis(idleTimeout);
+            return this;
+        }
+
+        /**
+         * Verifies parameters and creates a new {@link MultiConnectParam} instance.
+         *
+         * @return {@link MultiConnectParam}
+         */
+        public MultiConnectParam build() throws ParamException {
+            if (CollectionUtils.isEmpty(hosts)) {
+                throw new ParamException("Server addresses is empty!");
+            }
+
+            for (ServerAddress serverAddress : hosts) {
+                ParamUtils.CheckNullEmptyString(serverAddress.getHost(), "Host name");
+
+                if (serverAddress.getPort() < 0 || serverAddress.getPort() > 0xFFFF) {
+                    throw new ParamException("Port is out of range!");
+                }
+            }
+
+            if (keepAliveTimeMs <= 0L) {
+                throw new ParamException("Keep alive time must be positive!");
+            }
+
+            if (connectTimeoutMs <= 0L) {
+                throw new ParamException("Connect timeout must be positive!");
+            }
+
+            if (keepAliveTimeoutMs <= 0L) {
+                throw new ParamException("Keep alive timeout must be positive!");
+            }
+
+            if (idleTimeoutMs <= 0L) {
+                throw new ParamException("Idle timeout must be positive!");
+            }
+
+            return new MultiConnectParam(this);
+        }
+    }
+
+    /**
+     * Constructs a <code>String</code> by {@link ConnectParam} instance.
+     *
+     * @return <code>String</code>
+     */
+    @Override
+    public String toString() {
+        final StringBuffer sb = new StringBuffer("MultiConnectParam{");
+        sb.append("hosts=").append(hosts);
+        sb.append('}');
+        return sb.toString();
+    }
+}

+ 173 - 0
src/main/java/io/milvus/param/QueryNodeSingleSearch.java

@@ -0,0 +1,173 @@
+package io.milvus.param;
+
+import io.milvus.exception.ParamException;
+import lombok.NonNull;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Defined single search for query node listener send heartbeat.
+ */
+public class QueryNodeSingleSearch {
+
+    private final String collectionName;
+    private final MetricType metricType;
+    private final String vectorFieldName;
+    private final List<?> vectors;
+    private final String params;
+
+    private QueryNodeSingleSearch(@NonNull Builder builder) {
+        this.collectionName = builder.collectionName;
+        this.metricType = builder.metricType;
+        this.vectorFieldName = builder.vectorFieldName;
+        this.vectors = builder.vectors;
+        this.params = builder.params;
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    public String getCollectionName() {
+        return collectionName;
+    }
+
+    public MetricType getMetricType() {
+        return metricType;
+    }
+
+    public String getVectorFieldName() {
+        return vectorFieldName;
+    }
+
+    public List<?> getVectors() {
+        return vectors;
+    }
+
+    public String getParams() {
+        return params;
+    }
+
+    /**
+     * Builder for {@link QueryNodeSingleSearch}
+     */
+    public static class Builder {
+        private String collectionName;
+        private MetricType metricType = MetricType.L2;
+        private String vectorFieldName;
+        private List<?> vectors;
+        private String params = "{}";
+
+        private Builder() {
+        }
+
+        /**
+         * Sets the collection name. Collection name cannot be empty or null.
+         *
+         * @param collectionName collection name
+         * @return <code>Builder</code>
+         */
+        public Builder withCollectionName(@NonNull String collectionName) {
+            this.collectionName = collectionName;
+            return this;
+        }
+
+        /**
+         * Sets metric type of ANN searching.
+         *
+         * @param metricType metric type
+         * @return <code>Builder</code>
+         */
+        public Builder withMetricType(@NonNull MetricType metricType) {
+            this.metricType = metricType;
+            return this;
+        }
+
+        /**
+         * Sets target vector field by name. Field name cannot be empty or null.
+         *
+         * @param vectorFieldName vector field name
+         * @return <code>Builder</code>
+         */
+        public Builder withVectorFieldName(@NonNull String vectorFieldName) {
+            this.vectorFieldName = vectorFieldName;
+            return this;
+        }
+
+        /**
+         * Sets the target vectors.
+         *
+         * @param vectors list of target vectors:
+         *                if vector type is FloatVector, vectors is List&lt;List&lt;Float&gt;&gt;;
+         *                if vector type is BinaryVector, vectors is List&lt;ByteBuffer&gt;;
+         * @return <code>Builder</code>
+         */
+        public Builder withVectors(@NonNull List<?> vectors) {
+            this.vectors = vectors;
+            return this;
+        }
+
+        /**
+         * Sets the search parameters specific to the index type.
+         *
+         * For example: IVF index, the search parameters can be "{\"nprobe\":10}"
+         * For more information: @see <a href="https://milvus.io/docs/v2.0.0/index_selection.md">Index Selection</a>
+         *
+         * @param params extra parameters in json format
+         * @return <code>Builder</code>
+         */
+        public Builder withParams(@NonNull String params) {
+            this.params = params;
+            return this;
+        }
+
+        /**
+         * Verifies parameters and creates a new {@link QueryNodeSingleSearch} instance.
+         *
+         * @return {@link QueryNodeSingleSearch}
+         */
+        public QueryNodeSingleSearch build() throws ParamException {
+            ParamUtils.CheckNullEmptyString(collectionName, "Collection name");
+            ParamUtils.CheckNullEmptyString(vectorFieldName, "Target field name");
+
+            if (metricType == MetricType.INVALID) {
+                throw new ParamException("Metric type is illegal");
+            }
+
+            if (vectors == null || vectors.isEmpty()) {
+                throw new ParamException("Target vectors can not be empty");
+            }
+
+            if (vectors.get(0) instanceof List) {
+                // float vectors
+                List<?> first = (List<?>) vectors.get(0);
+                if (!(first.get(0) instanceof Float)) {
+                    throw new ParamException("Float vector field's value must be List<Float>");
+                }
+
+                int dim = first.size();
+                for (int i = 1; i < vectors.size(); ++i) {
+                    List<?> temp = (List<?>) vectors.get(i);
+                    if (dim != temp.size()) {
+                        throw new ParamException("Target vector dimension must be equal");
+                    }
+                }
+            } else if (vectors.get(0) instanceof ByteBuffer) {
+                // binary vectors
+                ByteBuffer first = (ByteBuffer) vectors.get(0);
+                int dim = first.position();
+                for (int i = 1; i < vectors.size(); ++i) {
+                    ByteBuffer temp = (ByteBuffer) vectors.get(i);
+                    if (dim != temp.position()) {
+                        throw new ParamException("Target vector dimension must be equal");
+                    }
+                }
+            } else {
+                throw new ParamException("Target vector type must be List<Float> or ByteBuffer");
+            }
+
+            return new QueryNodeSingleSearch(this);
+        }
+    }
+}

+ 126 - 0
src/main/java/io/milvus/param/ServerAddress.java

@@ -0,0 +1,126 @@
+package io.milvus.param;
+
+import io.milvus.exception.ParamException;
+import lombok.NonNull;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+public class ServerAddress {
+    private final String host;
+    private final int port;
+    private final int healthPort;
+
+    private ServerAddress(@NonNull Builder builder) {
+        this.host = builder.host;
+        this.port = builder.port;
+        this.healthPort = builder.healthPort;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public int getHealthPort() {
+        return healthPort;
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private String host = "localhost";
+        private int port = 19530;
+        private int healthPort = 9091;
+
+        private Builder() {
+        }
+
+        /**
+         * Sets the host name/address.
+         *
+         * @param host host name/address
+         * @return <code>Builder</code>
+         */
+        public Builder withHost(@NonNull String host) {
+            this.host = host;
+            return this;
+        }
+
+        /**
+         * Sets the connection port. Port value must be greater than zero and less than 65536.
+         *
+         * @param port port value
+         * @return <code>Builder</code>
+         */
+        public Builder withPort(int port)  {
+            this.port = port;
+            return this;
+        }
+
+        /**
+         * Sets the cluster health port. Port value must be greater than zero and less than 65536.
+         *
+         * @param port port value
+         * @return <code>Builder</code>
+         */
+        public Builder withHealthPort(int port)  {
+            this.healthPort = port;
+            return this;
+        }
+
+        /**
+         * Verifies parameters and creates a new {@link ServerAddress} instance.
+         *
+         * @return {@link ServerAddress}
+         */
+        public ServerAddress build() throws ParamException {
+            ParamUtils.CheckNullEmptyString(host, "Host name");
+
+            if (port < 0 || port > 0xFFFF) {
+                throw new ParamException("Port is out of range!");
+            }
+
+            if (healthPort < 0 || healthPort > 0xFFFF) {
+                throw new ParamException("Health Port is out of range!");
+            }
+
+            return new ServerAddress(this);
+        }
+    }
+
+    @Override
+    public String toString() {
+        final StringBuffer sb = new StringBuffer("ServerAddress{");
+        sb.append("host='").append(host).append('\'');
+        sb.append(", port=").append(port);
+        sb.append('}');
+        return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ServerAddress that = (ServerAddress) o;
+
+        return new EqualsBuilder()
+                .append(port, that.port)
+                .append(host, that.host)
+                .isEquals();
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder(17, 37)
+                .append(host)
+                .append(port)
+                .toHashCode();
+    }
+}

+ 727 - 0
src/test/java/io/milvus/client/MilvusMultiClientDockerTest.java

@@ -0,0 +1,727 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.client;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import io.milvus.grpc.*;
+import io.milvus.param.*;
+import io.milvus.param.collection.*;
+import io.milvus.param.dml.InsertParam;
+import io.milvus.param.dml.QueryParam;
+import io.milvus.param.dml.SearchParam;
+import io.milvus.param.index.CreateIndexParam;
+import io.milvus.param.index.DescribeIndexParam;
+import io.milvus.param.partition.GetPartitionStatisticsParam;
+import io.milvus.param.partition.ShowPartitionsParam;
+import io.milvus.response.*;
+import org.apache.commons.text.RandomStringGenerator;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.codehaus.plexus.util.FileUtils;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+class MilvusMultiClientDockerTest {
+    private static final Logger logger = LogManager.getLogger("MilvusMultiClientDockerTest");
+    private static MilvusClient client;
+    private static RandomStringGenerator generator;
+    private static final int dimension = 128;
+    private static final Boolean useDockerCompose = Boolean.TRUE;
+
+    private static void startDockerContainer() {
+        if (!useDockerCompose) {
+            return;
+        }
+
+        // start the test container
+        Runtime runtime = Runtime.getRuntime();
+        String bashCommand = "docker-compose up -d";
+
+        try {
+            logger.debug(bashCommand);
+            Process pro = runtime.exec(bashCommand);
+            int status = pro.waitFor();
+            if (status != 0) {
+                logger.error("Failed to start docker compose, status " + status);
+            }
+
+            // here stop 10 seconds, reason: although milvus container is alive, it is still in initializing,
+            // connection will failed and get error "proxy not health".
+            TimeUnit.SECONDS.sleep(10);
+            logger.debug("Milvus service started");
+        } catch (Throwable t) {
+            logger.error("Failed to execute docker compose up", t);
+        }
+    }
+
+    private static void stopDockerContainer() {
+        if (!useDockerCompose) {
+            return;
+        }
+
+        // stop all test dockers
+        Runtime runtime = Runtime.getRuntime();
+        String bashCommand = "docker-compose down";
+
+        try {
+            logger.debug("Milvus service stopping...");
+            TimeUnit.SECONDS.sleep(5);
+            logger.debug(bashCommand);
+            Process pro = runtime.exec(bashCommand);
+            int status = pro.waitFor();
+            if (status != 0) {
+                logger.error("Failed to stop test docker containers" + pro.getOutputStream().toString());
+            }
+        } catch (Throwable t) {
+            logger.error("Failed to execute docker compose down", t);
+        }
+
+        // clean up log dir
+        runtime = Runtime.getRuntime();
+        bashCommand = "docker-compose rm";
+
+        try {
+            logger.debug(bashCommand);
+            Process pro = runtime.exec(bashCommand);
+            int status = pro.waitFor();
+            if (status != 0) {
+                logger.error("Failed to clean up test docker containers" + pro.getOutputStream().toString());
+            }
+
+            logger.error("Clean up volume directory of Docker");
+            FileUtils.deleteDirectory("volumes");
+        } catch (Throwable t) {
+            logger.error("Failed to remove docker compose volume", t);
+        }
+    }
+
+    @BeforeAll
+    public static void setUp() throws InterruptedException {
+        startDockerContainer();
+
+        MultiConnectParam connectParam = multiConnectParamBuilder().build();
+        client = new MilvusMultiServiceClient(connectParam);
+//        TimeUnit.SECONDS.sleep(10);
+        generator = new RandomStringGenerator.Builder().withinRange('a', 'z').build();
+    }
+
+    @AfterAll
+    public static void tearDown() {
+        if (client != null) {
+            client.close();
+        }
+
+        stopDockerContainer();
+    }
+
+    private static MultiConnectParam.Builder multiConnectParamBuilder() {
+        ServerAddress serverAddress = ServerAddress.newBuilder().withHost("localhost").withPort(19530).build();
+        ServerAddress serverSlaveAddress = ServerAddress.newBuilder().withHost("localhost").withPort(19531).withHealthPort(9092).build();
+        return MultiConnectParam.newBuilder().withHosts(Arrays.asList(serverAddress, serverSlaveAddress));
+    }
+
+    private List<List<Float>> generateFloatVectors(int count) {
+        Random ran = new Random();
+        List<List<Float>> vectors = new ArrayList<>();
+        for (int n = 0; n < count; ++n) {
+            List<Float> vector = new ArrayList<>();
+            for (int i = 0; i < dimension; ++i) {
+                vector.add(ran.nextFloat());
+            }
+            vectors.add(vector);
+        }
+
+        return vectors;
+    }
+
+    private List<List<Float>> normalizeFloatVectors(List<List<Float>> src) {
+        for (List<Float> vector : src) {
+            double total = 0.0;
+            for (Float val : vector) {
+                total = total + val * val;
+            }
+            float squre = (float) Math.sqrt(total);
+            for (int i = 0; i < vector.size(); ++i) {
+                vector.set(i, vector.get(i) / squre);
+            }
+        }
+
+        return src;
+    }
+
+    private List<ByteBuffer> generateBinaryVectors(int count) {
+        Random ran = new Random();
+        List<ByteBuffer> vectors = new ArrayList<>();
+        int byteCount = dimension / 8;
+        for (int n = 0; n < count; ++n) {
+            ByteBuffer vector = ByteBuffer.allocate(byteCount);
+            for (int i = 0; i < byteCount; ++i) {
+                vector.put((byte) ran.nextInt(Byte.MAX_VALUE));
+            }
+            vectors.add(vector);
+        }
+        return vectors;
+
+    }
+
+    @Test
+    void testFloatVectors() {
+        String randomCollectionName = generator.generate(10);
+
+        // collection schema
+        String field1Name = "long_field";
+        String field2Name = "vec_field";
+        String field3Name = "bool_field";
+        String field4Name = "double_field";
+        String field5Name = "int_field";
+        List<FieldType> fieldsSchema = new ArrayList<>();
+        fieldsSchema.add(FieldType.newBuilder()
+                .withPrimaryKey(true)
+                .withAutoID(false)
+                .withDataType(DataType.Int64)
+                .withName(field1Name)
+                .withDescription("identity")
+                .build());
+
+        fieldsSchema.add(FieldType.newBuilder()
+                .withDataType(DataType.FloatVector)
+                .withName(field2Name)
+                .withDescription("face")
+                .withDimension(dimension)
+                .build());
+
+        fieldsSchema.add(FieldType.newBuilder()
+                .withDataType(DataType.Bool)
+                .withName(field3Name)
+                .withDescription("gender")
+                .build());
+
+        fieldsSchema.add(FieldType.newBuilder()
+                .withDataType(DataType.Double)
+                .withName(field4Name)
+                .withDescription("weight")
+                .build());
+
+        fieldsSchema.add(FieldType.newBuilder()
+                .withDataType(DataType.Int8)
+                .withName(field5Name)
+                .withDescription("age")
+                .build());
+
+        // create collection
+        CreateCollectionParam createParam = CreateCollectionParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withDescription("test")
+                .withFieldTypes(fieldsSchema)
+                .build();
+
+        R<RpcStatus> createR = client.createCollection(createParam);
+        assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
+
+        R<DescribeCollectionResponse> response = client.describeCollection(DescribeCollectionParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .build());
+
+        DescCollResponseWrapper desc = new DescCollResponseWrapper(response.getData());
+        System.out.println(desc.toString());
+
+        // insert data
+        int rowCount = 10000;
+        List<Long> ids = new ArrayList<>();
+        List<Boolean> genders = new ArrayList<>();
+        List<Double> weights = new ArrayList<>();
+        List<Short> ages = new ArrayList<>();
+        for (long i = 0L; i < rowCount; ++i) {
+            ids.add(i);
+            genders.add(i % 3 == 0 ? Boolean.TRUE : Boolean.FALSE);
+            weights.add(((double) (i + 1) / 100));
+            ages.add((short) ((i + 1) % 99));
+        }
+        List<List<Float>> vectors = generateFloatVectors(rowCount);
+
+        List<InsertParam.Field> fieldsInsert = new ArrayList<>();
+        fieldsInsert.add(new InsertParam.Field(field1Name, DataType.Int64, ids));
+        fieldsInsert.add(new InsertParam.Field(field5Name, DataType.Int8, ages));
+        fieldsInsert.add(new InsertParam.Field(field4Name, DataType.Double, weights));
+        fieldsInsert.add(new InsertParam.Field(field3Name, DataType.Bool, genders));
+        fieldsInsert.add(new InsertParam.Field(field2Name, DataType.FloatVector, vectors));
+
+        InsertParam insertParam = InsertParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withFields(fieldsInsert)
+                .build();
+
+        R<MutationResult> insertR = client.withTimeout(10, TimeUnit.SECONDS).insert(insertParam);
+        assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+
+        MutationResultWrapper insertResultWrapper = new MutationResultWrapper(insertR.getData());
+        System.out.println(insertResultWrapper.getInsertCount() + " rows inserted");
+
+        // get collection statistics
+        R<GetCollectionStatisticsResponse> statR = client.getCollectionStatistics(GetCollectionStatisticsParam
+                .newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withFlush(true)
+                .build());
+        assertEquals(R.Status.Success.getCode(), statR.getStatus().intValue());
+
+        GetCollStatResponseWrapper stat = new GetCollStatResponseWrapper(statR.getData());
+        System.out.println("Collection row count: " + stat.getRowCount());
+
+        // get partition statistics
+        R<GetPartitionStatisticsResponse> statPartR = client.getPartitionStatistics(GetPartitionStatisticsParam
+                .newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withPartitionName("_default") // each collection has '_default' partition
+                .withFlush(true)
+                .build());
+        assertEquals(R.Status.Success.getCode(), statPartR.getStatus().intValue());
+
+        GetPartStatResponseWrapper statPart = new GetPartStatResponseWrapper(statPartR.getData());
+        System.out.println("Partition row count: " + statPart.getRowCount());
+
+        // create index
+        CreateIndexParam indexParam = CreateIndexParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withFieldName(field2Name)
+                .withIndexType(IndexType.IVF_FLAT)
+                .withMetricType(MetricType.L2)
+                .withExtraParam("{\"nlist\":256}")
+                .withSyncMode(Boolean.TRUE)
+                .withSyncWaitingInterval(500L)
+                .withSyncWaitingTimeout(30L)
+                .build();
+
+        R<RpcStatus> createIndexR = client.createIndex(indexParam);
+        assertEquals(R.Status.Success.getCode(), createIndexR.getStatus().intValue());
+
+        // get index description
+        DescribeIndexParam descIndexParam = DescribeIndexParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withFieldName(field2Name)
+                .build();
+        R<DescribeIndexResponse> descIndexR = client.describeIndex(descIndexParam);
+        assertEquals(R.Status.Success.getCode(), descIndexR.getStatus().intValue());
+
+        DescIndexResponseWrapper indexDesc = new DescIndexResponseWrapper(descIndexR.getData());
+        System.out.println("Index description: " + indexDesc.toString());
+
+        // load collection
+        R<RpcStatus> loadR = client.loadCollection(LoadCollectionParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .build());
+        assertEquals(R.Status.Success.getCode(), loadR.getStatus().intValue());
+
+        // show collections
+        R<ShowCollectionsResponse> showR = client.showCollections(ShowCollectionsParam.newBuilder()
+                .addCollectionName(randomCollectionName)
+                .build());
+        assertEquals(R.Status.Success.getCode(), showR.getStatus().intValue());
+        ShowCollResponseWrapper info = new ShowCollResponseWrapper(showR.getData());
+        System.out.println("Collection info: " + info.toString());
+
+        // show partitions
+        R<ShowPartitionsResponse> showPartR = client.showPartitions(ShowPartitionsParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .addPartitionName("_default") // each collection has a '_default' partition
+                .build());
+        assertEquals(R.Status.Success.getCode(), showPartR.getStatus().intValue());
+        ShowPartResponseWrapper infoPart = new ShowPartResponseWrapper(showPartR.getData());
+        System.out.println("Partition info: " + infoPart.toString());
+
+        // query vectors to verify
+        List<Long> queryIDs = new ArrayList<>();
+        List<Double> compareWeights = new ArrayList<>();
+        int nq = 5;
+        Random ran = new Random();
+        int randomIndex = ran.nextInt(rowCount - nq);
+        for (int i = randomIndex; i < randomIndex + nq; ++i) {
+            queryIDs.add(ids.get(i));
+            compareWeights.add(weights.get(i));
+        }
+        String expr = field1Name + " in " + queryIDs.toString();
+        List<String> outputFields = Arrays.asList(field1Name, field2Name, field3Name, field4Name, field4Name);
+        QueryParam queryParam = QueryParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withExpr(expr)
+                .withOutFields(outputFields)
+                .build();
+
+        R<QueryResults> queryR = client.query(queryParam);
+        assertEquals(R.Status.Success.getCode(), queryR.getStatus().intValue());
+
+        // verify query result
+        QueryResultsWrapper queryResultsWrapper = new QueryResultsWrapper(queryR.getData());
+        for (String fieldName : outputFields) {
+            FieldDataWrapper wrapper = queryResultsWrapper.getFieldWrapper(fieldName);
+            System.out.println("Query data of " + fieldName + ", row count: " + wrapper.getRowCount());
+            System.out.println(wrapper.getFieldData());
+            assertEquals(nq, wrapper.getFieldData().size());
+
+            if (fieldName.compareTo(field1Name) == 0) {
+                List<?> out = queryResultsWrapper.getFieldWrapper(field1Name).getFieldData();
+                assertEquals(nq, out.size());
+                for (Object o : out) {
+                    long id = (Long) o;
+                    assertTrue(queryIDs.contains(id));
+                }
+            }
+        }
+
+        // Note: the query() return vectors are not in same sequence to the input
+        // here we cannot compare vector one by one
+        // the boolean also cannot be compared
+        if (outputFields.contains(field2Name)) {
+            assertTrue(queryResultsWrapper.getFieldWrapper(field2Name).isVectorField());
+            List<?> out = queryResultsWrapper.getFieldWrapper(field2Name).getFieldData();
+            assertEquals(nq, out.size());
+        }
+
+        if (outputFields.contains(field3Name)) {
+            List<?> out = queryResultsWrapper.getFieldWrapper(field3Name).getFieldData();
+            assertEquals(nq, out.size());
+        }
+
+        if (outputFields.contains(field4Name)) {
+            List<?> out = queryResultsWrapper.getFieldWrapper(field4Name).getFieldData();
+            assertEquals(nq, out.size());
+            for (Object o : out) {
+                double d = (Double) o;
+                assertTrue(compareWeights.contains(d));
+            }
+        }
+
+
+        // pick some vectors to search
+        List<Long> targetVectorIDs = new ArrayList<>();
+        List<List<Float>> targetVectors = new ArrayList<>();
+        for (int i = randomIndex; i < randomIndex + nq; ++i) {
+            targetVectorIDs.add(ids.get(i));
+            targetVectors.add(vectors.get(i));
+        }
+
+        int topK = 5;
+        SearchParam searchParam = SearchParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withMetricType(MetricType.L2)
+                .withTopK(topK)
+                .withVectors(targetVectors)
+                .withVectorFieldName(field2Name)
+                .withParams("{\"nprobe\":8}")
+                .addOutField(field4Name)
+                .build();
+
+        R<SearchResults> searchR = client.search(searchParam);
+//        System.out.println(searchR);
+        assertEquals(R.Status.Success.getCode(), searchR.getStatus().intValue());
+
+        // verify the search result
+        SearchResultsWrapper results = new SearchResultsWrapper(searchR.getData().getResults());
+        for (int i = 0; i < targetVectors.size(); ++i) {
+            List<SearchResultsWrapper.IDScore> scores = results.getIDScore(i);
+            System.out.println("The result of No." + i + " target vector(ID = " + targetVectorIDs.get(i) + "):");
+            System.out.println(scores);
+            assertEquals(targetVectorIDs.get(i).longValue(), scores.get(0).getLongID());
+        }
+
+        List<?> fieldData = results.getFieldData(field4Name, 0);
+        assertEquals(topK, fieldData.size());
+        fieldData = results.getFieldData(field4Name, nq - 1);
+        assertEquals(topK, fieldData.size());
+
+        // drop collection
+        DropCollectionParam dropParam = DropCollectionParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .build();
+
+        R<RpcStatus> dropR = client.dropCollection(dropParam);
+        assertEquals(R.Status.Success.getCode(), dropR.getStatus().intValue());
+    }
+
+    @Test
+    void testBinaryVectors() {
+        String randomCollectionName = generator.generate(10);
+
+        // collection schema
+        String field1Name = "field1";
+        String field2Name = "field2";
+        FieldType field1 = FieldType.newBuilder()
+                .withPrimaryKey(true)
+                .withDataType(DataType.Int64)
+                .withName(field1Name)
+                .withDescription("hello")
+                .build();
+
+        FieldType field2 = FieldType.newBuilder()
+                .withDataType(DataType.BinaryVector)
+                .withName(field2Name)
+                .withDescription("world")
+                .withDimension(dimension)
+                .build();
+
+        // create collection
+        CreateCollectionParam createParam = CreateCollectionParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withDescription("test")
+                .addFieldType(field1)
+                .addFieldType(field2)
+                .build();
+
+        R<RpcStatus> createR = client.createCollection(createParam);
+        assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
+
+        // insert data
+        int rowCount = 10000;
+        List<Long> ids = new ArrayList<>();
+        for (long i = 0L; i < rowCount; ++i) {
+            ids.add(i);
+        }
+        List<ByteBuffer> vectors = generateBinaryVectors(rowCount);
+
+        List<InsertParam.Field> fields = new ArrayList<>();
+        // no need to provide id here since this field is auto_id
+        fields.add(new InsertParam.Field(field1Name, DataType.Int64, ids));
+        fields.add(new InsertParam.Field(field2Name, DataType.BinaryVector, vectors));
+
+        InsertParam insertParam = InsertParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withFields(fields)
+                .build();
+
+        R<MutationResult> insertR = client.insert(insertParam);
+        assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+        MutationResultWrapper insertResultWrapper = new MutationResultWrapper(insertR.getData());
+        System.out.println(insertResultWrapper.getInsertCount() + " rows inserted");
+
+        // get collection statistics
+        R<GetCollectionStatisticsResponse> statR = client.getCollectionStatistics(GetCollectionStatisticsParam
+                .newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withFlush(true)
+                .build());
+        assertEquals(R.Status.Success.getCode(), statR.getStatus().intValue());
+
+        GetCollStatResponseWrapper stat = new GetCollStatResponseWrapper(statR.getData());
+        System.out.println("Collection row count: " + stat.getRowCount());
+
+        // load collection
+        R<RpcStatus> loadR = client.loadCollection(LoadCollectionParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .build());
+        assertEquals(R.Status.Success.getCode(), loadR.getStatus().intValue());
+
+        // pick some vectors to search
+        int nq = 5;
+        List<Long> targetVectorIDs = new ArrayList<>();
+        List<ByteBuffer> targetVectors = new ArrayList<>();
+        Random ran = new Random();
+        int randomIndex = ran.nextInt(rowCount - nq);
+        for (int i = randomIndex; i < randomIndex + nq; ++i) {
+            targetVectorIDs.add(ids.get(i));
+            targetVectors.add(vectors.get(i));
+        }
+
+        int topK = 5;
+        SearchParam searchParam = SearchParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withMetricType(MetricType.HAMMING)
+                .withTopK(topK)
+                .withVectors(targetVectors)
+                .withVectorFieldName(field2Name)
+                .build();
+
+        R<SearchResults> searchR = client.search(searchParam);
+//        System.out.println(searchR);
+        assertEquals(R.Status.Success.getCode(), searchR.getStatus().intValue());
+
+        // verify the search result
+        SearchResultsWrapper results = new SearchResultsWrapper(searchR.getData().getResults());
+        for (int i = 0; i < targetVectors.size(); ++i) {
+            List<SearchResultsWrapper.IDScore> scores = results.getIDScore(i);
+            System.out.println("The result of No." + i + " target vector(ID = " + targetVectorIDs.get(i) + "):");
+            System.out.println(scores);
+            assertEquals(targetVectorIDs.get(i).longValue(), scores.get(0).getLongID());
+        }
+
+        // drop collection
+        DropCollectionParam dropParam = DropCollectionParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .build();
+
+        R<RpcStatus> dropR = client.dropCollection(dropParam);
+        assertEquals(R.Status.Success.getCode(), dropR.getStatus().intValue());
+    }
+
+    @Test
+    void testAsyncMethods() {
+        String randomCollectionName = generator.generate(10);
+
+        // collection schema
+        String field1Name = "long_field";
+        String field2Name = "vec_field";
+        List<FieldType> fieldsSchema = new ArrayList<>();
+        fieldsSchema.add(FieldType.newBuilder()
+                .withPrimaryKey(true)
+                .withAutoID(true)
+                .withDataType(DataType.Int64)
+                .withName(field1Name)
+                .withDescription("identity")
+                .build());
+
+        fieldsSchema.add(FieldType.newBuilder()
+                .withDataType(DataType.FloatVector)
+                .withName(field2Name)
+                .withDescription("face")
+                .withDimension(dimension)
+                .build());
+
+        // create collection
+        CreateCollectionParam createParam = CreateCollectionParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withDescription("test")
+                .withFieldTypes(fieldsSchema)
+                .build();
+
+        R<RpcStatus> createR = client.createCollection(createParam);
+        assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
+
+        // insert async
+        List<ListenableFuture<R<MutationResult>>> futureResponses = new ArrayList<>();
+        int rowCount = 1000;
+        for (long i = 0L; i < 10; ++i) {
+            List<List<Float>> vectors = normalizeFloatVectors(generateFloatVectors(rowCount));
+            List<InsertParam.Field> fieldsInsert = new ArrayList<>();
+            fieldsInsert.add(new InsertParam.Field(field2Name, DataType.FloatVector, vectors));
+
+            InsertParam insertParam = InsertParam.newBuilder()
+                    .withCollectionName(randomCollectionName)
+                    .withFields(fieldsInsert)
+                    .build();
+
+            ListenableFuture<R<MutationResult>> insertFuture = client.insertAsync(insertParam);
+            futureResponses.add(insertFuture);
+        }
+
+        // get insert result
+        List<Long> queryIDs = new ArrayList<>();
+        for (ListenableFuture<R<MutationResult>> response : futureResponses) {
+            try {
+                R<MutationResult> insertR = response.get();
+                assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+
+                MutationResultWrapper wrapper = new MutationResultWrapper(insertR.getData());
+                queryIDs.add(wrapper.getLongIDs().get(0));
+            } catch (ExecutionException | InterruptedException e) {
+                System.out.println("failed to insert:" + e.getMessage());
+                return;
+            }
+        }
+
+        // get collection statistics
+        R<GetCollectionStatisticsResponse> statR = client.getCollectionStatistics(GetCollectionStatisticsParam
+                .newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withFlush(true)
+                .build());
+        assertEquals(R.Status.Success.getCode(), statR.getStatus().intValue());
+
+        GetCollStatResponseWrapper stat = new GetCollStatResponseWrapper(statR.getData());
+        System.out.println("Collection row count: " + stat.getRowCount());
+
+        // load collection
+        R<RpcStatus> loadR = client.loadCollection(LoadCollectionParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .build());
+        assertEquals(R.Status.Success.getCode(), loadR.getStatus().intValue());
+
+        // search async
+        List<List<Float>> targetVectors = normalizeFloatVectors(generateFloatVectors(2));
+        int topK = 5;
+        SearchParam searchParam = SearchParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withMetricType(MetricType.IP)
+                .withTopK(topK)
+                .withVectors(targetVectors)
+                .withVectorFieldName(field2Name)
+                .build();
+
+        ListenableFuture<R<SearchResults>> searchFuture = client.searchAsync(searchParam);
+
+        // query async
+        String expr = field1Name + " in " + queryIDs.toString();
+        List<String> outputFields = Arrays.asList(field1Name, field2Name);
+        QueryParam queryParam = QueryParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withExpr(expr)
+                .withOutFields(outputFields)
+                .build();
+
+        ListenableFuture<R<QueryResults>> queryFuture = client.queryAsync(queryParam);
+
+        try {
+            // get search results
+            R<SearchResults> searchR = searchFuture.get();
+            assertEquals(R.Status.Success.getCode(), searchR.getStatus().intValue());
+
+            // verify search result
+            SearchResultsWrapper results = new SearchResultsWrapper(searchR.getData().getResults());
+            System.out.println("Search results:");
+            for (int i = 0; i < targetVectors.size(); ++i) {
+                List<SearchResultsWrapper.IDScore> scores = results.getIDScore(i);
+                assertEquals(topK, scores.size());
+                System.out.println(scores.toString());
+            }
+
+            // get query results
+            R<QueryResults> queryR = queryFuture.get();
+            assertEquals(R.Status.Success.getCode(), queryR.getStatus().intValue());
+
+            // verify query result
+            QueryResultsWrapper queryResultsWrapper = new QueryResultsWrapper(queryR.getData());
+            for (String fieldName : outputFields) {
+                FieldDataWrapper wrapper = queryResultsWrapper.getFieldWrapper(fieldName);
+                System.out.println("Query data of " + fieldName + ", row count: " + wrapper.getRowCount());
+                System.out.println(wrapper.getFieldData());
+                assertEquals(queryIDs.size(), wrapper.getFieldData().size());
+            }
+
+        } catch (InterruptedException | ExecutionException e) {
+            e.printStackTrace();
+        }
+
+        // drop collection
+        DropCollectionParam dropParam = DropCollectionParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .build();
+
+        R<RpcStatus> dropR = client.dropCollection(dropParam);
+        assertEquals(R.Status.Success.getCode(), dropR.getStatus().intValue());
+    }
+}