|
@@ -20,13 +20,13 @@
|
|
package io.milvus.client;
|
|
package io.milvus.client;
|
|
|
|
|
|
import io.grpc.*;
|
|
import io.grpc.*;
|
|
|
|
+import io.grpc.Status;
|
|
import io.grpc.stub.MetadataUtils;
|
|
import io.grpc.stub.MetadataUtils;
|
|
|
|
+import io.milvus.exception.MilvusException;
|
|
|
|
+import io.milvus.exception.ServerException;
|
|
import io.milvus.grpc.*;
|
|
import io.milvus.grpc.*;
|
|
-import io.milvus.param.ConnectParam;
|
|
|
|
|
|
+import io.milvus.param.*;
|
|
|
|
|
|
-import io.milvus.param.LogLevel;
|
|
|
|
-import io.milvus.param.R;
|
|
|
|
-import io.milvus.param.RpcStatus;
|
|
|
|
import io.milvus.param.alias.AlterAliasParam;
|
|
import io.milvus.param.alias.AlterAliasParam;
|
|
import io.milvus.param.alias.CreateAliasParam;
|
|
import io.milvus.param.alias.CreateAliasParam;
|
|
import io.milvus.param.alias.DropAliasParam;
|
|
import io.milvus.param.alias.DropAliasParam;
|
|
@@ -72,8 +72,7 @@ public class MilvusServiceClient extends AbstractMilvusGrpcClient {
|
|
private final MilvusServiceGrpc.MilvusServiceFutureStub futureStub;
|
|
private final MilvusServiceGrpc.MilvusServiceFutureStub futureStub;
|
|
private final long rpcDeadlineMs;
|
|
private final long rpcDeadlineMs;
|
|
private long timeoutMs = 0;
|
|
private long timeoutMs = 0;
|
|
- private int retryTimes = 0;
|
|
|
|
- private long retryIntervalMs = 500L;
|
|
|
|
|
|
+ private RetryParam retryParam = RetryParam.newBuilder().build();
|
|
|
|
|
|
public MilvusServiceClient(@NonNull ConnectParam connectParam) {
|
|
public MilvusServiceClient(@NonNull ConnectParam connectParam) {
|
|
this.rpcDeadlineMs = connectParam.getRpcDeadlineMs();
|
|
this.rpcDeadlineMs = connectParam.getRpcDeadlineMs();
|
|
@@ -158,9 +157,8 @@ public class MilvusServiceClient extends AbstractMilvusGrpcClient {
|
|
this.futureStub = src.futureStub;
|
|
this.futureStub = src.futureStub;
|
|
this.rpcDeadlineMs = src.rpcDeadlineMs;
|
|
this.rpcDeadlineMs = src.rpcDeadlineMs;
|
|
this.timeoutMs = src.timeoutMs;
|
|
this.timeoutMs = src.timeoutMs;
|
|
- this.retryTimes = src.retryTimes;
|
|
|
|
- this.retryIntervalMs = src.retryIntervalMs;
|
|
|
|
this.logLevel = src.logLevel;
|
|
this.logLevel = src.logLevel;
|
|
|
|
+ this.retryParam = src.retryParam;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -227,6 +225,13 @@ public class MilvusServiceClient extends AbstractMilvusGrpcClient {
|
|
return newClient;
|
|
return newClient;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public MilvusClient withRetry(RetryParam retryParam) {
|
|
|
|
+ MilvusServiceClient newClient = new MilvusServiceClient(this);
|
|
|
|
+ newClient.retryParam = retryParam;
|
|
|
|
+ return newClient;
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public MilvusClient withRetry(int retryTimes) {
|
|
public MilvusClient withRetry(int retryTimes) {
|
|
if (retryTimes <= 0) {
|
|
if (retryTimes <= 0) {
|
|
@@ -234,7 +239,7 @@ public class MilvusServiceClient extends AbstractMilvusGrpcClient {
|
|
}
|
|
}
|
|
|
|
|
|
MilvusServiceClient newClient = new MilvusServiceClient(this);
|
|
MilvusServiceClient newClient = new MilvusServiceClient(this);
|
|
- newClient.retryTimes = retryTimes;
|
|
|
|
|
|
+ newClient.retryParam.setMaxRetryTimes(retryTimes);
|
|
return newClient;
|
|
return newClient;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -244,14 +249,17 @@ public class MilvusServiceClient extends AbstractMilvusGrpcClient {
|
|
return this;
|
|
return this;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // to stay compatible with the old fixed-interval behavior, set both the initial and the max back-off to the same value
|
|
MilvusServiceClient newClient = new MilvusServiceClient(this);
|
|
MilvusServiceClient newClient = new MilvusServiceClient(this);
|
|
- newClient.retryIntervalMs = timeUnit.toMillis(interval);
|
|
|
|
|
|
+ newClient.retryParam.setInitialBackOffMs(timeUnit.toMillis(interval));
|
|
|
|
+ newClient.retryParam.setMaxBackOffMs(timeUnit.toMillis(interval));
|
|
return newClient;
|
|
return newClient;
|
|
}
|
|
}
|
|
|
|
|
|
private <T> R<T> retry(Callable<R<T>> callable) {
|
|
private <T> R<T> retry(Callable<R<T>> callable) {
|
|
|
|
+ int maxRetryTimes = this.retryParam.getMaxRetryTimes();
|
|
// no retry, direct call the method
|
|
// no retry, direct call the method
|
|
- if (this.retryTimes <= 1) {
|
|
|
|
|
|
+ if (maxRetryTimes <= 1) {
|
|
try {
|
|
try {
|
|
return callable.call();
|
|
return callable.call();
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
@@ -261,36 +269,83 @@ public class MilvusServiceClient extends AbstractMilvusGrpcClient {
|
|
|
|
|
|
// method to check timeout
|
|
// method to check timeout
|
|
long begin = System.currentTimeMillis();
|
|
long begin = System.currentTimeMillis();
|
|
- Callable<Void> timeoutChecker = ()->{
|
|
|
|
|
|
+ Callable<Boolean> timeoutChecker = ()->{
|
|
long current = System.currentTimeMillis();
|
|
long current = System.currentTimeMillis();
|
|
long cost = (current - begin);
|
|
long cost = (current - begin);
|
|
if (this.timeoutMs > 0 && cost >= this.timeoutMs) {
|
|
if (this.timeoutMs > 0 && cost >= this.timeoutMs) {
|
|
- String msg = String.format("Retry timeout: %dms", this.timeoutMs);
|
|
|
|
- throw new RuntimeException(msg);
|
|
|
|
|
|
+ return Boolean.TRUE;
|
|
}
|
|
}
|
|
- return null;
|
|
|
|
|
|
+ return Boolean.FALSE;
|
|
};
|
|
};
|
|
|
|
|
|
// retry within timeout
|
|
// retry within timeout
|
|
- for (int i = 0; i < this.retryTimes; i++) {
|
|
|
|
|
|
+ long retryIntervalMs = this.retryParam.getInitialBackOffMs();
|
|
|
|
+ for (int k = 1; k <= maxRetryTimes; k++) {
|
|
try {
|
|
try {
|
|
R<T> resp = callable.call();
|
|
R<T> resp = callable.call();
|
|
if (resp.getStatus() == R.Status.Success.getCode()) {
|
|
if (resp.getStatus() == R.Status.Success.getCode()) {
|
|
return resp;
|
|
return resp;
|
|
}
|
|
}
|
|
|
|
|
|
- if (i != this.retryTimes-1) {
|
|
|
|
- timeoutChecker.call();
|
|
|
|
- TimeUnit.MILLISECONDS.sleep(this.retryIntervalMs);
|
|
|
|
- timeoutChecker.call();
|
|
|
|
- logInfo(String.format("Retry again after %dms...", this.retryIntervalMs));
|
|
|
|
|
|
+ Exception e = resp.getException();
|
|
|
|
+ if (e instanceof StatusRuntimeException) {
|
|
|
|
+ // for rpc exceptions, some status codes cannot be retried; return the response as-is for those
|
|
|
|
+ StatusRuntimeException rpcException = (StatusRuntimeException)e;
|
|
|
|
+ Status.Code code = rpcException.getStatus().getCode();
|
|
|
|
+ if (code == Status.DEADLINE_EXCEEDED.getCode()
|
|
|
|
+ || code == Status.PERMISSION_DENIED.getCode()
|
|
|
|
+ || code == Status.UNAUTHENTICATED.getCode()
|
|
|
|
+ || code == Status.INVALID_ARGUMENT.getCode()
|
|
|
|
+ || code == Status.ALREADY_EXISTS.getCode()
|
|
|
|
+ || code == Status.RESOURCE_EXHAUSTED.getCode()) {
|
|
|
|
+ return resp;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (timeoutChecker.call() == Boolean.TRUE) {
|
|
|
|
+ String msg = String.format("Retry timeout: %dms, maxRetry:%d, retries: %d, reason: %s",
|
|
|
|
+ this.timeoutMs, maxRetryTimes, k, e.getMessage());
|
|
|
|
+ throw new MilvusException(msg, code.value());
|
|
|
|
+ }
|
|
|
|
+ } else if (e instanceof ServerException) {
|
|
|
|
+ ServerException serverException = (ServerException)e;
|
|
|
|
+ if (timeoutChecker.call() == Boolean.TRUE) {
|
|
|
|
+ String msg = String.format("Retry timeout: %dms, maxRetry:%d, retries: %d, reason: %s",
|
|
|
|
+ this.timeoutMs, maxRetryTimes, k, e.getMessage());
|
|
|
|
+ throw new MilvusException(msg, serverException.getStatus());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // for server-side returned error, only retry for rate limit
|
|
|
|
+ // in new error codes of v2.3, rate limit error value is 8
|
|
|
|
+ if (!(serverException.getCompatibleCode() == ErrorCode.RateLimit
|
|
|
|
+ || serverException.getStatus() == 8)) {
|
|
|
|
+ return resp;
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ return resp;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // log a warning only once more than three attempts have been made, following the pymilvus logic
|
|
|
|
+ if (k > 3) {
|
|
|
|
+ logWarning(String.format("Retry(%d) with interval %dms. Reason: %s",
|
|
|
|
+ k, retryIntervalMs, e.getMessage()));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // sleep for interval
|
|
|
|
+ if (k != maxRetryTimes) {
|
|
|
|
+ TimeUnit.MILLISECONDS.sleep(retryIntervalMs);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // compute the next interval: multiply by the back-off multiplier, capped at the max back-off
|
|
|
|
+ retryIntervalMs = retryIntervalMs*this.retryParam.getBackOffMultiplier();
|
|
|
|
+ if (retryIntervalMs > this.retryParam.getMaxBackOffMs()) {
|
|
|
|
+ retryIntervalMs = this.retryParam.getMaxBackOffMs();
|
|
}
|
|
}
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
logError(e.getMessage());
|
|
logError(e.getMessage());
|
|
return R.failed(e);
|
|
return R.failed(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- String msg = String.format("Retry run out of %d retry times", this.retryTimes);
|
|
|
|
|
|
+ String msg = String.format("Finish %d retry times, stop retry", maxRetryTimes);
|
|
logError(msg);
|
|
logError(msg);
|
|
return R.failed(new RuntimeException(msg));
|
|
return R.failed(new RuntimeException(msg));
|
|
}
|
|
}
|