Browse Source

Support retry (#568)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
groot 1 year ago
parent
commit
3911b25009

+ 15 - 0
src/main/java/io/milvus/client/MilvusClient.java

@@ -52,6 +52,21 @@ public interface MilvusClient {
      */
      */
     MilvusClient withTimeout(long timeout, TimeUnit timeoutUnit);
     MilvusClient withTimeout(long timeout, TimeUnit timeoutUnit);
 
 
+    /**
+     * Number of retry attempts.
+     *
+     * @param retryTimes     number of retry attempts.
+     */
+    MilvusClient withRetry(int retryTimes);
+
+    /**
+     * Time interval between retry attempts. Default value is 500ms.
+     *
+     * @param interval     time interval between retry attempts.
+     * @param timeUnit     time unit
+     */
+    MilvusClient withRetryInterval(long interval, TimeUnit timeUnit);
+
     /**
     /**
      * Disconnects from a Milvus server with timeout of 1 minute
      * Disconnects from a Milvus server with timeout of 1 minute
      */
      */

+ 10 - 0
src/main/java/io/milvus/client/MilvusMultiServiceClient.java

@@ -107,6 +107,16 @@ public class MilvusMultiServiceClient implements MilvusClient {
         return clusterFactory.getMaster().getClient().withTimeout(timeout, timeoutUnit);
         return clusterFactory.getMaster().getClient().withTimeout(timeout, timeoutUnit);
     }
     }
 
 
+    @Override
+    public MilvusClient withRetry(int retryTimes) {
+        return clusterFactory.getMaster().getClient().withRetry(retryTimes);
+    }
+
+    @Override
+    public MilvusClient withRetryInterval(long interval, TimeUnit timeUnit) {
+        return clusterFactory.getMaster().getClient().withRetryInterval(interval, timeUnit);
+    }
+
     @Override
     @Override
     public void close(long maxWaitSeconds) throws InterruptedException {
     public void close(long maxWaitSeconds) throws InterruptedException {
         this.clusterFactory.getAvailableServerSettings().parallelStream()
         this.clusterFactory.getAvailableServerSettings().parallelStream()

+ 472 - 14
src/main/java/io/milvus/client/MilvusServiceClient.java

@@ -21,27 +21,58 @@ package io.milvus.client;
 
 
 import io.grpc.*;
 import io.grpc.*;
 import io.grpc.stub.MetadataUtils;
 import io.grpc.stub.MetadataUtils;
-import io.milvus.grpc.MilvusServiceGrpc;
+import io.milvus.grpc.*;
 import io.milvus.param.ConnectParam;
 import io.milvus.param.ConnectParam;
 
 
 import io.milvus.param.LogLevel;
 import io.milvus.param.LogLevel;
+import io.milvus.param.R;
+import io.milvus.param.RpcStatus;
+import io.milvus.param.alias.AlterAliasParam;
+import io.milvus.param.alias.CreateAliasParam;
+import io.milvus.param.alias.DropAliasParam;
+import io.milvus.param.bulkinsert.BulkInsertParam;
+import io.milvus.param.bulkinsert.GetBulkInsertStateParam;
+import io.milvus.param.bulkinsert.ListBulkInsertTasksParam;
+import io.milvus.param.collection.*;
+import io.milvus.param.control.*;
+import io.milvus.param.credential.CreateCredentialParam;
+import io.milvus.param.credential.DeleteCredentialParam;
+import io.milvus.param.credential.ListCredUsersParam;
+import io.milvus.param.credential.UpdateCredentialParam;
+import io.milvus.param.dml.DeleteParam;
+import io.milvus.param.dml.InsertParam;
+import io.milvus.param.dml.QueryParam;
+import io.milvus.param.dml.SearchParam;
+import io.milvus.param.highlevel.collection.CreateSimpleCollectionParam;
+import io.milvus.param.highlevel.collection.ListCollectionsParam;
+import io.milvus.param.highlevel.collection.response.ListCollectionsResponse;
+import io.milvus.param.highlevel.dml.*;
+import io.milvus.param.highlevel.dml.response.*;
+import io.milvus.param.index.*;
+import io.milvus.param.partition.*;
+import io.milvus.param.role.*;
 import lombok.NonNull;
 import lombok.NonNull;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.StringUtils;
 
 
+import java.io.File;
 import java.io.IOException;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
-import java.io.File;
+import java.util.concurrent.Callable;
 
 
 import io.grpc.netty.GrpcSslContexts;
 import io.grpc.netty.GrpcSslContexts;
 import io.grpc.netty.NettyChannelBuilder;
 import io.grpc.netty.NettyChannelBuilder;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContext;
 
 
+
 public class MilvusServiceClient extends AbstractMilvusGrpcClient {
 public class MilvusServiceClient extends AbstractMilvusGrpcClient {
 
 
     private ManagedChannel channel;
     private ManagedChannel channel;
     private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub;
     private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub;
     private final MilvusServiceGrpc.MilvusServiceFutureStub futureStub;
     private final MilvusServiceGrpc.MilvusServiceFutureStub futureStub;
     private final long rpcDeadlineMs;
     private final long rpcDeadlineMs;
+    private long timeoutMs = 0;
+    private int retryTimes = 0;
+    private long retryIntervalMs = 500L;
 
 
     public MilvusServiceClient(@NonNull ConnectParam connectParam) {
     public MilvusServiceClient(@NonNull ConnectParam connectParam) {
         this.rpcDeadlineMs = connectParam.getRpcDeadlineMs();
         this.rpcDeadlineMs = connectParam.getRpcDeadlineMs();
@@ -120,6 +151,16 @@ public class MilvusServiceClient extends AbstractMilvusGrpcClient {
         futureStub = MilvusServiceGrpc.newFutureStub(channel);
         futureStub = MilvusServiceGrpc.newFutureStub(channel);
     }
     }
 
 
+    protected MilvusServiceClient(MilvusServiceClient src) {
+        this.channel = src.channel;
+        this.blockingStub = src.blockingStub;
+        this.futureStub = src.futureStub;
+        this.rpcDeadlineMs = src.rpcDeadlineMs;
+        this.timeoutMs = src.timeoutMs;
+        this.retryTimes = src.retryTimes;
+        this.retryIntervalMs = src.retryIntervalMs;
+    }
+
     // set log level in runtime
     // set log level in runtime
     public void setLogLevel(LogLevel level) {
     public void setLogLevel(LogLevel level) {
         logLevel = level;
         logLevel = level;
@@ -174,12 +215,7 @@ public class MilvusServiceClient extends AbstractMilvusGrpcClient {
         final MilvusServiceGrpc.MilvusServiceFutureStub futureStubTimeout =
         final MilvusServiceGrpc.MilvusServiceFutureStub futureStubTimeout =
                 this.futureStub.withInterceptors(timeoutInterceptor);
                 this.futureStub.withInterceptors(timeoutInterceptor);
 
 
-        return new AbstractMilvusGrpcClient() {
-            @Override
-            protected boolean clientIsReady() {
-                return MilvusServiceClient.this.clientIsReady();
-            }
-
+        MilvusServiceClient newClient = new MilvusServiceClient(this) {
             @Override
             @Override
             protected MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub() {
             protected MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub() {
                 return blockingStubTimeout;
                 return blockingStubTimeout;
@@ -189,17 +225,439 @@ public class MilvusServiceClient extends AbstractMilvusGrpcClient {
             protected MilvusServiceGrpc.MilvusServiceFutureStub futureStub() {
             protected MilvusServiceGrpc.MilvusServiceFutureStub futureStub() {
                 return futureStubTimeout;
                 return futureStubTimeout;
             }
             }
+        };
+        newClient.timeoutMs = timeoutMillis;
+        return newClient;
+    }
 
 
-            @Override
-            public void close(long maxWaitSeconds) throws InterruptedException {
-                MilvusServiceClient.this.close(maxWaitSeconds);
+    @Override
+    public MilvusClient withRetry(int retryTimes) {
+        if (retryTimes <= 0) {
+            return this;
+        }
+
+        MilvusServiceClient newClient = new MilvusServiceClient(this);
+        newClient.retryTimes = retryTimes;
+        return newClient;
+    }
+
+    @Override
+    public MilvusClient withRetryInterval(long interval, TimeUnit timeUnit) {
+        if (interval <= 0) {
+            return this;
+        }
+
+        MilvusServiceClient newClient = new MilvusServiceClient(this);
+        newClient.retryIntervalMs = timeUnit.toMillis(interval);
+        return newClient;
+    }
+
+    private <T> R<T> retry(Callable<R<T>> callable) {
+        // no retry, direct call the method
+        if (this.retryTimes <= 1) {
+            try {
+                return callable.call();
+            } catch (Exception e) {
+                return R.failed(e);
             }
             }
+        }
 
 
-            @Override
-            public MilvusClient withTimeout(long timeout, TimeUnit timeoutUnit) {
-                return MilvusServiceClient.this.withTimeout(timeout, timeoutUnit);
+        // method to check timeout
+        long begin = System.currentTimeMillis();
+        Callable<Void> timeoutChecker = ()->{
+            long current = System.currentTimeMillis();
+            long cost = (current - begin);
+            if (this.timeoutMs > 0 && cost >= this.timeoutMs) {
+                String msg = String.format("Retry timeout: %dms", this.timeoutMs);
+                throw new RuntimeException(msg);
             }
             }
+            return null;
         };
         };
+
+        // retry within timeout
+        for (int i = 0; i < this.retryTimes; i++) {
+            try {
+                R<T> resp = callable.call();
+                if (resp.getStatus() == R.Status.Success.getCode()) {
+                    return resp;
+                }
+
+                if (i != this.retryTimes-1) {
+                    timeoutChecker.call();
+                    TimeUnit.MILLISECONDS.sleep(this.retryIntervalMs);
+                    timeoutChecker.call();
+                    logInfo(String.format("Retry again after %dms...", this.retryIntervalMs));
+                }
+            } catch (Exception e) {
+                logError(e.getMessage());
+                return R.failed(e);
+            }
+        }
+        String msg = String.format("Retry run out of %d retry times", this.retryTimes);
+        logError(msg);
+        return R.failed(new RuntimeException(msg));
+    }
+
+    @Override
+    public R<Boolean> hasCollection(HasCollectionParam requestParam) {
+        return retry(()-> super.hasCollection(requestParam));
+    }
+
+    @Override
+    public R<RpcStatus> createDatabase(CreateDatabaseParam requestParam) {
+        return retry(()-> super.createDatabase(requestParam));
+    }
+
+    @Override
+    public R<RpcStatus> dropDatabase(DropDatabaseParam requestParam) {
+        return retry(()-> super.dropDatabase(requestParam));
+    }
+
+    @Override
+    public R<ListDatabasesResponse> listDatabases() {
+        return retry(super::listDatabases);
+    }
+
+    @Override
+    public R<RpcStatus> createCollection(CreateCollectionParam requestParam) {
+        return retry(()-> super.createCollection(requestParam));
+    }
+
+    @Override
+    public R<RpcStatus> dropCollection(DropCollectionParam requestParam) {
+        return retry(()-> super.dropCollection(requestParam));
+    }
+
+    @Override
+    public R<RpcStatus> loadCollection(LoadCollectionParam requestParam) {
+        return retry(()-> super.loadCollection(requestParam));
+    }
+
+    @Override
+    public R<RpcStatus> releaseCollection(ReleaseCollectionParam requestParam) {
+        return retry(()-> super.releaseCollection(requestParam));
+    }
+
+    @Override
+    public R<DescribeCollectionResponse> describeCollection(DescribeCollectionParam requestParam) {
+        return retry(()-> super.describeCollection(requestParam));
+    }
+
+    @Override
+    public R<GetCollectionStatisticsResponse> getCollectionStatistics(GetCollectionStatisticsParam requestParam) {
+        return retry(()-> super.getCollectionStatistics(requestParam));
+    }
+
+    @Override
+    public R<RpcStatus> renameCollection(RenameCollectionParam requestParam) {
+        return retry(()-> super.renameCollection(requestParam));
+    }
+
+    @Override
+    public R<ShowCollectionsResponse> showCollections(ShowCollectionsParam requestParam) {
+        return retry(()-> super.showCollections(requestParam));
+    }
+
+    @Override
+    public R<RpcStatus> alterCollection(AlterCollectionParam requestParam) {
+        return retry(()-> super.alterCollection(requestParam));
+    }
+
+    @Override
+    public R<FlushResponse> flush(FlushParam requestParam) {
+        return retry(()-> super.flush(requestParam));
+    }
+
+    @Override
+    public R<FlushAllResponse> flushAll(boolean syncFlushAll, long syncFlushAllWaitingInterval, long syncFlushAllTimeout) {
+        return retry(()-> super.flushAll(syncFlushAll, syncFlushAllWaitingInterval, syncFlushAllTimeout));
+    }
+
+    @Override
+    public R<RpcStatus> createPartition(CreatePartitionParam requestParam) {
+        return retry(()-> super.createPartition(requestParam));
+    }
+
+    @Override
+    public R<RpcStatus> dropPartition(DropPartitionParam requestParam) {
+        return retry(()-> super.dropPartition(requestParam));
+    }
+
+    @Override
+    public R<Boolean> hasPartition(HasPartitionParam requestParam) {
+        return retry(()-> super.hasPartition(requestParam));
+    }
+
+    @Override
+    public R<RpcStatus> loadPartitions(LoadPartitionsParam requestParam) {
+        return retry(()-> super.loadPartitions(requestParam));
+    }
+
+    @Override
+    public R<RpcStatus> releasePartitions(ReleasePartitionsParam requestParam) {
+        return retry(()-> super.releasePartitions(requestParam));
+    }
+
+    @Override
+    public R<GetPartitionStatisticsResponse> getPartitionStatistics(GetPartitionStatisticsParam requestParam) {
+        return retry(()-> super.getPartitionStatistics(requestParam));
+    }
+
+    @Override
+    public R<ShowPartitionsResponse> showPartitions(ShowPartitionsParam requestParam) {
+        return retry(()-> super.showPartitions(requestParam));
+    }
+
+    @Override
+    public R<RpcStatus> createAlias(CreateAliasParam requestParam) {
+        return retry(()-> super.createAlias(requestParam));
+    }
+
+    @Override
+    public R<RpcStatus> dropAlias(DropAliasParam requestParam) {
+        return retry(()-> super.dropAlias(requestParam));
+    }
+
+    @Override
+    public R<RpcStatus> alterAlias(AlterAliasParam requestParam) {
+        return retry(()-> super.alterAlias(requestParam));
+    }
+
+    @Override
+    public R<RpcStatus> createIndex(CreateIndexParam requestParam) {
+        return retry(()-> super.createIndex(requestParam));
+    }
+
+    @Override
+    public R<RpcStatus> dropIndex(DropIndexParam requestParam) {
+        return retry(()-> super.dropIndex(requestParam));
+    }
+
+    @Override
+    public R<DescribeIndexResponse> describeIndex(DescribeIndexParam requestParam) {
+        return retry(()-> super.describeIndex(requestParam));
+    }
+
+    @Override
+    public R<MutationResult> insert(InsertParam requestParam) {
+        return retry(()-> super.insert(requestParam));
+    }
+
+    @Override
+    public R<MutationResult> delete(DeleteParam requestParam) {
+        return retry(()-> super.delete(requestParam));
+    }
+
+    @Override
+    public R<SearchResults> search(SearchParam requestParam) {
+        return retry(()-> super.search(requestParam));
+    }
+
+    @Override
+    public R<QueryResults> query(QueryParam requestParam) {
+        return retry(()-> super.query(requestParam));
+    }
+
+    @Override
+    public R<GetMetricsResponse> getMetrics(GetMetricsParam requestParam) {
+        return retry(()-> super.getMetrics(requestParam));
+    }
+
+    @Override
+    public R<GetFlushStateResponse> getFlushState(GetFlushStateParam requestParam) {
+        return retry(()-> super.getFlushState(requestParam));
+    }
+
+    @Override
+    public R<GetFlushAllStateResponse> getFlushAllState(GetFlushAllStateParam requestParam) {
+        return retry(()-> super.getFlushAllState(requestParam));
+    }
+
+    @Override
+    public R<GetPersistentSegmentInfoResponse> getPersistentSegmentInfo(GetPersistentSegmentInfoParam requestParam) {
+        return retry(()-> super.getPersistentSegmentInfo(requestParam));
+    }
+
+    @Override
+    public R<GetQuerySegmentInfoResponse> getQuerySegmentInfo(GetQuerySegmentInfoParam requestParam) {
+        return retry(()-> super.getQuerySegmentInfo(requestParam));
+    }
+
+    @Override
+    public R<GetReplicasResponse> getReplicas(GetReplicasParam requestParam) {
+        return retry(()-> super.getReplicas(requestParam));
+    }
+
+    @Override
+    public R<RpcStatus> loadBalance(LoadBalanceParam requestParam) {
+        return retry(()-> super.loadBalance(requestParam));
+    }
+
+    @Override
+    public R<GetCompactionStateResponse> getCompactionState(GetCompactionStateParam requestParam) {
+        return retry(()-> super.getCompactionState(requestParam));
+    }
+
+    @Override
+    public R<ManualCompactionResponse> manualCompact(ManualCompactParam requestParam) {
+        return retry(()-> super.manualCompact(requestParam));
+    }
+
+    @Override
+    public R<GetCompactionPlansResponse> getCompactionStateWithPlans(GetCompactionPlansParam requestParam) {
+        return retry(()-> super.getCompactionStateWithPlans(requestParam));
+    }
+
+    @Override
+    public R<RpcStatus> createCredential(CreateCredentialParam requestParam) {
+        return retry(()-> super.createCredential(requestParam));
+    }
+
+    @Override
+    public R<RpcStatus> updateCredential(UpdateCredentialParam requestParam) {
+        return retry(()-> super.updateCredential(requestParam));
+    }
+
+    @Override
+    public R<RpcStatus> deleteCredential(DeleteCredentialParam requestParam) {
+        return retry(()-> super.deleteCredential(requestParam));
+    }
+
+    @Override
+    public R<ListCredUsersResponse> listCredUsers(ListCredUsersParam requestParam) {
+        return retry(()-> super.listCredUsers(requestParam));
+    }
+
+
+    @Override
+    public R<RpcStatus> createRole(CreateRoleParam requestParam) {
+        return retry(()-> super.createRole(requestParam));
+    }
+
+
+    @Override
+    public R<RpcStatus> dropRole(DropRoleParam requestParam) {
+        return retry(()-> super.dropRole(requestParam));
+    }
+
+
+    @Override
+    public R<RpcStatus> addUserToRole(AddUserToRoleParam requestParam) {
+        return retry(()-> super.addUserToRole(requestParam));
+    }
+
+
+    @Override
+    public R<RpcStatus> removeUserFromRole(RemoveUserFromRoleParam requestParam) {
+        return retry(()-> super.removeUserFromRole(requestParam));
+    }
+
+
+    @Override
+    public R<SelectRoleResponse> selectRole(SelectRoleParam requestParam) {
+        return retry(()-> super.selectRole(requestParam));
+    }
+
+
+    @Override
+    public R<SelectUserResponse> selectUser(SelectUserParam requestParam) {
+        return retry(()-> super.selectUser(requestParam));
+    }
+
+
+    @Override
+    public R<RpcStatus> grantRolePrivilege(GrantRolePrivilegeParam requestParam) {
+        return retry(()-> super.grantRolePrivilege(requestParam));
+    }
+
+
+    @Override
+    public R<RpcStatus> revokeRolePrivilege(RevokeRolePrivilegeParam requestParam) {
+        return retry(()-> super.revokeRolePrivilege(requestParam));
+    }
+
+
+    @Override
+    public R<SelectGrantResponse> selectGrantForRole(SelectGrantForRoleParam requestParam) {
+        return retry(()-> super.selectGrantForRole(requestParam));
+    }
+
+
+    @Override
+    public R<SelectGrantResponse> selectGrantForRoleAndObject(SelectGrantForRoleAndObjectParam requestParam) {
+        return retry(()-> super.selectGrantForRoleAndObject(requestParam));
+    }
+
+    @Override
+    public R<ImportResponse> bulkInsert(BulkInsertParam requestParam) {
+        return retry(()-> super.bulkInsert(requestParam));
+    }
+
+    @Override
+    public R<GetImportStateResponse> getBulkInsertState(GetBulkInsertStateParam requestParam) {
+        return retry(()-> super.getBulkInsertState(requestParam));
+    }
+
+    @Override
+    public R<ListImportTasksResponse> listBulkInsertTasks(ListBulkInsertTasksParam requestParam) {
+        return retry(()-> super.listBulkInsertTasks(requestParam));
+    }
+
+    @Override
+    public R<CheckHealthResponse> checkHealth() {
+        return retry(super::checkHealth);
+    }
+
+    @Override
+    public R<GetVersionResponse> getVersion() {
+        return retry(super::getVersion);
+    }
+
+    @Override
+    public R<GetLoadingProgressResponse> getLoadingProgress(GetLoadingProgressParam requestParam) {
+        return retry(()-> super.getLoadingProgress(requestParam));
+    }
+
+    @Override
+    public R<GetLoadStateResponse> getLoadState(GetLoadStateParam requestParam) {
+        return retry(()-> super.getLoadState(requestParam));
+    }
+
+
+
+    @Override
+    public R<RpcStatus> createCollection(CreateSimpleCollectionParam requestParam) {
+        return retry(()-> super.createCollection(requestParam));
+    }
+
+    @Override
+    public R<ListCollectionsResponse> listCollections(ListCollectionsParam requestParam) {
+        return retry(()-> super.listCollections(requestParam));
+    }
+
+    @Override
+    public R<InsertResponse> insert(InsertRowsParam requestParam) {
+        return retry(()-> super.insert(requestParam));
+    }
+
+    @Override
+    public R<DeleteResponse> delete(DeleteIdsParam requestParam) {
+        return retry(()-> super.delete(requestParam));
+    }
+
+    @Override
+    public R<GetResponse> get(GetIdsParam requestParam) {
+        return retry(()-> super.get(requestParam));
+    }
+
+    @Override
+    public R<QueryResponse> query(QuerySimpleParam requestParam) {
+        return retry(()-> super.query(requestParam));
+    }
+
+    @Override
+    public R<SearchResponse> search(SearchSimpleParam requestParam) {
+        return retry(()-> super.search(requestParam));
     }
     }
 }
 }