Browse Source

Merge pull request #133 from thirstycrow/0.9.0

Support gRPC timeout, target string and load-balancing policy
Xiaohai Xu 4 years ago
parent
commit
404993fee8

+ 1 - 1
examples/src/main/java/MilvusClientExample.java

@@ -145,7 +145,7 @@ public class MilvusClientExample {
     CollectionMapping collectionMapping =
         new CollectionMapping.Builder(collectionName)
             .withFields(generateDefaultFields(dimension))
-            .withParamsInJson("{\"segment_row_count\": 50000, \"auto_id\": true}")
+            .withParamsInJson("{\"segment_row_limit\": 50000, \"auto_id\": true}")
             .build();
     Response createCollectionResponse = client.createCollection(collectionMapping);
 

+ 44 - 12
pom.xml

@@ -117,11 +117,6 @@
             <version>1.2</version>
             <scope>provided</scope> <!-- not needed at runtime -->
         </dependency>
-        <dependency>
-            <groupId>io.grpc</groupId>
-            <artifactId>grpc-testing</artifactId>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>com.google.protobuf</groupId>
             <artifactId>protobuf-java-util</artifactId>
@@ -132,12 +127,6 @@
             <artifactId>error_prone_annotations</artifactId>
             <version>2.3.4</version> <!-- prefer to use 2.3.3 or later -->
         </dependency>
-        <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter</artifactId>
-            <version>5.5.2</version>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-text</artifactId>
@@ -163,7 +152,29 @@
             <artifactId>log4j-slf4j-impl</artifactId>
             <version>2.12.1</version>
         </dependency>
-
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-testing</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>5.6.2</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>1.14.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>1.14.3</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <profiles>
@@ -220,6 +231,12 @@
     </profiles>
 
     <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
         <extensions>
             <extension>
                 <groupId>kr.motd.maven</groupId>
@@ -288,6 +305,21 @@
                     </execution>
                 </executions>
             </plugin>
+            <!-- JUnit5 tests are not running with maven 3.6.x
+            https://dzone.com/articles/why-your-junit-5-tests-are-not-running-under-maven
+            -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.22.2</version>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.junit.jupiter</groupId>
+                        <artifactId>junit-jupiter-engine</artifactId>
+                        <version>5.1.0</version>
+                    </dependency>
+                </dependencies>
+            </plugin>
         </plugins>
     </build>
 

+ 1 - 1
src/main/java/io/milvus/client/CollectionMapping.java

@@ -118,7 +118,7 @@ public class CollectionMapping {
     /**
      * Build with extra params in json string format.
      *
-     * @param paramsInJson Two optional parameters can be included. "segment_row_count" is default
+     * @param paramsInJson Two optional parameters can be included. "segment_row_limit" is default
      *                     to 100,000. Merge will be triggered if more than this number of entities
      *                     are inserted into collection. "auto_id" is default to <code>true</code>.
      *                     Entity ids will be auto-generated by Milvus if set to true.

+ 40 - 0
src/main/java/io/milvus/client/ConnectParam.java

@@ -19,13 +19,17 @@
 
 package io.milvus.client;
 
+import io.grpc.ManagedChannelBuilder;
+
 import javax.annotation.Nonnull;
 import java.util.concurrent.TimeUnit;
 
 /** Contains parameters for connecting to Milvus server */
 public class ConnectParam {
+  private final String target;
   private final String host;
   private final int port;
+  private final String defaultLoadBalancingPolicy;
   private final long connectTimeoutNanos;
   private final long keepAliveTimeNanos;
   private final long keepAliveTimeoutNanos;
@@ -33,8 +37,10 @@ public class ConnectParam {
   private final long idleTimeoutNanos;
 
   private ConnectParam(@Nonnull Builder builder) {
+    this.target = builder.target;
     this.host = builder.host;
     this.port = builder.port;
+    this.defaultLoadBalancingPolicy = builder.defaultLoadBalancingPolicy;
     this.connectTimeoutNanos = builder.connectTimeoutNanos;
     this.keepAliveTimeNanos = builder.keepAliveTimeNanos;
     this.keepAliveTimeoutNanos = builder.keepAliveTimeoutNanos;
@@ -42,6 +48,10 @@ public class ConnectParam {
     this.idleTimeoutNanos = builder.idleTimeoutNanos;
   }
 
+  public String getTarget() {
+    return target;
+  }
+
   public String getHost() {
     return host;
   }
@@ -50,6 +60,10 @@ public class ConnectParam {
     return port;
   }
 
+  public String getDefaultLoadBalancingPolicy() {
+    return defaultLoadBalancingPolicy;
+  }
+
   public long getConnectTimeout(@Nonnull TimeUnit timeUnit) {
     return timeUnit.convert(connectTimeoutNanos, TimeUnit.NANOSECONDS);
   }
@@ -73,14 +87,29 @@ public class ConnectParam {
   /** Builder for <code>ConnectParam</code> */
   public static class Builder {
     // Optional parameters - initialized to default values
+    private String target = null;
     private String host = "localhost";
     private int port = 19530;
+    private String defaultLoadBalancingPolicy = "round_robin";
     private long connectTimeoutNanos = TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS);
     private long keepAliveTimeNanos = Long.MAX_VALUE; // Disabling keepalive
     private long keepAliveTimeoutNanos = TimeUnit.NANOSECONDS.convert(20, TimeUnit.SECONDS);
     private boolean keepAliveWithoutCalls = false;
     private long idleTimeoutNanos = TimeUnit.NANOSECONDS.convert(24, TimeUnit.HOURS);
 
+    /**
+     * Optional. Defaults to null. Will be used in precedence to host and port.
+     *
+     * @param target a GRPC target string
+     * @return <code>Builder</code>
+     *
+     * @see ManagedChannelBuilder#forTarget(String)
+     */
+    public Builder withTarget(@Nonnull String target) {
+      this.target = target;
+      return this;
+    }
+
     /**
      * Optional. Defaults to "localhost".
      *
@@ -106,6 +135,17 @@ public class ConnectParam {
       return this;
     }
 
+    /**
+     * Optional. Defaults to "round_robin".
+     *
+     * @param defaultLoadBalancingPolicy the default load-balancing policy name
+     * @return <code>Builder</code>
+     */
+    public Builder withDefaultLoadBalancingPolicy(String defaultLoadBalancingPolicy) {
+      this.defaultLoadBalancingPolicy = defaultLoadBalancingPolicy;
+      return this;
+    }
+
     /**
      * Optional. Defaults to 10 seconds.
      *

+ 32 - 32
src/main/java/io/milvus/client/MilvusClient.java

@@ -20,12 +20,34 @@
 package io.milvus.client;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 /** The Milvus Client Interface */
 public interface MilvusClient {
 
-  String clientVersion = "0.9.0";
+  String clientVersion = new Supplier<String>() {
+
+    @Override
+    /** @return current Milvus client version */
+    public String get() {
+      Properties properties = new Properties();
+      try (InputStream inputStream =
+               MilvusClient.class.getClassLoader()
+                   .getResourceAsStream("milvus-client.properties")) {
+        properties.load(inputStream);
+      } catch (IOException ex) {
+        ExceptionUtils.wrapAndThrow(ex);
+      }
+      return properties.getProperty("version");
+    }
+  }.get();
 
   /** @return current Milvus client version: 0.9.0 */
   default String getClientVersion() {
@@ -33,40 +55,18 @@ public interface MilvusClient {
   }
 
   /**
-   * Connects to Milvus server
-   *
-   * @param connectParam the <code>ConnectParam</code> object
-   * <pre>
-   * example usage:
-   * <code>
-   * ConnectParam connectParam = new ConnectParam.Builder()
-   *                                             .withHost("localhost")
-   *                                             .withPort(19530)
-   *                                             .withConnectTimeout(10, TimeUnit.SECONDS)
-   *                                             .withKeepAliveTime(Long.MAX_VALUE, TimeUnit.NANOSECONDS)
-   *                                             .withKeepAliveTimeout(20, TimeUnit.SECONDS)
-   *                                             .keepAliveWithoutCalls(false)
-   *                                             .withIdleTimeout(24, TimeUnit.HOURS)
-   *                                             .build();
-   * </code>
-   * </pre>
-   *
-   * @return <code>Response</code>
-   * @throws ConnectFailedException if client failed to connect
-   * @see ConnectParam
-   * @see Response
-   * @see ConnectFailedException
+   * Close this MilvusClient. Wait at most 1 minute for graceful shutdown.
    */
-  Response connect(ConnectParam connectParam) throws ConnectFailedException;
+  default void close() {
+    close(TimeUnit.MINUTES.toSeconds(1));
+  }
 
   /**
-   * Disconnects from Milvus server
-   *
-   * @return <code>Response</code>
-   * @throws InterruptedException if disconnect interrupted
-   * @see Response
+   * Close this MilvusClient. Wait at most `maxWaitSeconds` for graceful shutdown.
    */
-  Response disconnect() throws InterruptedException;
+  void close(long maxWaitSeconds);
+
+  MilvusClient withTimeout(long timeout, TimeUnit timeoutUnit);
 
   /**
    * Creates collection specified by <code>collectionMapping</code>
@@ -77,7 +77,7 @@ public interface MilvusClient {
    * <code>
    * CollectionMapping collectionMapping = new CollectionMapping.Builder(collectionName)
    *                                                            .withFields(fields)
-   *                                                            .withParamsInJson("{"segment_row_count": 100000}")
+   *                                                            .withParamsInJson("{"segment_row_limit": 100000}")
    *                                                            .build();
    * </code>
    * Refer to <code>withFields</code> method for example <code>fields</code> usage.

+ 182 - 143
src/main/java/io/milvus/client/MilvusGrpcClient.java

@@ -24,11 +24,25 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.protobuf.ByteString;
-import io.grpc.ConnectivityState;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
+import io.grpc.MethodDescriptor;
 import io.grpc.StatusRuntimeException;
+import io.milvus.client.exception.InitializationException;
+import io.milvus.client.exception.UnsupportedServerVersion;
 import io.milvus.grpc.*;
+import org.apache.commons.lang3.ArrayUtils;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -39,114 +53,149 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
-import javax.annotation.Nonnull;
-import org.apache.commons.lang3.ArrayUtils;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /** Actual implementation of interface <code>MilvusClient</code> */
-public class MilvusGrpcClient implements MilvusClient {
+public class MilvusGrpcClient extends AbstractMilvusGrpcClient {
 
   private static final Logger logger = LoggerFactory.getLogger(MilvusGrpcClient.class);
-  private final String extraParamKey = "params";
-  private ManagedChannel channel = null;
-  private MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub = null;
-  private MilvusServiceGrpc.MilvusServiceFutureStub futureStub = null;
+  private static final String SUPPORTED_SERVER_VERSION = "0.11";
+
+  private final ManagedChannel channel;
+  private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub;
+  private final MilvusServiceGrpc.MilvusServiceFutureStub futureStub;
+
+  public MilvusGrpcClient(ConnectParam connectParam) {
+    ManagedChannelBuilder builder = connectParam.getTarget() != null
+        ? ManagedChannelBuilder.forTarget(connectParam.getTarget())
+        : ManagedChannelBuilder.forAddress(connectParam.getHost(), connectParam.getPort());
+
+    channel = builder.usePlaintext()
+        .maxInboundMessageSize(Integer.MAX_VALUE)
+        .defaultLoadBalancingPolicy(connectParam.getDefaultLoadBalancingPolicy())
+        .keepAliveTime(connectParam.getKeepAliveTime(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS)
+        .keepAliveTimeout(connectParam.getKeepAliveTimeout(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS)
+        .keepAliveWithoutCalls(connectParam.isKeepAliveWithoutCalls())
+        .idleTimeout(connectParam.getIdleTimeout(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS)
+        .build();
+    blockingStub = MilvusServiceGrpc.newBlockingStub(channel);
+    futureStub = MilvusServiceGrpc.newFutureStub(channel);
+    try {
+      Response response = getServerVersion();
+      if (response.ok()) {
+        String serverVersion = response.getMessage();
+        if (!serverVersion.matches("^" + SUPPORTED_SERVER_VERSION + "(\\..*)?$")) {
+          throw new UnsupportedServerVersion(connectParam.getHost(), SUPPORTED_SERVER_VERSION, serverVersion);
+        }
+      } else {
+        throw new InitializationException(connectParam.getHost(), response.getMessage());
+      }
+    } catch (Throwable t) {
+      channel.shutdownNow();
+      throw t;
+    }
+  }
 
-  ////////////////////// Constructor //////////////////////
-  public MilvusGrpcClient() {}
+  @Override
+  protected MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub() {
+    return blockingStub;
+  }
 
-  /////////////////////// Client Calls///////////////////////
+  @Override
+  protected MilvusServiceGrpc.MilvusServiceFutureStub futureStub() {
+    return futureStub;
+  }
 
   @Override
-  public Response connect(ConnectParam connectParam) throws ConnectFailedException {
-    if (channel != null && !(channel.isShutdown() || channel.isTerminated())) {
-      logWarning("Channel is not shutdown or terminated");
-      throw new ConnectFailedException("Channel is not shutdown or terminated");
+  protected boolean maybeAvailable() {
+    switch (channel.getState(false)) {
+      case IDLE:
+      case CONNECTING:
+      case READY:
+        return true;
+      default:
+        return false;
     }
+  }
 
+  @Override
+  public void close(long maxWaitSeconds) {
+    channel.shutdown();
     try {
+      channel.awaitTermination(maxWaitSeconds, TimeUnit.SECONDS);
+    } catch (InterruptedException ex) {
+      logger.warn("Milvus client close interrupted");
+      channel.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
 
-      channel =
-          ManagedChannelBuilder.forAddress(connectParam.getHost(), connectParam.getPort())
-              .usePlaintext()
-              .maxInboundMessageSize(Integer.MAX_VALUE)
-              .keepAliveTime(
-                  connectParam.getKeepAliveTime(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS)
-              .keepAliveTimeout(
-                  connectParam.getKeepAliveTimeout(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS)
-              .keepAliveWithoutCalls(connectParam.isKeepAliveWithoutCalls())
-              .idleTimeout(connectParam.getIdleTimeout(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS)
-              .build();
-
-      channel.getState(true);
+  public MilvusClient withTimeout(long timeout, TimeUnit timeoutUnit) {
+    final long timeoutMillis = timeoutUnit.toMillis(timeout);
+    final TimeoutInterceptor timeoutInterceptor = new TimeoutInterceptor(timeoutMillis);
+    final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub =
+        this.blockingStub.withInterceptors(timeoutInterceptor);
+    final MilvusServiceGrpc.MilvusServiceFutureStub futureStub =
+        this.futureStub.withInterceptors(timeoutInterceptor);
 
-      long timeout = connectParam.getConnectTimeout(TimeUnit.MILLISECONDS);
-      logInfo("Trying to connect...Timeout in {} ms", timeout);
+    return new AbstractMilvusGrpcClient() {
 
-      final long checkFrequency = 100; // ms
-      while (channel.getState(false) != ConnectivityState.READY) {
-        if (timeout <= 0) {
-          logError("Connect timeout!");
-          throw new ConnectFailedException("Connect timeout");
-        }
-        TimeUnit.MILLISECONDS.sleep(checkFrequency);
-        timeout -= checkFrequency;
+      @Override
+      protected MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub() {
+        return blockingStub;
       }
 
-      blockingStub = MilvusServiceGrpc.newBlockingStub(channel);
-      futureStub = MilvusServiceGrpc.newFutureStub(channel);
+      @Override
+      protected MilvusServiceGrpc.MilvusServiceFutureStub futureStub() {
+        return futureStub;
+      }
 
-      // check server version
-      String serverVersion = getServerVersion().getMessage();
-      if (!serverVersion.contains("0.11.")) {
-        logError(
-            "Connect failed! Server version {} does not match SDK version 0.9.0", serverVersion);
-        throw new ConnectFailedException("Failed to connect to Milvus server.");
+      @Override
+      protected boolean maybeAvailable() {
+        return MilvusGrpcClient.this.maybeAvailable();
       }
 
-    } catch (Exception e) {
-      if (!(e instanceof ConnectFailedException)) {
-        logError("Connect failed! {}", e.toString());
+      @Override
+      public void close(long maxWaitSeconds) {
+        MilvusGrpcClient.this.close(maxWaitSeconds);
       }
-      throw new ConnectFailedException("Exception occurred: " + e.toString());
-    }
 
-    logInfo(
-        "Connection established successfully to host={}, port={}",
-        connectParam.getHost(),
-        String.valueOf(connectParam.getPort()));
-    return new Response(Response.Status.SUCCESS);
+      @Override
+      public MilvusClient withTimeout(long timeout, TimeUnit timeoutUnit) {
+        return MilvusGrpcClient.this.withTimeout(timeout, timeoutUnit);
+      }
+    };
   }
 
-  @Override
-  public Response disconnect() throws InterruptedException {
-    if (!channelIsReadyOrIdle()) {
-      logWarning("You are not connected to Milvus server");
-      return new Response(Response.Status.CLIENT_NOT_CONNECTED);
-    } else {
-      try {
-        if (channel.shutdown().awaitTermination(60, TimeUnit.SECONDS)) {
-          logInfo("Channel terminated");
-        } else {
-          logError("Encountered error when terminating channel");
-          return new Response(Response.Status.RPC_ERROR);
-        }
-      } catch (InterruptedException e) {
-        logError("Exception thrown when terminating channel: {}", e.toString());
-        throw e;
-      }
+  private static class TimeoutInterceptor implements ClientInterceptor {
+    private long timeoutMillis;
+
+    TimeoutInterceptor(long timeoutMillis) {
+      this.timeoutMillis = timeoutMillis;
+    }
+
+    @Override
+    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
+      return next.newCall(method, callOptions.withDeadlineAfter(timeoutMillis, TimeUnit.MILLISECONDS));
     }
-    return new Response(Response.Status.SUCCESS);
   }
+}
+
+abstract class AbstractMilvusGrpcClient implements MilvusClient {
+  private static final Logger logger = LoggerFactory.getLogger(AbstractMilvusGrpcClient.class);
+
+  private final String extraParamKey = "params";
+
+  protected abstract MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub();
+
+  protected abstract MilvusServiceGrpc.MilvusServiceFutureStub futureStub();
+
+  protected abstract boolean maybeAvailable();
 
   @Override
   public Response createCollection(@Nonnull CollectionMapping collectionMapping) {
 
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return new Response(Response.Status.CLIENT_NOT_CONNECTED);
     }
@@ -191,7 +240,7 @@ public class MilvusGrpcClient implements MilvusClient {
     Status response;
 
     try {
-      response = blockingStub.createCollection(request);
+      response = blockingStub().createCollection(request);
 
       if (response.getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("Created collection successfully!\n{}", collectionMapping.toString());
@@ -215,7 +264,7 @@ public class MilvusGrpcClient implements MilvusClient {
   @Override
   public HasCollectionResponse hasCollection(@Nonnull String collectionName) {
 
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return new HasCollectionResponse(new Response(Response.Status.CLIENT_NOT_CONNECTED), false);
     }
@@ -224,7 +273,7 @@ public class MilvusGrpcClient implements MilvusClient {
     BoolReply response;
 
     try {
-      response = blockingStub.hasCollection(request);
+      response = blockingStub().hasCollection(request);
 
       if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("hasCollection `{}` = {}", collectionName, response.getBoolReply());
@@ -248,7 +297,7 @@ public class MilvusGrpcClient implements MilvusClient {
   @Override
   public Response dropCollection(@Nonnull String collectionName) {
 
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return new Response(Response.Status.CLIENT_NOT_CONNECTED);
     }
@@ -257,7 +306,7 @@ public class MilvusGrpcClient implements MilvusClient {
     Status response;
 
     try {
-      response = blockingStub.dropCollection(request);
+      response = blockingStub().dropCollection(request);
 
       if (response.getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("Dropped collection `{}` successfully!", collectionName);
@@ -276,7 +325,7 @@ public class MilvusGrpcClient implements MilvusClient {
   @Override
   public Response createIndex(@Nonnull Index index) {
 
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return new Response(Response.Status.CLIENT_NOT_CONNECTED);
     }
@@ -309,7 +358,7 @@ public class MilvusGrpcClient implements MilvusClient {
     Status response;
 
     try {
-      response = blockingStub.createIndex(request);
+      response = blockingStub().createIndex(request);
 
       if (response.getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("Created index successfully!\n{}", index.toString());
@@ -328,7 +377,7 @@ public class MilvusGrpcClient implements MilvusClient {
   @Override
   public ListenableFuture<Response> createIndexAsync(@Nonnull Index index) {
 
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return Futures.immediateFuture(new Response(Response.Status.CLIENT_NOT_CONNECTED));
     }
@@ -360,7 +409,7 @@ public class MilvusGrpcClient implements MilvusClient {
 
     ListenableFuture<Status> response;
 
-    response = futureStub.createIndex(request);
+    response = futureStub().createIndex(request);
 
     Futures.addCallback(
         response,
@@ -388,7 +437,7 @@ public class MilvusGrpcClient implements MilvusClient {
   @Override
   public Response createPartition(String collectionName, String tag) {
 
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return new Response(Response.Status.CLIENT_NOT_CONNECTED);
     }
@@ -399,7 +448,7 @@ public class MilvusGrpcClient implements MilvusClient {
     Status response;
 
     try {
-      response = blockingStub.createPartition(request);
+      response = blockingStub().createPartition(request);
 
       if (response.getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("Created partition `{}` in collection `{}` successfully!", tag, collectionName);
@@ -422,7 +471,7 @@ public class MilvusGrpcClient implements MilvusClient {
   @Override
   public HasPartitionResponse hasPartition(String collectionName, String tag) {
 
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return new HasPartitionResponse(new Response(Response.Status.CLIENT_NOT_CONNECTED), false);
     }
@@ -432,7 +481,7 @@ public class MilvusGrpcClient implements MilvusClient {
     BoolReply response;
 
     try {
-      response = blockingStub.hasPartition(request);
+      response = blockingStub().hasPartition(request);
 
       if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
         logInfo(
@@ -463,7 +512,7 @@ public class MilvusGrpcClient implements MilvusClient {
   @Override
   public ListPartitionsResponse listPartitions(String collectionName) {
 
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return new ListPartitionsResponse(
           new Response(Response.Status.CLIENT_NOT_CONNECTED), Collections.emptyList());
@@ -473,7 +522,7 @@ public class MilvusGrpcClient implements MilvusClient {
     PartitionList response;
 
     try {
-      response = blockingStub.showPartitions(request);
+      response = blockingStub().showPartitions(request);
 
       if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
         logInfo(
@@ -500,7 +549,7 @@ public class MilvusGrpcClient implements MilvusClient {
   @Override
   public Response dropPartition(String collectionName, String tag) {
 
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return new Response(Response.Status.CLIENT_NOT_CONNECTED);
     }
@@ -510,7 +559,7 @@ public class MilvusGrpcClient implements MilvusClient {
     Status response;
 
     try {
-      response = blockingStub.dropPartition(request);
+      response = blockingStub().dropPartition(request);
 
       if (response.getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("Dropped partition `{}` in collection `{}` successfully!", tag, collectionName);
@@ -534,7 +583,7 @@ public class MilvusGrpcClient implements MilvusClient {
   @SuppressWarnings("unchecked")
   public InsertResponse insert(@Nonnull InsertParam insertParam) {
 
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return new InsertResponse(
           new Response(Response.Status.CLIENT_NOT_CONNECTED), Collections.emptyList());
@@ -621,7 +670,7 @@ public class MilvusGrpcClient implements MilvusClient {
     EntityIds response;
 
     try {
-      response = blockingStub.insert(request);
+      response = blockingStub().insert(request);
 
       if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
         logInfo(
@@ -649,7 +698,7 @@ public class MilvusGrpcClient implements MilvusClient {
   @SuppressWarnings("unchecked")
   public ListenableFuture<InsertResponse> insertAsync(@Nonnull InsertParam insertParam) {
 
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return Futures.immediateFuture(
           new InsertResponse(
@@ -739,7 +788,7 @@ public class MilvusGrpcClient implements MilvusClient {
 
     ListenableFuture<EntityIds> response;
 
-    response = futureStub.insert(request);
+    response = futureStub().insert(request);
 
     Futures.addCallback(
         response,
@@ -783,7 +832,7 @@ public class MilvusGrpcClient implements MilvusClient {
   @Override
   public SearchResponse search(@Nonnull SearchParam searchParam) {
 
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       SearchResponse searchResponse = new SearchResponse();
       searchResponse.setResponse(new Response(Response.Status.CLIENT_NOT_CONNECTED));
@@ -889,7 +938,7 @@ public class MilvusGrpcClient implements MilvusClient {
     QueryResult response;
 
     try {
-      response = blockingStub.search(request);
+      response = blockingStub().search(request);
 
       if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
         SearchResponse searchResponse = buildSearchResponse(response);
@@ -918,7 +967,7 @@ public class MilvusGrpcClient implements MilvusClient {
   @Override
   public ListenableFuture<SearchResponse> searchAsync(@Nonnull SearchParam searchParam) {
 
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       SearchResponse searchResponse = new SearchResponse();
       searchResponse.setResponse(new Response(Response.Status.CLIENT_NOT_CONNECTED));
@@ -1023,7 +1072,7 @@ public class MilvusGrpcClient implements MilvusClient {
 
     ListenableFuture<QueryResult> response;
 
-    response = futureStub.search(request);
+    response = futureStub().search(request);
 
     Futures.addCallback(
         response,
@@ -1068,7 +1117,7 @@ public class MilvusGrpcClient implements MilvusClient {
   @Override
   public GetCollectionInfoResponse getCollectionInfo(@Nonnull String collectionName) {
 
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return new GetCollectionInfoResponse(
           new Response(Response.Status.CLIENT_NOT_CONNECTED), null);
@@ -1078,7 +1127,7 @@ public class MilvusGrpcClient implements MilvusClient {
     Mapping response;
 
     try {
-      response = blockingStub.describeCollection(request);
+      response = blockingStub().describeCollection(request);
 
       if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
         String extraParam = "";
@@ -1128,7 +1177,7 @@ public class MilvusGrpcClient implements MilvusClient {
   @Override
   public ListCollectionsResponse listCollections() {
 
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return new ListCollectionsResponse(
           new Response(Response.Status.CLIENT_NOT_CONNECTED), Collections.emptyList());
@@ -1138,7 +1187,7 @@ public class MilvusGrpcClient implements MilvusClient {
     CollectionNameList response;
 
     try {
-      response = blockingStub.showCollections(request);
+      response = blockingStub().showCollections(request);
 
       if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
         List<String> collectionNames = response.getCollectionNamesList();
@@ -1162,7 +1211,7 @@ public class MilvusGrpcClient implements MilvusClient {
   @Override
   public CountEntitiesResponse countEntities(@Nonnull String collectionName) {
 
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return new CountEntitiesResponse(new Response(Response.Status.CLIENT_NOT_CONNECTED), 0);
     }
@@ -1171,7 +1220,7 @@ public class MilvusGrpcClient implements MilvusClient {
     CollectionRowCount response;
 
     try {
-      response = blockingStub.countCollection(request);
+      response = blockingStub().countCollection(request);
 
       if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
         long collectionRowCount = response.getCollectionRowCount();
@@ -1206,7 +1255,7 @@ public class MilvusGrpcClient implements MilvusClient {
 
   public Response command(@Nonnull String command) {
 
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return new Response(Response.Status.CLIENT_NOT_CONNECTED);
     }
@@ -1215,7 +1264,7 @@ public class MilvusGrpcClient implements MilvusClient {
     StringReply response;
 
     try {
-      response = blockingStub.cmd(request);
+      response = blockingStub().cmd(request);
 
       if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("Command `{}`: {}", command, response.getStringReply());
@@ -1235,7 +1284,7 @@ public class MilvusGrpcClient implements MilvusClient {
   @Override
   public Response loadCollection(@Nonnull String collectionName) {
 
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return new Response(Response.Status.CLIENT_NOT_CONNECTED);
     }
@@ -1244,7 +1293,7 @@ public class MilvusGrpcClient implements MilvusClient {
     Status response;
 
     try {
-      response = blockingStub.preloadCollection(request);
+      response = blockingStub().preloadCollection(request);
 
       if (response.getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("Loaded collection `{}` successfully!", collectionName);
@@ -1263,7 +1312,7 @@ public class MilvusGrpcClient implements MilvusClient {
   @Override
   public Response dropIndex(String collectionName, String fieldName) {
 
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return new Response(Response.Status.CLIENT_NOT_CONNECTED);
     }
@@ -1276,7 +1325,7 @@ public class MilvusGrpcClient implements MilvusClient {
     Status response;
 
     try {
-      response = blockingStub.dropIndex(request);
+      response = blockingStub().dropIndex(request);
 
       if (response.getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("Dropped index for collection `{}` successfully!", collectionName);
@@ -1294,7 +1343,7 @@ public class MilvusGrpcClient implements MilvusClient {
 
   @Override
   public Response getCollectionStats(String collectionName) {
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return new Response(Response.Status.CLIENT_NOT_CONNECTED);
     }
@@ -1303,7 +1352,7 @@ public class MilvusGrpcClient implements MilvusClient {
     io.milvus.grpc.CollectionInfo response;
 
     try {
-      response = blockingStub.showCollectionInfo(request);
+      response = blockingStub().showCollectionInfo(request);
 
       if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("getCollectionStats for `{}` returned successfully!", collectionName);
@@ -1325,7 +1374,7 @@ public class MilvusGrpcClient implements MilvusClient {
 
   @Override
   public GetEntityByIDResponse getEntityByID(String collectionName, List<Long> ids, List<String> fieldNames) {
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return new GetEntityByIDResponse(
           new Response(Response.Status.CLIENT_NOT_CONNECTED), Collections.emptyList());
@@ -1340,7 +1389,7 @@ public class MilvusGrpcClient implements MilvusClient {
     Entities response;
 
     try {
-      response = blockingStub.getEntityByID(request);
+      response = blockingStub().getEntityByID(request);
 
       if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
 
@@ -1411,7 +1460,7 @@ public class MilvusGrpcClient implements MilvusClient {
 
   @Override
   public ListIDInSegmentResponse listIDInSegment(String collectionName, Long segmentId) {
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return new ListIDInSegmentResponse(
           new Response(Response.Status.CLIENT_NOT_CONNECTED), Collections.emptyList());
@@ -1425,7 +1474,7 @@ public class MilvusGrpcClient implements MilvusClient {
     EntityIds response;
 
     try {
-      response = blockingStub.getEntityIDs(request);
+      response = blockingStub().getEntityIDs(request);
 
       if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
 
@@ -1456,7 +1505,7 @@ public class MilvusGrpcClient implements MilvusClient {
 
   @Override
   public Response deleteEntityByID(String collectionName, List<Long> ids) {
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return new Response(Response.Status.CLIENT_NOT_CONNECTED);
     }
@@ -1466,7 +1515,7 @@ public class MilvusGrpcClient implements MilvusClient {
     Status response;
 
     try {
-      response = blockingStub.deleteByID(request);
+      response = blockingStub().deleteByID(request);
 
       if (response.getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("deleteEntityByID in collection `{}` completed successfully!", collectionName);
@@ -1485,7 +1534,7 @@ public class MilvusGrpcClient implements MilvusClient {
 
   @Override
   public Response flush(List<String> collectionNames) {
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return new Response(Response.Status.CLIENT_NOT_CONNECTED);
     }
@@ -1494,7 +1543,7 @@ public class MilvusGrpcClient implements MilvusClient {
     Status response;
 
     try {
-      response = blockingStub.flush(request);
+      response = blockingStub().flush(request);
 
       if (response.getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("Flushed collection {} successfully!", collectionNames);
@@ -1513,7 +1562,7 @@ public class MilvusGrpcClient implements MilvusClient {
   @Override
   public ListenableFuture<Response> flushAsync(@Nonnull List<String> collectionNames) {
 
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return Futures.immediateFuture(new Response(Response.Status.CLIENT_NOT_CONNECTED));
     }
@@ -1522,7 +1571,7 @@ public class MilvusGrpcClient implements MilvusClient {
 
     ListenableFuture<Status> response;
 
-    response = futureStub.flush(request);
+    response = futureStub().flush(request);
 
     Futures.addCallback(
         response,
@@ -1571,7 +1620,7 @@ public class MilvusGrpcClient implements MilvusClient {
 
   @Override
   public Response compact(CompactParam compactParam) {
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return new Response(Response.Status.CLIENT_NOT_CONNECTED);
     }
@@ -1584,7 +1633,7 @@ public class MilvusGrpcClient implements MilvusClient {
     Status response;
 
     try {
-      response = blockingStub.compact(request);
+      response = blockingStub().compact(request);
 
       if (response.getErrorCode() == ErrorCode.SUCCESS) {
         logInfo("Compacted collection `{}` successfully!", compactParam.getCollectionName());
@@ -1604,7 +1653,7 @@ public class MilvusGrpcClient implements MilvusClient {
   @Override
   public ListenableFuture<Response> compactAsync(@Nonnull CompactParam compactParam) {
 
-    if (!channelIsReadyOrIdle()) {
+    if (!maybeAvailable()) {
       logWarning("You are not connected to Milvus server");
       return Futures.immediateFuture(new Response(Response.Status.CLIENT_NOT_CONNECTED));
     }
@@ -1617,7 +1666,7 @@ public class MilvusGrpcClient implements MilvusClient {
 
     ListenableFuture<Status> response;
 
-    response = futureStub.compact(request);
+    response = futureStub().compact(request);
 
     Futures.addCallback(
         response,
@@ -1730,16 +1779,6 @@ public class MilvusGrpcClient implements MilvusClient {
     return searchResponse;
   }
 
-  private boolean channelIsReadyOrIdle() {
-    if (channel == null) {
-      return false;
-    }
-    ConnectivityState connectivityState = channel.getState(false);
-    return connectivityState == ConnectivityState.READY
-        || connectivityState
-            == ConnectivityState.IDLE; // Since a new RPC would take the channel out of idle mode
-  }
-
   private String kvListToString(List<KeyValuePair> kv) {
     JSONObject jsonObject = new JSONObject();
     for (KeyValuePair keyValuePair : kv) {

+ 7 - 0
src/main/java/io/milvus/client/exception/InitializationException.java

@@ -0,0 +1,7 @@
+package io.milvus.client.exception;
+
+public class InitializationException extends MilvusException {
+  public InitializationException(String host, String message) {
+    super(false, host + ": " + message);
+  }
+}

+ 24 - 0
src/main/java/io/milvus/client/exception/MilvusException.java

@@ -0,0 +1,24 @@
+package io.milvus.client.exception;
+
+public class MilvusException extends RuntimeException {
+  private boolean fillInStackTrace;
+
+  MilvusException(boolean fillInStackTrace) {
+    this.fillInStackTrace = fillInStackTrace;
+  }
+
+  MilvusException(boolean fillInStackTrace, Throwable cause) {
+    super(cause);
+    this.fillInStackTrace = fillInStackTrace;
+  }
+
+  MilvusException(boolean fillInStackTrace, String message) {
+    super(message);
+    this.fillInStackTrace = fillInStackTrace;
+  }
+
+  @Override
+  public synchronized Throwable fillInStackTrace() {
+    return fillInStackTrace ? super.fillInStackTrace() : this;
+  }
+}

+ 22 - 0
src/main/java/io/milvus/client/exception/UnsupportedServerVersion.java

@@ -0,0 +1,22 @@
+package io.milvus.client.exception;
+
+import io.milvus.client.MilvusClient;
+
+public class UnsupportedServerVersion extends MilvusException {
+  private String host;
+  private String expect;
+  private String actual;
+
+  public UnsupportedServerVersion(String host, String expect, String actual) {
+    super(false);
+    this.host = host;
+    this.expect = expect;
+    this.actual = actual;
+  }
+
+  @Override
+  public String getMessage() {
+    return String.format("%s: Milvus client %s is expected to work with Milvus server %s, but the version of the connected server is %s",
+        host, MilvusClient.clientVersion, expect, actual);
+  }
+}

+ 2 - 2
src/main/proto/milvus.proto

@@ -63,9 +63,9 @@ message FieldName {
  * @brief Collection mapping
  * @extra_params: key-value pair for extra parameters of the collection
  *    typically usage:
- *        extra_params["params"] = {segment_row_count: 1000000, auto_id: true}
+ *        extra_params["params"] = {segment_row_limit: 1000000, auto_id: true}
  *    Note:
- *        the segment_row_count specify segment row count limit for merging
+ *        the segment_row_limit specify segment row count limit for merging
  *        the auto_id = true means entity id is auto-generated by milvus
  */
 message Mapping {

+ 3 - 0
src/main/resources/milvus-client.properties

@@ -0,0 +1,3 @@
+groupId=${project.groupId}
+artifactId=${project.artifactId}
+version=${project.version}

+ 122 - 26
src/test/java/io/milvus/client/MilvusGrpcClientTest.java

@@ -23,33 +23,106 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
+import io.grpc.NameResolverProvider;
+import io.grpc.NameResolverRegistry;
 import io.milvus.client.InsertParam.Builder;
 import io.milvus.client.Response.Status;
+import io.milvus.client.exception.InitializationException;
+import io.milvus.client.exception.UnsupportedServerVersion;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.text.RandomStringGenerator;
 import org.checkerframework.checker.nullness.compatqual.NullableDecl;
-import org.json.*;
-
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.SplittableRandom;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.DoubleStream;
+import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 
-import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Testcontainers
+@EnabledIfSystemProperty(named = "with-containers", matches = "true")
+class ContainerMilvusClientTest extends MilvusClientTest {
+  @Container
+  private GenericContainer milvusContainer =
+      new GenericContainer(System.getProperty("docker_image_name", "milvusdb/milvus:0.11.0-cpu"))
+          .withExposedPorts(19530);
+
+  @Container
+  private static GenericContainer milvusContainer2 =
+      new GenericContainer(System.getProperty("docker_image_name", "milvusdb/milvus:0.11.0-cpu"))
+          .withExposedPorts(19530);
+
+  @Override
+  protected ConnectParam.Builder connectParamBuilder() {
+    return connectParamBuilder(milvusContainer);
+  }
+
+  @org.junit.jupiter.api.Test
+  void loadBalancing() {
+    NameResolverProvider testNameResolverProvider = new StaticNameResolverProvider(
+        new InetSocketAddress(milvusContainer.getHost(), milvusContainer.getFirstMappedPort()),
+        new InetSocketAddress(milvusContainer2.getHost(), milvusContainer2.getFirstMappedPort()));
 
+    NameResolverRegistry.getDefaultRegistry().register(testNameResolverProvider);
+
+    ConnectParam connectParam = connectParamBuilder()
+        .withTarget(testNameResolverProvider.getDefaultScheme() + ":///test")
+        .build();
+
+    MilvusClient loadBalancingClient = new MilvusGrpcClient(connectParam);
+    assertEquals(50, IntStream.range(0, 100)
+            .filter(i -> loadBalancingClient.hasCollection(randomCollectionName).hasCollection())
+            .count());
+  }
+}
+
+@Testcontainers
+@DisabledIfSystemProperty(named = "with-containers", matches = "true")
 class MilvusClientTest {
 
   private MilvusClient client;
 
   private RandomStringGenerator generator;
 
-  private String randomCollectionName;
+  protected String randomCollectionName;
   private int size;
   private int dimension;
 
+  protected ConnectParam.Builder connectParamBuilder() {
+    return connectParamBuilder("localhost", 19530);
+  }
+
+  protected ConnectParam.Builder connectParamBuilder(GenericContainer milvusContainer) {
+    return connectParamBuilder(milvusContainer.getHost(), milvusContainer.getFirstMappedPort());
+  }
+
+  protected ConnectParam.Builder connectParamBuilder(String host, int port) {
+    return new ConnectParam.Builder().withHost(host).withPort(port);
+  }
+
   // Helper function that generates random float vectors
   static List<List<Float>> generateFloatVectors(int vectorCount, int dimension) {
     SplittableRandom splittableRandom = new SplittableRandom();
@@ -127,10 +200,8 @@ class MilvusClientTest {
   @org.junit.jupiter.api.BeforeEach
   void setUp() throws Exception {
 
-    client = new MilvusGrpcClient();
-    ConnectParam connectParam =
-        new ConnectParam.Builder().withHost("localhost").withPort(19530).build();
-    client.connect(connectParam);
+    ConnectParam connectParam = connectParamBuilder().build();
+    client = new MilvusGrpcClient(connectParam);
 
     generator = new RandomStringGenerator.Builder().withinRange('a', 'z').build();
     randomCollectionName = generator.generate(10);
@@ -143,7 +214,7 @@ class MilvusClientTest {
             .field(new FieldBuilder("float_vec", DataType.VECTOR_FLOAT)
                 .param("dim", dimension)
                 .build())
-            .withParamsInJson(new JsonBuilder().param("segment_row_count", 50000)
+            .withParamsInJson(new JsonBuilder().param("segment_row_limit", 50000)
                                                .param("auto_id", false)
                                                .build())
             .build();
@@ -152,20 +223,17 @@ class MilvusClientTest {
   }
 
   @org.junit.jupiter.api.AfterEach
-  void tearDown() throws InterruptedException {
-    assertTrue(client.dropCollection(randomCollectionName).ok());
-    client.disconnect();
+  void tearDown() {
+    client.dropCollection(randomCollectionName);
+    client.close();
   }
 
   @org.junit.jupiter.api.Test
-  void idleTest() throws InterruptedException, ConnectFailedException {
-    MilvusClient client = new MilvusGrpcClient();
-    ConnectParam connectParam =
-        new ConnectParam.Builder()
-            .withHost("localhost")
-            .withIdleTimeout(1, TimeUnit.SECONDS)
-            .build();
-    client.connect(connectParam);
+  void idleTest() throws InterruptedException {
+    ConnectParam connectParam = connectParamBuilder()
+        .withIdleTimeout(1, TimeUnit.SECONDS)
+        .build();
+    MilvusClient client = new MilvusGrpcClient(connectParam);
     TimeUnit.SECONDS.sleep(2);
     // A new RPC would take the channel out of idle mode
     assertTrue(client.listCollections().ok());
@@ -206,9 +274,37 @@ class MilvusClientTest {
 
   @org.junit.jupiter.api.Test
   void connectUnreachableHost() {
-    MilvusClient client = new MilvusGrpcClient();
-    ConnectParam connectParam = new ConnectParam.Builder().withHost("250.250.250.250").build();
-    assertThrows(ConnectFailedException.class, () -> client.connect(connectParam));
+    ConnectParam connectParam = connectParamBuilder("250.250.250.250", 19530).build();
+    assertThrows(InitializationException.class, () -> new MilvusGrpcClient(connectParam));
+  }
+
+  @org.junit.jupiter.api.Test
+  void unsupportedServerVersion() {
+    GenericContainer unsupportedMilvusContainer =
+        new GenericContainer("milvusdb/milvus:0.9.1-cpu-d052920-e04ed5")
+            .withExposedPorts(19530);
+    try {
+      unsupportedMilvusContainer.start();
+      ConnectParam connectParam = connectParamBuilder(unsupportedMilvusContainer).build();
+      assertThrows(UnsupportedServerVersion.class, () -> new MilvusGrpcClient(connectParam));
+    } finally {
+      unsupportedMilvusContainer.stop();
+    }
+  }
+
+  @org.junit.jupiter.api.Test
+  void grpcTimeout() {
+    insert();
+    MilvusClient timeoutClient = client.withTimeout(1, TimeUnit.MILLISECONDS);
+    Response response = timeoutClient.createIndex(
+        new Index.Builder(randomCollectionName, "float_vec")
+            .withParamsInJson(new JsonBuilder()
+                .param("index_type", "IVF_FLAT")
+                .param("metric_type", "L2")
+                .indexParam("nlist", 2048)
+                .build())
+            .build());
+    assertEquals(Response.Status.RPC_ERROR, response.getStatus());
   }
 
   @org.junit.jupiter.api.Test
@@ -232,7 +328,7 @@ class MilvusClientTest {
     createCollectionResponse = client.createCollection(invalidCollectionMapping);
     assertFalse(createCollectionResponse.ok());
 
-    // invalid segment_row_count
+    // invalid segment_row_limit
     invalidCollectionMapping =
         new CollectionMapping.Builder("validCollectionName")
             .field(new FieldBuilder("int64", DataType.INT64).build())
@@ -240,7 +336,7 @@ class MilvusClientTest {
             .field(new FieldBuilder("float_vec", DataType.VECTOR_FLOAT)
                 .param("dim", dimension)
                 .build())
-            .withParamsInJson(new JsonBuilder().param("segment_row_count", -1000).build())
+            .withParamsInJson(new JsonBuilder().param("segment_row_limit", -1000).build())
             .build();
     createCollectionResponse = client.createCollection(invalidCollectionMapping);
     assertFalse(createCollectionResponse.ok());

+ 66 - 0
src/test/java/io/milvus/client/StaticNameResolverProvider.java

@@ -0,0 +1,66 @@
+package io.milvus.client;
+
+import io.grpc.Attributes;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.NameResolver;
+import io.grpc.NameResolverProvider;
+
+import java.net.SocketAddress;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class StaticNameResolverProvider extends NameResolverProvider {
+  private List<SocketAddress> addresses;
+
+  public StaticNameResolverProvider(SocketAddress... addresses) {
+    this.addresses = Arrays.asList(addresses);
+  }
+
+  @Override
+  public String getDefaultScheme() {
+    return "static";
+  }
+
+  @Override
+  protected boolean isAvailable() {
+    return true;
+  }
+
+  @Override
+  protected int priority() {
+    return 0;
+  }
+
+  @Override
+  public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
+    if (!getDefaultScheme().equals(targetUri.getScheme())) {
+      return null;
+    }
+    return new NameResolver() {
+      @Override
+      public String getServiceAuthority() {
+        return "localhost";
+      }
+
+      @Override
+      public void start(Listener2 listener) {
+        List<EquivalentAddressGroup> addrs = addresses.stream()
+            .map(addr -> new EquivalentAddressGroup(Collections.singletonList(addr)))
+            .collect(Collectors.toList());
+
+        listener.onResult(
+            ResolutionResult.newBuilder()
+                .setAddresses(addrs)
+                .setAttributes(Attributes.EMPTY)
+                .build());
+      }
+
+      @Override
+      public void shutdown() {
+      }
+    };
+  }
+}