|
@@ -22,28 +22,32 @@ package io.milvus.client;
|
|
|
import com.google.protobuf.ByteString;
|
|
|
import io.grpc.StatusRuntimeException;
|
|
|
import io.milvus.exception.ClientNotConnectedException;
|
|
|
+import io.milvus.exception.IllegalResponseException;
|
|
|
import io.milvus.exception.ParamException;
|
|
|
import io.milvus.grpc.*;
|
|
|
import io.milvus.param.Constant;
|
|
|
-import io.milvus.param.ParamUtils;
|
|
|
import io.milvus.param.R;
|
|
|
import io.milvus.param.RpcStatus;
|
|
|
import io.milvus.param.alias.AlterAliasParam;
|
|
|
import io.milvus.param.alias.CreateAliasParam;
|
|
|
import io.milvus.param.alias.DropAliasParam;
|
|
|
import io.milvus.param.collection.*;
|
|
|
+import io.milvus.param.control.GetMetricsParam;
|
|
|
+import io.milvus.param.control.GetPersistentSegmentInfoParam;
|
|
|
+import io.milvus.param.control.GetQuerySegmentInfoParam;
|
|
|
import io.milvus.param.dml.*;
|
|
|
import io.milvus.param.index.*;
|
|
|
import io.milvus.param.partition.*;
|
|
|
+import lombok.NonNull;
|
|
|
import org.apache.commons.collections4.CollectionUtils;
|
|
|
import org.apache.commons.collections4.MapUtils;
|
|
|
-import org.apache.commons.lang3.StringUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
-import java.lang.reflect.Field;
|
|
|
import java.nio.ByteBuffer;
|
|
|
+import java.nio.ByteOrder;
|
|
|
import java.util.*;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
public abstract class AbstractMilvusGrpcClient implements MilvusClient {
|
|
@@ -56,279 +60,21 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
|
|
|
|
|
|
protected abstract boolean clientIsReady();
|
|
|
|
|
|
- @Override
|
|
|
- public R<FlushResponse> flush(String collectionName, String dbName) {
|
|
|
- return flush(Collections.singletonList(collectionName), dbName);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public R<FlushResponse> flush(String collectionName) {
|
|
|
- return flush(Collections.singletonList(collectionName), "");
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public R<FlushResponse> flush(List<String> collectionNames) {
|
|
|
- return flush(collectionNames, "");
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public R<FlushResponse> flush(List<String> collectionNames, String dbName) {
|
|
|
- if (!clientIsReady()) {
|
|
|
- return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
- }
|
|
|
-
|
|
|
- for (String collectionName : collectionNames) {
|
|
|
- ParamUtils.CheckNullEmptyString(collectionName, "Collection name");
|
|
|
- }
|
|
|
-
|
|
|
- MsgBase msgBase = MsgBase.newBuilder().setMsgType(MsgType.Flush).build();
|
|
|
- FlushRequest.Builder builder = FlushRequest.newBuilder().setBase(msgBase).setDbName(dbName);
|
|
|
- collectionNames.forEach(builder::addCollectionNames);
|
|
|
- FlushRequest flushRequest = builder.build();
|
|
|
- FlushResponse flush = null;
|
|
|
- try {
|
|
|
- flush = blockingStub().flush(flushRequest);
|
|
|
- } catch (Exception e) {
|
|
|
- logger.error("[milvus] flush rpc request error:{}", e.getMessage());
|
|
|
- return R.failed(e);
|
|
|
- }
|
|
|
- return R.success(flush);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public R<MutationResult> delete(DeleteParam deleteParam) {
|
|
|
- if (!clientIsReady()) {
|
|
|
- return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
- }
|
|
|
-
|
|
|
- DeleteRequest deleteRequest = DeleteRequest.newBuilder()
|
|
|
- .setBase(MsgBase.newBuilder().setMsgType(MsgType.Delete).build())
|
|
|
- .setCollectionName(deleteParam.getCollectionName())
|
|
|
- .setPartitionName(deleteParam.getPartitionName())
|
|
|
- .build();
|
|
|
-
|
|
|
- MutationResult delete = null;
|
|
|
- try {
|
|
|
- delete = blockingStub().delete(deleteRequest);
|
|
|
- } catch (Exception e) {
|
|
|
- logger.error("[milvus] delete rpc request error:{}", e.getMessage());
|
|
|
- return R.failed(e);
|
|
|
- }
|
|
|
- return R.success(delete);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public R<MutationResult> insert(InsertParam insertParam) {
|
|
|
- if (!clientIsReady()) {
|
|
|
- return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
- }
|
|
|
-
|
|
|
- String collectionName = insertParam.getCollectionName();
|
|
|
- String partitionName = insertParam.getPartitionName();
|
|
|
- List<InsertParam.Field> fields = insertParam.getFields();
|
|
|
-
|
|
|
- //1. gen insert request
|
|
|
- MsgBase msgBase = MsgBase.newBuilder().setMsgType(MsgType.Insert).build();
|
|
|
- InsertRequest.Builder insertBuilder = InsertRequest.newBuilder()
|
|
|
- .setCollectionName(collectionName)
|
|
|
- .setPartitionName(partitionName)
|
|
|
- .setBase(msgBase)
|
|
|
- .setNumRows(insertParam.getRowCount());
|
|
|
-
|
|
|
- //2. gen fieldData
|
|
|
- // TODO: check field type(use DescribeCollection get schema to compare)
|
|
|
- for (InsertParam.Field field : fields) {
|
|
|
- insertBuilder.addFieldsData(genFieldData(field.getName(), field.getType(), field.getValues()));
|
|
|
- }
|
|
|
-
|
|
|
- //3. call insert
|
|
|
- InsertRequest insertRequest = insertBuilder.build();
|
|
|
- MutationResult insert = null;
|
|
|
- try {
|
|
|
- insert = blockingStub().insert(insertRequest);
|
|
|
- } catch (Exception e) {
|
|
|
- logger.error("[milvus] insert rpc request error:{}", e.getMessage());
|
|
|
- return R.failed(e);
|
|
|
- }
|
|
|
- return R.success(insert);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public R<SearchResults> search(SearchParam searchParam) {
|
|
|
- if (!clientIsReady()) {
|
|
|
- return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
- }
|
|
|
-
|
|
|
- SearchRequest.Builder builder = SearchRequest.newBuilder()
|
|
|
- .setDbName(searchParam.getDbName())
|
|
|
- .setCollectionName(searchParam.getCollectionName());
|
|
|
- if (!searchParam.getPartitionNames().isEmpty()) {
|
|
|
- searchParam.getPartitionNames().forEach(builder::addPartitionNames);
|
|
|
- }
|
|
|
-
|
|
|
- // prepare target vectors
|
|
|
- // TODO: check target vector dimension(use DescribeColection get schema to compare)
|
|
|
- List<?> vectors = searchParam.getVectors();
|
|
|
- List<ByteString> byteStrings = vectors.stream().map(vector -> {
|
|
|
- if (vector instanceof ByteBuffer) {
|
|
|
- return ByteString.copyFrom((ByteBuffer) vector);
|
|
|
- }
|
|
|
-
|
|
|
- if (vector instanceof List) {
|
|
|
- List list = (List) vector;
|
|
|
- if (list.get(0) instanceof Float) {
|
|
|
- ByteBuffer buf = ByteBuffer.allocate(Float.BYTES * list.size());
|
|
|
- list.forEach(v -> buf.putFloat((Float) v));
|
|
|
-
|
|
|
- byte[] array = buf.array();
|
|
|
- return ByteString.copyFrom(array);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- logger.error("Search target vector type is illegal(Only allow List<Float> or ByteBuffer)");
|
|
|
- return null;
|
|
|
- }).collect(Collectors.toList());
|
|
|
-
|
|
|
- PlaceholderValue.Builder pldBuilder = PlaceholderValue.newBuilder()
|
|
|
- .setTag(Constant.VECTOR_TAG)
|
|
|
- .setType(PlaceholderType.FloatVector);
|
|
|
- byteStrings.forEach(pldBuilder::addValues);
|
|
|
-
|
|
|
- PlaceholderGroup placeholderGroup = PlaceholderGroup.newBuilder()
|
|
|
- .addPlaceholders(pldBuilder.build())
|
|
|
- .build();
|
|
|
-
|
|
|
- builder.setPlaceholderGroup(placeholderGroup.toByteString());
|
|
|
-
|
|
|
- // search parameters
|
|
|
- builder.addSearchParams(
|
|
|
- KeyValuePair.newBuilder()
|
|
|
- .setKey(Constant.VECTOR_FIELD)
|
|
|
- .setValue(searchParam.getVectorFieldName())
|
|
|
- .build())
|
|
|
- .addSearchParams(
|
|
|
- KeyValuePair.newBuilder()
|
|
|
- .setKey(Constant.TOP_K)
|
|
|
- .setValue(String.valueOf(searchParam.getTopK()))
|
|
|
- .build())
|
|
|
- .addSearchParams(
|
|
|
- KeyValuePair.newBuilder()
|
|
|
- .setKey(Constant.METRIC_TYPE)
|
|
|
- .setValue(searchParam.getMetricType())
|
|
|
- .build());
|
|
|
-
|
|
|
- if (null != searchParam.getParams() && !searchParam.getParams().isEmpty()) {
|
|
|
- builder.addSearchParams(
|
|
|
- KeyValuePair.newBuilder()
|
|
|
- .setKey(Constant.PARAMS)
|
|
|
- .setValue(searchParam.getParams())
|
|
|
- .build());
|
|
|
- }
|
|
|
-
|
|
|
- if (!searchParam.getOutFields().isEmpty()) {
|
|
|
- searchParam.getOutFields().forEach(builder::addOutputFields);
|
|
|
- }
|
|
|
-
|
|
|
- // always use expression since dsl is discarded
|
|
|
- builder.setDslType(DslType.BoolExprV1);
|
|
|
- if (searchParam.getExpr() != null && !searchParam.getExpr().isEmpty()) {
|
|
|
- builder.setDsl(searchParam.getExpr());
|
|
|
- }
|
|
|
-
|
|
|
- SearchRequest searchRequest = builder.build();
|
|
|
- SearchResults search;
|
|
|
- try {
|
|
|
- search = this.blockingStub().search(searchRequest);
|
|
|
- } catch (Exception e) {
|
|
|
- logger.error("[milvus] search rpc request error:{}", e.getMessage());
|
|
|
- return R.failed(e);
|
|
|
- }
|
|
|
-
|
|
|
- return R.success(search);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public R<QueryResults> query(QueryParam queryParam) {
|
|
|
- if (!clientIsReady()) {
|
|
|
- return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
- }
|
|
|
-
|
|
|
- QueryRequest queryRequest = QueryRequest.newBuilder()
|
|
|
- .setDbName(queryParam.getDbName())
|
|
|
- .setCollectionName(queryParam.getCollectionName())
|
|
|
- .addAllPartitionNames(queryParam.getPartitionNames())
|
|
|
- .addAllOutputFields(queryParam.getOutFields())
|
|
|
- .setExpr(queryParam.getExpr())
|
|
|
- .build();
|
|
|
-
|
|
|
- QueryResults query;
|
|
|
- try {
|
|
|
- query = this.blockingStub().query(queryRequest);
|
|
|
- } catch (Exception e) {
|
|
|
-// e.printStackTrace();
|
|
|
- logger.error("[milvus] query rpc request error:{}", e.getMessage());
|
|
|
- return R.failed(e);
|
|
|
- }
|
|
|
- return R.success(query);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public R<CalcDistanceResults> calcDistance(CalcDistanceParam calcDistanceParam) {
|
|
|
- if (!clientIsReady()) {
|
|
|
- return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
- }
|
|
|
-
|
|
|
- List<List<Float>> vectors_left = calcDistanceParam.getVectorsLeft();
|
|
|
- List<List<Float>> vectors_right = calcDistanceParam.getVectorsRight();
|
|
|
-
|
|
|
- FloatArray.Builder left_float_array = FloatArray.newBuilder();
|
|
|
- for (List<Float> vector : vectors_left) {
|
|
|
- left_float_array.addAllData(vector);
|
|
|
- }
|
|
|
-
|
|
|
- FloatArray.Builder right_float_array = FloatArray.newBuilder();
|
|
|
- for (List<Float> vector : vectors_right) {
|
|
|
- right_float_array.addAllData(vector);
|
|
|
- }
|
|
|
-
|
|
|
- CalcDistanceRequest calcDistanceRequest = CalcDistanceRequest.newBuilder()
|
|
|
- .setOpLeft(
|
|
|
- VectorsArray.newBuilder()
|
|
|
- .setDataArray(
|
|
|
- VectorField.newBuilder()
|
|
|
- .setFloatVector(left_float_array.build())
|
|
|
- .setDim(vectors_left.get(0).size())
|
|
|
- .build()
|
|
|
- )
|
|
|
- .build()
|
|
|
- )
|
|
|
- .setOpRight(
|
|
|
- VectorsArray.newBuilder()
|
|
|
- .setDataArray(
|
|
|
- VectorField.newBuilder()
|
|
|
- .setFloatVector(right_float_array.build())
|
|
|
- .setDim(vectors_right.get(0).size())
|
|
|
- .build()
|
|
|
- )
|
|
|
- .build()
|
|
|
- )
|
|
|
- .addParams(
|
|
|
- KeyValuePair.newBuilder()
|
|
|
- .setKey("metric")
|
|
|
- .setValue(calcDistanceParam.getMetricType())
|
|
|
- .build()
|
|
|
- )
|
|
|
- .build();
|
|
|
- CalcDistanceResults calcDistanceResults;
|
|
|
- try {
|
|
|
- calcDistanceResults = blockingStub().calcDistance(calcDistanceRequest);
|
|
|
- } catch (Exception e) {
|
|
|
- logger.error("[milvus] calDistance rpc request error:{}", e.getMessage());
|
|
|
- return R.failed(e);
|
|
|
+ ///////////////////// Internal Functions//////////////////////
|
|
|
+ private List<KeyValuePair> assembleKvPair(Map<String, String> sourceMap) {
|
|
|
+ List<KeyValuePair> result = new ArrayList<>();
|
|
|
+ if (MapUtils.isNotEmpty(sourceMap)) {
|
|
|
+ sourceMap.forEach((key, value) -> {
|
|
|
+ KeyValuePair kv = KeyValuePair.newBuilder()
|
|
|
+ .setKey(key)
|
|
|
+ .setValue(value).build();
|
|
|
+ result.add(kv);
|
|
|
+ });
|
|
|
}
|
|
|
- return R.success(calcDistanceResults);
|
|
|
+ return result;
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
private FieldData genFieldData(String fieldName, DataType dataType, List<?> objects) {
|
|
|
if (objects == null) {
|
|
|
throw new ParamException("Cannot generate FieldData from null object");
|
|
@@ -340,7 +86,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
|
|
|
// each object is List<Float>
|
|
|
for (Object object : objects) {
|
|
|
if (object instanceof List) {
|
|
|
- List list = (List) object;
|
|
|
+ List<Float> list = (List<Float>) object;
|
|
|
floats.addAll(list);
|
|
|
} else {
|
|
|
throw new ParamException("The type of FloatVector must be List<Float>");
|
|
@@ -352,20 +98,21 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
|
|
|
VectorField vectorField = VectorField.newBuilder().setDim(dim).setFloatVector(floatArray).build();
|
|
|
return builder.setFieldName(fieldName).setType(DataType.FloatVector).setVectors(vectorField).build();
|
|
|
} else if (dataType == DataType.BinaryVector) {
|
|
|
- ByteString byteString = null;
|
|
|
+ ByteBuffer totalBuf = null;
|
|
|
int dim = 0;
|
|
|
// each object is ByteBuffer
|
|
|
for (Object object : objects) {
|
|
|
ByteBuffer buf = (ByteBuffer) object;
|
|
|
- ByteString tempStr = ByteString.copyFrom((ByteBuffer) buf);
|
|
|
- if (byteString == null){
|
|
|
- byteString = tempStr;
|
|
|
+ if (totalBuf == null){
|
|
|
+ totalBuf = ByteBuffer.allocate(buf.position() * objects.size());
|
|
|
+ totalBuf.put(buf.array());
|
|
|
dim = buf.position() * 8;
|
|
|
} else {
|
|
|
- byteString.concat(tempStr);
|
|
|
+ totalBuf.put(buf.array());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ ByteString byteString = ByteString.copyFrom(totalBuf.array());
|
|
|
VectorField vectorField = VectorField.newBuilder().setDim(dim).setBinaryVector(byteString).build();
|
|
|
return builder.setFieldName(fieldName).setType(DataType.BinaryVector).setVectors(vectorField).build();
|
|
|
}
|
|
@@ -417,704 +164,1476 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
|
|
|
add(DataType.BinaryVector);
|
|
|
}};
|
|
|
|
|
|
+ private void waitForLoadingCollection(String collectionName, List<String> partitionNames,
|
|
|
+ long waitingInterval, long timeout) throws IllegalResponseException {
|
|
|
+ long tsBegin = System.currentTimeMillis();
|
|
|
+ if (partitionNames == null || partitionNames.isEmpty()) {
|
|
|
+ ShowCollectionsRequest showCollectionRequest = ShowCollectionsRequest.newBuilder()
|
|
|
+ .addCollectionNames(collectionName)
|
|
|
+ .setType(ShowType.InMemory)
|
|
|
+ .build();
|
|
|
+
|
|
|
+ // Use showCollection() to check loading percentages of the collection.
|
|
|
+ // If the inMemory percentage is 100, that means the collection has finished loading.
|
|
|
+ // Otherwise, this thread will sleep a small interval and check again.
|
|
|
+ // If waiting time exceed timeout, exist the circle
|
|
|
+ while (true) {
|
|
|
+ long tsNow = System.currentTimeMillis();
|
|
|
+ if ((tsNow - tsBegin) >= timeout*1000) {
|
|
|
+ logWarning("Waiting load thread is timeout, loading process may not be finished");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ ShowCollectionsResponse response = blockingStub().showCollections(showCollectionRequest);
|
|
|
+ int namesCount = response.getCollectionNamesCount();
|
|
|
+ int percentagesCount = response.getInMemoryPercentagesCount();
|
|
|
+ if (namesCount != 1) {
|
|
|
+ throw new IllegalResponseException("ShowCollectionsResponse is illegal. Collection count: "
|
|
|
+ + namesCount);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (namesCount != percentagesCount) {
|
|
|
+ String msg = "ShowCollectionsResponse is illegal. Collection count: " + namesCount
|
|
|
+ + " memory percentages count: " + percentagesCount;
|
|
|
+ throw new IllegalResponseException(msg);
|
|
|
+ }
|
|
|
+
|
|
|
+ long percentage = response.getInMemoryPercentages(0);
|
|
|
+ String responseCollection = response.getCollectionNames(0);
|
|
|
+ if (responseCollection.compareTo(collectionName) == 0 && percentage >= 100) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ logInfo("Waiting load, interval: {} ms, percentage: {}%", waitingInterval, percentage);
|
|
|
+ TimeUnit.MILLISECONDS.sleep(waitingInterval);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ logWarning("Waiting load thread is interrupted, loading process may not be finished");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ } else {
|
|
|
+ ShowPartitionsRequest showPartitionsRequest = ShowPartitionsRequest.newBuilder()
|
|
|
+ .setCollectionName(collectionName)
|
|
|
+ .addAllPartitionNames(partitionNames)
|
|
|
+ .setType(ShowType.InMemory).build();
|
|
|
+
|
|
|
+ // Use showPartitions() to check loading percentages of all the partitions.
|
|
|
+ // If each partition's inMemory percentage is 100, that means all the partitions have finished loading.
|
|
|
+ // Otherwise, this thread will sleep a small interval and check again.
|
|
|
+ // If waiting time exceed timeout, exist the circle
|
|
|
+ while(true) {
|
|
|
+ long tsNow = System.currentTimeMillis();
|
|
|
+ if ((tsNow - tsBegin) >= timeout*1000) {
|
|
|
+ logWarning("Waiting load thread is timeout, loading process may not be finished");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ ShowPartitionsResponse response = blockingStub().showPartitions(showPartitionsRequest);
|
|
|
+ int namesCount = response.getPartitionNamesCount();
|
|
|
+ int percentagesCount = response.getInMemoryPercentagesCount();
|
|
|
+ if (namesCount != percentagesCount) {
|
|
|
+ String msg = "ShowPartitionsResponse is illegal. Partition count: " + namesCount
|
|
|
+ + " memory percentages count: " + percentagesCount;
|
|
|
+ throw new IllegalResponseException(msg);
|
|
|
+ }
|
|
|
+
|
|
|
+ // construct a hash map to check each partition's inMemory percentage by name
|
|
|
+ Map<String, Long> percentages = new HashMap<>();
|
|
|
+ for (int i = 0; i < response.getInMemoryPercentagesCount(); ++i) {
|
|
|
+ percentages.put(response.getPartitionNames(i), response.getInMemoryPercentages(i));
|
|
|
+ }
|
|
|
+
|
|
|
+ String partitionNoMemState = "";
|
|
|
+ String partitionNotFullyLoad = "";
|
|
|
+ boolean allLoaded = true;
|
|
|
+ for (String name : partitionNames) {
|
|
|
+ if (!percentages.containsKey(name)) {
|
|
|
+ allLoaded = false;
|
|
|
+ partitionNoMemState = name;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ if (percentages.get(name) < 100L) {
|
|
|
+ allLoaded = false;
|
|
|
+ partitionNotFullyLoad = name;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (allLoaded) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ String msg = "Waiting load, interval: " + waitingInterval + "ms.";
|
|
|
+ if (!partitionNoMemState.isEmpty()) {
|
|
|
+ msg += ("Partition " + partitionNoMemState + " has no memory state.");
|
|
|
+ }
|
|
|
+ if (!partitionNotFullyLoad.isEmpty()) {
|
|
|
+ msg += ("Partition " + partitionNotFullyLoad + " has not fully loaded.");
|
|
|
+ }
|
|
|
+ logInfo(msg);
|
|
|
+ TimeUnit.MILLISECONDS.sleep(waitingInterval);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ logWarning("Waiting load thread is interrupted, load process may not be finished");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void waitForFlush(FlushResponse flushResponse, long waitingInterval, long timeout) {
|
|
|
+ // The rpc api flush() return FlushResponse, but the returned segment ids maybe not yet persisted.
|
|
|
+ // This method use getPersistentSegmentInfo() to check segment state.
|
|
|
+ // If all segments state become Flushed, then we say the sync flush action is finished.
|
|
|
+ // If waiting time exceed timeout, exist the circle
|
|
|
+ long tsBegin = System.currentTimeMillis();
|
|
|
+ Map<String, LongArray> collectionSegIDs = flushResponse.getCollSegIDsMap();
|
|
|
+ collectionSegIDs.forEach((collectionName, segmentIDs) -> {
|
|
|
+ while (segmentIDs.getDataCount() > 0) {
|
|
|
+ long tsNow = System.currentTimeMillis();
|
|
|
+ if ((tsNow - tsBegin) >= timeout*1000) {
|
|
|
+ logWarning("Waiting flush thread is timeout, flush process may not be finished");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ GetPersistentSegmentInfoRequest getSegInfoRequest = GetPersistentSegmentInfoRequest.newBuilder()
|
|
|
+ .setCollectionName(collectionName)
|
|
|
+ .build();
|
|
|
+ GetPersistentSegmentInfoResponse response = blockingStub().getPersistentSegmentInfo(getSegInfoRequest);
|
|
|
+ List<PersistentSegmentInfo> segmentInfoArray = response.getInfosList();
|
|
|
+ int flushedCount = 0;
|
|
|
+ for (int i = 0; i < segmentIDs.getDataCount(); ++i) {
|
|
|
+ for (PersistentSegmentInfo info : segmentInfoArray) {
|
|
|
+ if (info.getSegmentID() == segmentIDs.getData(i) && info.getState() == SegmentState.Flushed) {
|
|
|
+ flushedCount++;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // if all segment of this collection has been flushed, break this circle and check next collection
|
|
|
+ if (flushedCount == segmentIDs.getDataCount()) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ String msg = "Waiting flush, interval: " + waitingInterval + "ms. " + flushedCount +
|
|
|
+ " of " + segmentIDs.getDataCount() + " segments flushed.";
|
|
|
+ logInfo(msg);
|
|
|
+ TimeUnit.MILLISECONDS.sleep(waitingInterval);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ logWarning("Waiting flush thread is interrupted, flush process may not be finished");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ ///////////////////// API implementation //////////////////////
|
|
|
@Override
|
|
|
- public R<Boolean> hasCollection(HasCollectionParam requestParam) {
|
|
|
+ public R<Boolean> hasCollection(@NonNull HasCollectionParam requestParam) {
|
|
|
if (!clientIsReady()) {
|
|
|
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
}
|
|
|
|
|
|
- HasCollectionRequest hasCollectionRequest = HasCollectionRequest.newBuilder()
|
|
|
- .setCollectionName(requestParam.getCollectionName())
|
|
|
- .build();
|
|
|
+ logInfo(requestParam.toString());
|
|
|
|
|
|
- BoolResponse response;
|
|
|
try {
|
|
|
- response = blockingStub().hasCollection(hasCollectionRequest);
|
|
|
+ HasCollectionRequest hasCollectionRequest = HasCollectionRequest.newBuilder()
|
|
|
+ .setCollectionName(requestParam.getCollectionName())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ BoolResponse response = blockingStub().hasCollection(hasCollectionRequest);
|
|
|
+
|
|
|
if (response.getStatus().getErrorCode() == ErrorCode.Success) {
|
|
|
- logInfo("Has collection check successfully!\n{}", requestParam.toString());
|
|
|
+ logInfo("HasCollectionRequest successfully!");
|
|
|
Boolean value = Optional.of(response)
|
|
|
.map(BoolResponse::getValue)
|
|
|
.orElse(false);
|
|
|
return R.success(value);
|
|
|
} else {
|
|
|
+ logError("HasCollectionRequest failed!\n{}", response.getStatus().getReason());
|
|
|
return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
|
|
|
response.getStatus().getReason());
|
|
|
}
|
|
|
} catch (StatusRuntimeException e) {
|
|
|
- logger.error("[milvus] hasCollection:{} request error: {}", requestParam.getCollectionName(), e.getMessage());
|
|
|
+ logError("HasCollectionRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("HasCollectionRequest failed:\n{}", e.getMessage());
|
|
|
return R.failed(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
@Override
|
|
|
- public R<RpcStatus> createCollection(CreateCollectionParam requestParam) {
|
|
|
+ public R<RpcStatus> createCollection(@NonNull CreateCollectionParam requestParam) {
|
|
|
if (!clientIsReady()) {
|
|
|
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
}
|
|
|
|
|
|
- // Check whether the parameters are correct or not
|
|
|
- if (requestParam == null) {
|
|
|
- return R.failed(new ParamException("Request param can not be null"));
|
|
|
- }
|
|
|
-
|
|
|
- // Construct CollectionSchema Params
|
|
|
- CollectionSchema.Builder collectionSchemaBuilder = CollectionSchema.newBuilder();
|
|
|
- collectionSchemaBuilder.setName(requestParam.getCollectionName())
|
|
|
- .setDescription(requestParam.getDescription());
|
|
|
+ logInfo(requestParam.toString());
|
|
|
|
|
|
- for (FieldType fieldType : requestParam.getFieldTypes()) {
|
|
|
- FieldSchema.Builder fieldSchemaBuilder = FieldSchema.newBuilder()
|
|
|
- .setFieldID(fieldType.getFieldID())
|
|
|
- .setName(fieldType.getName())
|
|
|
- .setIsPrimaryKey(fieldType.isPrimaryKey())
|
|
|
- .setDescription(fieldType.getDescription())
|
|
|
- .setDataType(fieldType.getDataType())
|
|
|
- .setAutoID(fieldType.isAutoID());
|
|
|
+ try {
|
|
|
+ // Construct CollectionSchema Params
|
|
|
+ CollectionSchema.Builder collectionSchemaBuilder = CollectionSchema.newBuilder();
|
|
|
+ collectionSchemaBuilder.setName(requestParam.getCollectionName())
|
|
|
+ .setDescription(requestParam.getDescription());
|
|
|
+
|
|
|
+ long fieldID = 0;
|
|
|
+ for (FieldType fieldType : requestParam.getFieldTypes()) {
|
|
|
+ FieldSchema.Builder fieldSchemaBuilder = FieldSchema.newBuilder()
|
|
|
+ .setFieldID(fieldID)
|
|
|
+ .setName(fieldType.getName())
|
|
|
+ .setIsPrimaryKey(fieldType.isPrimaryKey())
|
|
|
+ .setDescription(fieldType.getDescription())
|
|
|
+ .setDataType(fieldType.getDataType())
|
|
|
+ .setAutoID(fieldType.isAutoID());
|
|
|
+
|
|
|
+ // assemble typeParams for CollectionSchema
|
|
|
+ List<KeyValuePair> typeParamsList = assembleKvPair(fieldType.getTypeParams());
|
|
|
+ if (CollectionUtils.isNotEmpty(typeParamsList)) {
|
|
|
+ typeParamsList.forEach(fieldSchemaBuilder::addTypeParams);
|
|
|
+ }
|
|
|
|
|
|
- // assemble typeParams for CollectionSchema
|
|
|
- List<KeyValuePair> typeParamsList = assembleKvPair(fieldType.getTypeParams());
|
|
|
- if (CollectionUtils.isNotEmpty(typeParamsList)) {
|
|
|
- typeParamsList.forEach(fieldSchemaBuilder::addTypeParams);
|
|
|
+ collectionSchemaBuilder.addFields(fieldSchemaBuilder.build());
|
|
|
+ fieldID++;
|
|
|
}
|
|
|
|
|
|
- collectionSchemaBuilder.addFields(fieldSchemaBuilder.build());
|
|
|
- }
|
|
|
-
|
|
|
- // Construct CreateCollectionRequest
|
|
|
- CreateCollectionRequest createCollectionRequest = CreateCollectionRequest.newBuilder()
|
|
|
- .setCollectionName(requestParam.getCollectionName())
|
|
|
- .setShardsNum(requestParam.getShardsNum())
|
|
|
- .setSchema(collectionSchemaBuilder.build().toByteString())
|
|
|
- .build();
|
|
|
+ // Construct CreateCollectionRequest
|
|
|
+ CreateCollectionRequest createCollectionRequest = CreateCollectionRequest.newBuilder()
|
|
|
+ .setCollectionName(requestParam.getCollectionName())
|
|
|
+ .setShardsNum(requestParam.getShardsNum())
|
|
|
+ .setSchema(collectionSchemaBuilder.build().toByteString())
|
|
|
+ .build();
|
|
|
|
|
|
- Status response;
|
|
|
- try {
|
|
|
- response = blockingStub().createCollection(createCollectionRequest);
|
|
|
+ Status response = blockingStub().createCollection(createCollectionRequest);
|
|
|
|
|
|
if (response.getErrorCode() == ErrorCode.Success) {
|
|
|
- logInfo("Created collection " + requestParam.getCollectionName() + " successfully!\n{}",
|
|
|
- requestParam.toString());
|
|
|
+ logInfo("CreateCollectionRequest successfully! Collection name:{}",
|
|
|
+ requestParam.getCollectionName());
|
|
|
return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
|
|
|
} else {
|
|
|
- logInfo("Created collection " + requestParam.getCollectionName() + " failed!\n{}", response);
|
|
|
+ logError("CreateCollectionRequest failed!\n{}", response.getReason());
|
|
|
return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
|
|
|
}
|
|
|
} catch (StatusRuntimeException e) {
|
|
|
- logError("createCollection " + requestParam.getCollectionName() + " RPC failed:\n{}",
|
|
|
- e.getStatus().toString());
|
|
|
+ logError("CreateCollectionRequest RPC failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("CreateCollectionRequest failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), e.getMessage());
|
|
|
return R.failed(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public R<RpcStatus> dropCollection(DropCollectionParam requestParam) {
|
|
|
+ public R<RpcStatus> dropCollection(@NonNull DropCollectionParam requestParam) {
|
|
|
if (!clientIsReady()) {
|
|
|
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
}
|
|
|
|
|
|
- DropCollectionRequest dropCollectionRequest = DropCollectionRequest.newBuilder()
|
|
|
- .setCollectionName(requestParam.getCollectionName())
|
|
|
- .build();
|
|
|
+ logInfo(requestParam.toString());
|
|
|
|
|
|
- Status response;
|
|
|
try {
|
|
|
- response = blockingStub().dropCollection(dropCollectionRequest);
|
|
|
+ DropCollectionRequest dropCollectionRequest = DropCollectionRequest.newBuilder()
|
|
|
+ .setCollectionName(requestParam.getCollectionName())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ Status response = blockingStub().dropCollection(dropCollectionRequest);
|
|
|
+
|
|
|
if (response.getErrorCode() == ErrorCode.Success) {
|
|
|
- logInfo("Drop collection successfully!\n{}", requestParam.toString());
|
|
|
+ logInfo("DropCollectionRequest successfully! Collection name:{}",
|
|
|
+ requestParam.getCollectionName());
|
|
|
return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
|
|
|
} else {
|
|
|
+ logError("DropCollectionRequest failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), response.getReason());
|
|
|
return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
|
|
|
}
|
|
|
} catch (StatusRuntimeException e) {
|
|
|
- logError("dropCollectionRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ logError("DropCollectionRequest RPC failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("DropCollectionRequest failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), e.getMessage());
|
|
|
return R.failed(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
@Override
|
|
|
- public R<RpcStatus> loadCollection(LoadCollectionParam requestParam) {
|
|
|
+ public R<RpcStatus> loadCollection(@NonNull LoadCollectionParam requestParam) {
|
|
|
if (!clientIsReady()) {
|
|
|
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
}
|
|
|
|
|
|
- LoadCollectionRequest loadCollectionRequest = LoadCollectionRequest.newBuilder()
|
|
|
- .setCollectionName(requestParam.getCollectionName())
|
|
|
- .build();
|
|
|
+ logInfo(requestParam.toString());
|
|
|
|
|
|
- Status response;
|
|
|
try {
|
|
|
- response = blockingStub().loadCollection(loadCollectionRequest);
|
|
|
+ LoadCollectionRequest loadCollectionRequest = LoadCollectionRequest.newBuilder()
|
|
|
+ .setCollectionName(requestParam.getCollectionName())
|
|
|
+ .build();
|
|
|
|
|
|
- if (response.getErrorCode() == ErrorCode.Success) {
|
|
|
- logInfo("Load collection successfully!\n{}", requestParam.toString());
|
|
|
- return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
|
|
|
- } else {
|
|
|
+ Status response = blockingStub().loadCollection(loadCollectionRequest);
|
|
|
+
|
|
|
+ if (response.getErrorCode() != ErrorCode.Success) {
|
|
|
return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
|
|
|
}
|
|
|
- } catch (StatusRuntimeException e) {
|
|
|
- logError("loadCollectionRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+
|
|
|
+ // sync load, wait until collection finish loading
|
|
|
+ if (requestParam.isSyncLoad()) {
|
|
|
+ waitForLoadingCollection(requestParam.getCollectionName(), null,
|
|
|
+ requestParam.getSyncLoadWaitingInterval(), requestParam.getSyncLoadWaitingTimeout());
|
|
|
+ }
|
|
|
+
|
|
|
+ logInfo("LoadCollectionRequest successfully! Collection name:{}",
|
|
|
+ requestParam.getCollectionName());
|
|
|
+ return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
|
|
|
+ } catch (StatusRuntimeException e) { // gRPC could throw this exception
|
|
|
+ logError("LoadCollectionRequest RPC failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (IllegalResponseException e) { // milvus exception for illegal response
|
|
|
+ logError("LoadCollectionRequest failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("LoadCollectionRequest failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), e.getMessage());
|
|
|
return R.failed(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public R<RpcStatus> releaseCollection(ReleaseCollectionParam requestParam) {
|
|
|
+ public R<RpcStatus> releaseCollection(@NonNull ReleaseCollectionParam requestParam) {
|
|
|
if (!clientIsReady()) {
|
|
|
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
}
|
|
|
|
|
|
- ReleaseCollectionRequest releaseCollectionRequest = ReleaseCollectionRequest.newBuilder()
|
|
|
- .setCollectionName(requestParam.getCollectionName())
|
|
|
- .build();
|
|
|
+ logInfo(requestParam.toString());
|
|
|
|
|
|
- Status response;
|
|
|
try {
|
|
|
- response = blockingStub().releaseCollection(releaseCollectionRequest);
|
|
|
+ ReleaseCollectionRequest releaseCollectionRequest = ReleaseCollectionRequest.newBuilder()
|
|
|
+ .setCollectionName(requestParam.getCollectionName())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ Status response = blockingStub().releaseCollection(releaseCollectionRequest);
|
|
|
|
|
|
if (response.getErrorCode() == ErrorCode.Success) {
|
|
|
- logInfo("Release collection successfully!\n{}", requestParam.toString());
|
|
|
+ logInfo("ReleaseCollectionRequest successfully! Collection name:{}",
|
|
|
+ requestParam.getCollectionName());
|
|
|
return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
|
|
|
} else {
|
|
|
+ logError("ReleaseCollectionRequest failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), response.getReason());
|
|
|
return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
|
|
|
}
|
|
|
} catch (StatusRuntimeException e) {
|
|
|
- logError("releaseCollectionRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ logError("ReleaseCollectionRequest RPC failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("ReleaseCollectionRequest failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), e.getMessage());
|
|
|
return R.failed(e);
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public R<DescribeCollectionResponse> describeCollection(DescribeCollectionParam requestParam) {
|
|
|
+ public R<DescribeCollectionResponse> describeCollection(@NonNull DescribeCollectionParam requestParam) {
|
|
|
if (!clientIsReady()) {
|
|
|
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
}
|
|
|
|
|
|
- DescribeCollectionRequest describeCollectionRequest = DescribeCollectionRequest.newBuilder()
|
|
|
- .setCollectionName(requestParam.getCollectionName())
|
|
|
- .build();
|
|
|
+ logInfo(requestParam.toString());
|
|
|
|
|
|
- DescribeCollectionResponse response;
|
|
|
try {
|
|
|
- response = blockingStub().describeCollection(describeCollectionRequest);
|
|
|
+ DescribeCollectionRequest describeCollectionRequest = DescribeCollectionRequest.newBuilder()
|
|
|
+ .setCollectionName(requestParam.getCollectionName())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ DescribeCollectionResponse response = blockingStub().describeCollection(describeCollectionRequest);
|
|
|
|
|
|
if (response.getStatus().getErrorCode() == ErrorCode.Success) {
|
|
|
- logInfo("Describe collection successfully!\n{}", requestParam.toString());
|
|
|
+ logInfo("DescribeCollectionRequest successfully!");
|
|
|
return R.success(response);
|
|
|
} else {
|
|
|
+ logError("DescribeCollectionRequest failed!\n{}", response.getStatus().getReason());
|
|
|
return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
|
|
|
response.getStatus().getReason());
|
|
|
}
|
|
|
} catch (StatusRuntimeException e) {
|
|
|
- logError("describeCollectionRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ logError("DescribeCollectionRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("DescribeCollectionRequest failed:\n{}", e.getMessage());
|
|
|
return R.failed(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public R<GetCollectionStatisticsResponse> getCollectionStatistics(GetCollectionStatisticsParam requestParam) {
|
|
|
+ public R<GetCollectionStatisticsResponse> getCollectionStatistics(@NonNull GetCollectionStatisticsParam requestParam) {
|
|
|
if (!clientIsReady()) {
|
|
|
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
}
|
|
|
|
|
|
- GetCollectionStatisticsRequest getCollectionStatisticsRequest = GetCollectionStatisticsRequest.newBuilder()
|
|
|
- .setCollectionName(requestParam.getCollectionName())
|
|
|
- .build();
|
|
|
+ logInfo(requestParam.toString());
|
|
|
|
|
|
- GetCollectionStatisticsResponse response;
|
|
|
try {
|
|
|
- response = blockingStub().getCollectionStatistics(getCollectionStatisticsRequest);
|
|
|
+ // flush collection if client command to do it(some times user may want to know the newest row count)
|
|
|
+ if (requestParam.isFlushCollection()) {
|
|
|
+ R<FlushResponse> response = flush(FlushParam.newBuilder()
|
|
|
+ .addCollectionName(requestParam.getCollectionName())
|
|
|
+ .withSyncFlush(Boolean.TRUE)
|
|
|
+ .build());
|
|
|
+ if (response.getStatus() != R.Status.Success.getCode()) {
|
|
|
+ return R.failed(R.Status.valueOf(response.getStatus()), response.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ GetCollectionStatisticsRequest getCollectionStatisticsRequest = GetCollectionStatisticsRequest.newBuilder()
|
|
|
+ .setCollectionName(requestParam.getCollectionName())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ GetCollectionStatisticsResponse response = blockingStub().getCollectionStatistics(getCollectionStatisticsRequest);
|
|
|
|
|
|
if (response.getStatus().getErrorCode() == ErrorCode.Success) {
|
|
|
- logInfo("Get collection statistics successfully!\n{}", requestParam.toString());
|
|
|
+ logInfo("GetCollectionStatisticsRequest successfully!");
|
|
|
return R.success(response);
|
|
|
} else {
|
|
|
+ logError("GetCollectionStatisticsRequest failed!\n{}", response.getStatus().getReason());
|
|
|
return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
|
|
|
response.getStatus().getReason());
|
|
|
}
|
|
|
} catch (StatusRuntimeException e) {
|
|
|
- logError("getCollectionStatisticsRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ logError("GetCollectionStatisticsRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("GetCollectionStatisticsRequest failed:\n{}", e.getMessage());
|
|
|
return R.failed(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public R<ShowCollectionsResponse> showCollections(ShowCollectionsParam requestParam) {
|
|
|
+ public R<ShowCollectionsResponse> showCollections(@NonNull ShowCollectionsParam requestParam) {
|
|
|
if (!clientIsReady()) {
|
|
|
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
}
|
|
|
|
|
|
- ShowCollectionsRequest.Builder showCollectionRequestBuilder = ShowCollectionsRequest.newBuilder();
|
|
|
+ logInfo(requestParam.toString());
|
|
|
|
|
|
- String[] collectionNames = requestParam.getCollectionNames();
|
|
|
- if (collectionNames == null || collectionNames.length <= 0) {
|
|
|
- return R.failed(R.Status.ParamError, "Collection name not specified");
|
|
|
- }
|
|
|
-
|
|
|
- // add collectionNames
|
|
|
- Arrays.stream(collectionNames).forEach(showCollectionRequestBuilder::addCollectionNames);
|
|
|
-
|
|
|
- ShowCollectionsRequest showCollectionsRequest = showCollectionRequestBuilder
|
|
|
- .setType(requestParam.getShowType()).build();
|
|
|
-
|
|
|
- ShowCollectionsResponse response;
|
|
|
try {
|
|
|
- response = blockingStub().showCollections(showCollectionsRequest);
|
|
|
+ ShowCollectionsRequest showCollectionsRequest = ShowCollectionsRequest.newBuilder()
|
|
|
+ .addAllCollectionNames(requestParam.getCollectionNames())
|
|
|
+ .setType(requestParam.getShowType()).build();
|
|
|
+
|
|
|
+ ShowCollectionsResponse response = blockingStub().showCollections(showCollectionsRequest);
|
|
|
|
|
|
if (response.getStatus().getErrorCode() == ErrorCode.Success) {
|
|
|
- logInfo("Show collection successfully!\n{}", requestParam.toString());
|
|
|
+ logInfo("ShowCollectionsRequest successfully!");
|
|
|
return R.success(response);
|
|
|
} else {
|
|
|
+ logError("ShowCollectionsRequest failed!\n{}", response.getStatus().getReason());
|
|
|
return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
|
|
|
response.getStatus().getReason());
|
|
|
}
|
|
|
} catch (StatusRuntimeException e) {
|
|
|
- logError("showCollectionsRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ logError("ShowCollectionsRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("ShowCollectionsRequest failed:\n{}", e.getMessage());
|
|
|
+ return R.failed(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Currently we don't allow client call this method since server side has no compaction function
|
|
|
+ * Now this method is only internally used by getCollectionStatistics()
|
|
|
+ */
|
|
|
+// @Override
|
|
|
+ private R<FlushResponse> flush(@NonNull FlushParam requestParam) {
|
|
|
+ if (!clientIsReady()) {
|
|
|
+ return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
+ }
|
|
|
+
|
|
|
+ logInfo(requestParam.toString());
|
|
|
+
|
|
|
+ try {
|
|
|
+ MsgBase msgBase = MsgBase.newBuilder().setMsgType(MsgType.Flush).build();
|
|
|
+ FlushRequest flushRequest = FlushRequest.newBuilder()
|
|
|
+ .setBase(msgBase)
|
|
|
+ .addAllCollectionNames(requestParam.getCollectionNames())
|
|
|
+ .build();
|
|
|
+ FlushResponse response = blockingStub().flush(flushRequest);
|
|
|
+
|
|
|
+ if (requestParam.getSyncFlush() == Boolean.TRUE) {
|
|
|
+ waitForFlush(response, requestParam.getSyncFlushWaitingInterval(),
|
|
|
+ requestParam.getSyncFlushWaitingTimeout());
|
|
|
+ }
|
|
|
+
|
|
|
+ logInfo("FlushRequest successfully! Collection names:{}", requestParam.getCollectionNames());
|
|
|
+ return R.success(response);
|
|
|
+ } catch (StatusRuntimeException e) {
|
|
|
+ logError("FlushRequest RPC failed! Collection names:{}\n{}",
|
|
|
+ requestParam.getCollectionNames(), e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("FlushRequest failed! Collection names:{}\n{}",
|
|
|
+ requestParam.getCollectionNames(), e.getMessage());
|
|
|
return R.failed(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public R<RpcStatus> createPartition(CreatePartitionParam requestParam) {
|
|
|
+ public R<RpcStatus> createPartition(@NonNull CreatePartitionParam requestParam) {
|
|
|
if (!clientIsReady()) {
|
|
|
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
}
|
|
|
|
|
|
- CreatePartitionRequest createPartitionRequest = CreatePartitionRequest.newBuilder()
|
|
|
- .setCollectionName(requestParam.getCollectionName())
|
|
|
- .setPartitionName(requestParam.getPartitionName())
|
|
|
- .build();
|
|
|
+ logInfo(requestParam.toString());
|
|
|
|
|
|
- Status response;
|
|
|
try {
|
|
|
- response = blockingStub().createPartition(createPartitionRequest);
|
|
|
+ CreatePartitionRequest createPartitionRequest = CreatePartitionRequest.newBuilder()
|
|
|
+ .setCollectionName(requestParam.getCollectionName())
|
|
|
+ .setPartitionName(requestParam.getPartitionName())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ Status response = blockingStub().createPartition(createPartitionRequest);
|
|
|
|
|
|
if (response.getErrorCode() == ErrorCode.Success) {
|
|
|
- logInfo("Create partition successfully!\n{}", requestParam.toString());
|
|
|
+ logInfo("CreatePartitionRequest successfully! Collection name:{}, partition name:{}",
|
|
|
+ requestParam.getCollectionName(), requestParam.getPartitionName());
|
|
|
return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
|
|
|
} else {
|
|
|
+ logError("CreatePartitionRequest failed! Collection name:{}, partition name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), requestParam.getPartitionName(), response.getReason());
|
|
|
return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
|
|
|
}
|
|
|
} catch (StatusRuntimeException e) {
|
|
|
- logError("createPartitionRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ logError("CreatePartitionRequest RPC failed! Collection name:{}, partition name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), requestParam.getPartitionName(), e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("CreatePartitionRequest failed! Collection name:{}, partition name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), requestParam.getPartitionName(), e.getMessage());
|
|
|
return R.failed(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public R<RpcStatus> dropPartition(DropPartitionParam requestParam) {
|
|
|
+ public R<RpcStatus> dropPartition(@NonNull DropPartitionParam requestParam) {
|
|
|
if (!clientIsReady()) {
|
|
|
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
}
|
|
|
|
|
|
- DropPartitionRequest dropPartitionRequest = DropPartitionRequest.newBuilder()
|
|
|
- .setCollectionName(requestParam.getCollectionName())
|
|
|
- .setPartitionName(requestParam.getPartitionName())
|
|
|
- .build();
|
|
|
+ logInfo(requestParam.toString());
|
|
|
|
|
|
- Status response;
|
|
|
try {
|
|
|
- response = blockingStub().dropPartition(dropPartitionRequest);
|
|
|
+ DropPartitionRequest dropPartitionRequest = DropPartitionRequest.newBuilder()
|
|
|
+ .setCollectionName(requestParam.getCollectionName())
|
|
|
+ .setPartitionName(requestParam.getPartitionName())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ Status response = blockingStub().dropPartition(dropPartitionRequest);
|
|
|
|
|
|
if (response.getErrorCode() == ErrorCode.Success) {
|
|
|
- logInfo("Drop partition successfully!\n{}", requestParam.toString());
|
|
|
+ logInfo("DropPartitionRequest successfully! Collection name:{}, partition name:{}",
|
|
|
+ requestParam.getCollectionName(), requestParam.getPartitionName());
|
|
|
return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
|
|
|
} else {
|
|
|
+ logError("DropPartitionRequest failed! Collection name:{}, partition name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), requestParam.getPartitionName(), response.getReason());
|
|
|
return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
|
|
|
}
|
|
|
} catch (StatusRuntimeException e) {
|
|
|
- logError("createPartitionRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ logError("DropPartitionRequest RPC failed! Collection name:{}, partition name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), requestParam.getPartitionName(), e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("DropPartitionRequest failed! Collection name:{}, partition name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), requestParam.getPartitionName(), e.getMessage());
|
|
|
return R.failed(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public R<Boolean> hasPartition(HasPartitionParam requestParam) {
|
|
|
+ public R<Boolean> hasPartition(@NonNull HasPartitionParam requestParam) {
|
|
|
if (!clientIsReady()) {
|
|
|
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
}
|
|
|
|
|
|
- HasPartitionRequest hasPartitionRequest = HasPartitionRequest.newBuilder()
|
|
|
- .setCollectionName(requestParam.getCollectionName())
|
|
|
- .setPartitionName(requestParam.getPartitionName())
|
|
|
- .build();
|
|
|
+ logInfo(requestParam.toString());
|
|
|
|
|
|
- BoolResponse response;
|
|
|
try {
|
|
|
- response = blockingStub().hasPartition(hasPartitionRequest);
|
|
|
+ HasPartitionRequest hasPartitionRequest = HasPartitionRequest.newBuilder()
|
|
|
+ .setCollectionName(requestParam.getCollectionName())
|
|
|
+ .setPartitionName(requestParam.getPartitionName())
|
|
|
+ .build();
|
|
|
|
|
|
- if (response.getStatus().getErrorCode() == ErrorCode.Success) {
|
|
|
- logInfo("HasPartition call successfully!\n{}", requestParam.toString());
|
|
|
-
|
|
|
- Boolean result = Optional.ofNullable(response)
|
|
|
- .map(BoolResponse::getValue)
|
|
|
- .orElse(false);
|
|
|
+ BoolResponse response = blockingStub().hasPartition(hasPartitionRequest);
|
|
|
|
|
|
+ if (response.getStatus().getErrorCode() == ErrorCode.Success) {
|
|
|
+ logInfo("HasPartitionRequest successfully!");
|
|
|
+ Boolean result = response.getValue();
|
|
|
return R.success(result);
|
|
|
} else {
|
|
|
+ logError("HasPartitionRequest failed!\n{}", response.getStatus().getReason());
|
|
|
return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
|
|
|
response.getStatus().getReason());
|
|
|
}
|
|
|
} catch (StatusRuntimeException e) {
|
|
|
- logError("hasPartitionRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ logError("HasPartitionRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("HasPartitionRequest failed:\n{}", e.getMessage());
|
|
|
return R.failed(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public R<RpcStatus> loadPartitions(LoadPartitionsParam requestParam) {
|
|
|
+ public R<RpcStatus> loadPartitions(@NonNull LoadPartitionsParam requestParam) {
|
|
|
if (!clientIsReady()) {
|
|
|
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
}
|
|
|
|
|
|
- LoadPartitionsRequest.Builder loadPartitionsRequestBuilder = LoadPartitionsRequest.newBuilder();
|
|
|
+ logInfo(requestParam.toString());
|
|
|
|
|
|
- String[] partitionNames = requestParam.getPartitionNames();
|
|
|
- // add partitionNames
|
|
|
- Arrays.stream(partitionNames).forEach(loadPartitionsRequestBuilder::addPartitionNames);
|
|
|
-
|
|
|
- LoadPartitionsRequest loadPartitionsRequest = loadPartitionsRequestBuilder
|
|
|
- .setCollectionName(requestParam.getCollectionName()).build();
|
|
|
-
|
|
|
- Status response;
|
|
|
try {
|
|
|
- response = blockingStub().loadPartitions(loadPartitionsRequest);
|
|
|
+ LoadPartitionsRequest loadPartitionsRequest = LoadPartitionsRequest.newBuilder()
|
|
|
+ .setCollectionName(requestParam.getCollectionName())
|
|
|
+ .addAllPartitionNames(requestParam.getPartitionNames())
|
|
|
+ .build();
|
|
|
|
|
|
- if (response.getErrorCode() == ErrorCode.Success) {
|
|
|
- logInfo("Load partition successfully!\n{}", requestParam.toString());
|
|
|
- return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
|
|
|
- } else {
|
|
|
+ Status response = blockingStub().loadPartitions(loadPartitionsRequest);
|
|
|
+
|
|
|
+ if (response.getErrorCode() != ErrorCode.Success) {
|
|
|
return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
|
|
|
}
|
|
|
- } catch (StatusRuntimeException e) {
|
|
|
- logError("loadPartitionsRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+
|
|
|
+ // sync load, wait until all partitions finish loading
|
|
|
+ if (requestParam.isSyncLoad()) {
|
|
|
+ waitForLoadingCollection(requestParam.getCollectionName(), requestParam.getPartitionNames(),
|
|
|
+ requestParam.getSyncLoadWaitingInterval(), requestParam.getSyncLoadWaitingTimeout());
|
|
|
+ }
|
|
|
+
|
|
|
+ logInfo("LoadPartitionsRequest successfully! Collection name:{}, partition names:{}",
|
|
|
+ requestParam.getCollectionName(), requestParam.getPartitionNames());
|
|
|
+ return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
|
|
|
+ } catch (StatusRuntimeException e) { // gRPC could throw this exception
|
|
|
+ logError("LoadPartitionsRequest RPC failed! Collection name:{}, partition names:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), requestParam.getPartitionNames(), e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (IllegalResponseException e) { // milvus exception for illegal response
|
|
|
+ logError("LoadPartitionsRequest failed! Collection name:{}, partition names:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), requestParam.getPartitionNames(), e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("LoadPartitionsRequest failed! Collection name:{}, partition names:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), requestParam.getPartitionNames(), e.getMessage());
|
|
|
return R.failed(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public R<RpcStatus> releasePartitions(ReleasePartitionsParam requestParam) {
|
|
|
+ public R<RpcStatus> releasePartitions(@NonNull ReleasePartitionsParam requestParam) {
|
|
|
if (!clientIsReady()) {
|
|
|
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
}
|
|
|
|
|
|
- ReleasePartitionsRequest.Builder releasePartitionsRequestBuilder = ReleasePartitionsRequest.newBuilder();
|
|
|
-
|
|
|
- String[] partitionNames = requestParam.getPartitionNames();
|
|
|
- // add partitionNames
|
|
|
- Arrays.stream(partitionNames).forEach(releasePartitionsRequestBuilder::addPartitionNames);
|
|
|
-
|
|
|
- ReleasePartitionsRequest releasePartitionsRequest = releasePartitionsRequestBuilder
|
|
|
- .setCollectionName(requestParam.getCollectionName()).build();
|
|
|
+ logInfo(requestParam.toString());
|
|
|
|
|
|
- Status response;
|
|
|
try {
|
|
|
- response = blockingStub().releasePartitions(releasePartitionsRequest);
|
|
|
+ ReleasePartitionsRequest releasePartitionsRequest = ReleasePartitionsRequest.newBuilder()
|
|
|
+ .setCollectionName(requestParam.getCollectionName())
|
|
|
+ .addAllPartitionNames(requestParam.getPartitionNames())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ Status response = blockingStub().releasePartitions(releasePartitionsRequest);
|
|
|
|
|
|
if (response.getErrorCode() == ErrorCode.Success) {
|
|
|
- logInfo("Release partition successfully!\n{}", requestParam.toString());
|
|
|
+ logInfo("ReleasePartitionsRequest successfully! Collection name:{}, partition names:{}",
|
|
|
+ requestParam.getCollectionName(), requestParam.getPartitionNames());
|
|
|
return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
|
|
|
} else {
|
|
|
+ logError("ReleasePartitionsRequest failed! Collection name:{}, partition names:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), requestParam.getPartitionNames(), response.getReason());
|
|
|
return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
|
|
|
}
|
|
|
} catch (StatusRuntimeException e) {
|
|
|
- logError("releasePartitionsRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ logError("ReleasePartitionsRequest RPC failed! Collection name:{}, partition names:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), requestParam.getPartitionNames(), e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("ReleasePartitionsRequest failed! Collection name:{}, partition names:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), requestParam.getPartitionNames(), e.getMessage());
|
|
|
return R.failed(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public R<GetPartitionStatisticsResponse> getPartitionStatistics(GetPartitionStatisticsParam requestParam) {
|
|
|
+ public R<GetPartitionStatisticsResponse> getPartitionStatistics(@NonNull GetPartitionStatisticsParam requestParam) {
|
|
|
if (!clientIsReady()) {
|
|
|
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
}
|
|
|
|
|
|
- GetPartitionStatisticsRequest getPartitionStatisticsRequest = GetPartitionStatisticsRequest.newBuilder()
|
|
|
- .setCollectionName(requestParam.getCollectionName())
|
|
|
- .setPartitionName(requestParam.getPartitionName())
|
|
|
- .build();
|
|
|
+ logInfo(requestParam.toString());
|
|
|
|
|
|
- GetPartitionStatisticsResponse response;
|
|
|
try {
|
|
|
- response = blockingStub().getPartitionStatistics(getPartitionStatisticsRequest);
|
|
|
+ GetPartitionStatisticsRequest getPartitionStatisticsRequest = GetPartitionStatisticsRequest.newBuilder()
|
|
|
+ .setCollectionName(requestParam.getCollectionName())
|
|
|
+ .setPartitionName(requestParam.getPartitionName())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ GetPartitionStatisticsResponse response =
|
|
|
+ blockingStub().getPartitionStatistics(getPartitionStatisticsRequest);
|
|
|
|
|
|
if (response.getStatus().getErrorCode() == ErrorCode.Success) {
|
|
|
- logInfo("Get partition statistics successfully!\n{}", requestParam.toString());
|
|
|
+ logInfo("GetPartitionStatisticsRequest successfully!");
|
|
|
return R.success(response);
|
|
|
} else {
|
|
|
+ logError("ReleasePartitionsRequest failed:\n{}", response.getStatus().getReason());
|
|
|
return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
|
|
|
response.getStatus().getReason());
|
|
|
}
|
|
|
} catch (StatusRuntimeException e) {
|
|
|
- logError("getPartitionStatisticsRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ logError("GetPartitionStatisticsRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("GetQuerySegmentInfoRequest failed:\n{}", e.getMessage());
|
|
|
return R.failed(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public R<ShowPartitionsResponse> showPartitions(ShowPartitionsParam requestParam) {
|
|
|
+ public R<ShowPartitionsResponse> showPartitions(@NonNull ShowPartitionsParam requestParam) {
|
|
|
if (!clientIsReady()) {
|
|
|
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
}
|
|
|
|
|
|
- ShowPartitionsRequest showPartitionsRequest = ShowPartitionsRequest.newBuilder()
|
|
|
- .setCollectionName(requestParam.getCollectionName())
|
|
|
- .build();
|
|
|
+ logInfo(requestParam.toString());
|
|
|
|
|
|
- ShowPartitionsResponse response;
|
|
|
try {
|
|
|
- response = blockingStub().showPartitions(showPartitionsRequest);
|
|
|
+ ShowPartitionsRequest showPartitionsRequest = ShowPartitionsRequest.newBuilder()
|
|
|
+ .setCollectionName(requestParam.getCollectionName())
|
|
|
+ .addAllPartitionNames(requestParam.getPartitionNames())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ ShowPartitionsResponse response = blockingStub().showPartitions(showPartitionsRequest);
|
|
|
|
|
|
if (response.getStatus().getErrorCode() == ErrorCode.Success) {
|
|
|
- logInfo("Show partition successfully!\n{}", requestParam.toString());
|
|
|
+ logInfo("ShowPartitionsRequest successfully!");
|
|
|
return R.success(response);
|
|
|
} else {
|
|
|
+ logError("ShowPartitionsRequest failed:\n{}", response.getStatus().getReason());
|
|
|
return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
|
|
|
response.getStatus().getReason());
|
|
|
}
|
|
|
} catch (StatusRuntimeException e) {
|
|
|
- logError("showPartitionsRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ logError("ShowPartitionsRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("ShowPartitionsRequest failed:\n{}", e.getMessage());
|
|
|
return R.failed(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public R<RpcStatus> createAlias(CreateAliasParam requestParam) {
|
|
|
+ public R<RpcStatus> createAlias(@NonNull CreateAliasParam requestParam) {
|
|
|
if (!clientIsReady()) {
|
|
|
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
}
|
|
|
|
|
|
- CreateAliasRequest createAliasRequest = CreateAliasRequest.newBuilder()
|
|
|
- .setCollectionName(requestParam.getCollectionName())
|
|
|
- .setAlias(requestParam.getAlias())
|
|
|
- .build();
|
|
|
+ logInfo(requestParam.toString());
|
|
|
|
|
|
- Status response;
|
|
|
try {
|
|
|
- response = blockingStub().createAlias(createAliasRequest);
|
|
|
+ CreateAliasRequest createAliasRequest = CreateAliasRequest.newBuilder()
|
|
|
+ .setCollectionName(requestParam.getCollectionName())
|
|
|
+ .setAlias(requestParam.getAlias())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ Status response = blockingStub().createAlias(createAliasRequest);
|
|
|
|
|
|
if (response.getErrorCode() == ErrorCode.Success) {
|
|
|
- logInfo("Create alias successfully!\n{}", requestParam.toString());
|
|
|
+ logInfo("CreateAliasRequest successfully! Collection name:{}, alias name:{}",
|
|
|
+ requestParam.getCollectionName(), requestParam.getAlias());
|
|
|
return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
|
|
|
} else {
|
|
|
+ logError("CreateAliasRequest failed! Collection name:{}, alias name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), requestParam.getAlias(), response.getReason());
|
|
|
return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
|
|
|
}
|
|
|
} catch (StatusRuntimeException e) {
|
|
|
- logError("createAlias RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ logError("CreateAliasRequest RPC failed! Collection name:{}, alias name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), requestParam.getAlias(), e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("CreateAliasRequest failed! Collection name:{}, alias name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), requestParam.getAlias(), e.getMessage());
|
|
|
return R.failed(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public R<RpcStatus> dropAlias(DropAliasParam requestParam) {
|
|
|
+ public R<RpcStatus> dropAlias(@NonNull DropAliasParam requestParam) {
|
|
|
if (!clientIsReady()) {
|
|
|
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
}
|
|
|
|
|
|
- DropAliasRequest dropAliasRequest = DropAliasRequest.newBuilder()
|
|
|
- .setAlias(requestParam.getAlias())
|
|
|
- .build();
|
|
|
+ logInfo(requestParam.toString());
|
|
|
|
|
|
- Status response;
|
|
|
try {
|
|
|
- response = blockingStub().dropAlias(dropAliasRequest);
|
|
|
+ DropAliasRequest dropAliasRequest = DropAliasRequest.newBuilder()
|
|
|
+ .setAlias(requestParam.getAlias())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ Status response = blockingStub().dropAlias(dropAliasRequest);
|
|
|
|
|
|
if (response.getErrorCode() == ErrorCode.Success) {
|
|
|
- logInfo("Drop alias successfully!\n{}", requestParam.toString());
|
|
|
+ logInfo("DropAliasRequest successfully! Alias name:{}", requestParam.getAlias());
|
|
|
return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
|
|
|
} else {
|
|
|
+ logError("DropAliasRequest failed! Alias name:{}\n{}",
|
|
|
+ requestParam.getAlias(), response.getReason());
|
|
|
return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
|
|
|
}
|
|
|
} catch (StatusRuntimeException e) {
|
|
|
- logError("dropAlias RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ logError("DropAliasRequest RPC failed! Alias name:{}\n{}",
|
|
|
+ requestParam.getAlias(), e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("DropAliasRequest failed! Alias name:{}\n{}",
|
|
|
+ requestParam.getAlias(), e.getMessage());
|
|
|
return R.failed(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public R<RpcStatus> alterAlias(AlterAliasParam requestParam) {
|
|
|
+ public R<RpcStatus> alterAlias(@NonNull AlterAliasParam requestParam) {
|
|
|
if (!clientIsReady()) {
|
|
|
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
}
|
|
|
|
|
|
- AlterAliasRequest alterAliasRequest = AlterAliasRequest.newBuilder()
|
|
|
- .setCollectionName(requestParam.getCollectionName())
|
|
|
- .setAlias(requestParam.getAlias())
|
|
|
- .build();
|
|
|
+ logInfo(requestParam.toString());
|
|
|
|
|
|
- Status response;
|
|
|
try {
|
|
|
- response = blockingStub().alterAlias(alterAliasRequest);
|
|
|
+ AlterAliasRequest alterAliasRequest = AlterAliasRequest.newBuilder()
|
|
|
+ .setCollectionName(requestParam.getCollectionName())
|
|
|
+ .setAlias(requestParam.getAlias())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ Status response = blockingStub().alterAlias(alterAliasRequest);
|
|
|
|
|
|
if (response.getErrorCode() == ErrorCode.Success) {
|
|
|
- logInfo("Alter alias successfully!\n{}", requestParam.toString());
|
|
|
+ logInfo("AlterAliasRequest successfully! Collection name:{}, alias name:{}",
|
|
|
+ requestParam.getCollectionName(), requestParam.getAlias());
|
|
|
return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
|
|
|
} else {
|
|
|
+ logError("AlterAliasRequest failed! Collection name:{}, alias name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), requestParam.getAlias(), response.getReason());
|
|
|
return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
|
|
|
}
|
|
|
} catch (StatusRuntimeException e) {
|
|
|
- logError("alterAlias RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ logError("AlterAliasRequest RPC failed! Collection name:{}, alias name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), requestParam.getAlias(), e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("AlterAliasRequest failed! Collection name:{}, alias name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), requestParam.getAlias(), e.getMessage());
|
|
|
return R.failed(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public R<RpcStatus> createIndex(CreateIndexParam requestParam) {
|
|
|
+ public R<RpcStatus> createIndex(@NonNull CreateIndexParam requestParam) {
|
|
|
if (!clientIsReady()) {
|
|
|
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
}
|
|
|
|
|
|
- CreateIndexRequest.Builder createIndexRequestBuilder = CreateIndexRequest.newBuilder();
|
|
|
- List<KeyValuePair> extraParamList = assembleKvPair(requestParam.getExtraParam());
|
|
|
+ logInfo(requestParam.toString());
|
|
|
|
|
|
- if (CollectionUtils.isNotEmpty(extraParamList)) {
|
|
|
- extraParamList.forEach(createIndexRequestBuilder::addExtraParams);
|
|
|
- }
|
|
|
+ try {
|
|
|
+ CreateIndexRequest.Builder createIndexRequestBuilder = CreateIndexRequest.newBuilder();
|
|
|
+ List<KeyValuePair> extraParamList = assembleKvPair(requestParam.getExtraParam());
|
|
|
|
|
|
- CreateIndexRequest createIndexRequest = createIndexRequestBuilder.setCollectionName(requestParam.getCollectionName())
|
|
|
- .setFieldName(requestParam.getFieldName()).build();
|
|
|
+ if (CollectionUtils.isNotEmpty(extraParamList)) {
|
|
|
+ extraParamList.forEach(createIndexRequestBuilder::addExtraParams);
|
|
|
+ }
|
|
|
|
|
|
- Status response;
|
|
|
- try {
|
|
|
- response = blockingStub().createIndex(createIndexRequest);
|
|
|
+ CreateIndexRequest createIndexRequest = createIndexRequestBuilder.setCollectionName(requestParam.getCollectionName())
|
|
|
+ .setFieldName(requestParam.getFieldName()).build();
|
|
|
+
|
|
|
+ Status response = blockingStub().createIndex(createIndexRequest);
|
|
|
|
|
|
if (response.getErrorCode() == ErrorCode.Success) {
|
|
|
- logInfo("Create index successfully!\n{}", requestParam.toString());
|
|
|
+ logInfo("CreateIndexRequest successfully! Collection name:{}",
|
|
|
+ requestParam.getCollectionName());
|
|
|
return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
|
|
|
} else {
|
|
|
+ logError("CreateIndexRequest failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), response.getReason());
|
|
|
return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
|
|
|
}
|
|
|
} catch (StatusRuntimeException e) {
|
|
|
- logError("createIndex RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ logError("CreateIndexRequest RPC failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("CreateIndexRequest failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), e.getMessage());
|
|
|
return R.failed(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public R<RpcStatus> dropIndex(DropIndexParam requestParam) {
|
|
|
+ public R<RpcStatus> dropIndex(@NonNull DropIndexParam requestParam) {
|
|
|
if (!clientIsReady()) {
|
|
|
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
}
|
|
|
|
|
|
- DropIndexRequest dropIndexRequest = DropIndexRequest.newBuilder()
|
|
|
- .setCollectionName(requestParam.getCollectionName())
|
|
|
- .setFieldName(requestParam.getFieldName())
|
|
|
- .build();
|
|
|
+ logInfo(requestParam.toString());
|
|
|
|
|
|
- Status response;
|
|
|
try {
|
|
|
- response = blockingStub().dropIndex(dropIndexRequest);
|
|
|
+ DropIndexRequest dropIndexRequest = DropIndexRequest.newBuilder()
|
|
|
+ .setCollectionName(requestParam.getCollectionName())
|
|
|
+ .setFieldName(requestParam.getFieldName())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ Status response = blockingStub().dropIndex(dropIndexRequest);
|
|
|
|
|
|
if (response.getErrorCode() == ErrorCode.Success) {
|
|
|
- logInfo("Drop index successfully!\n{}", requestParam.toString());
|
|
|
+ logInfo("DropIndexRequest successfully! Collection name:{}",
|
|
|
+ requestParam.getCollectionName());
|
|
|
return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
|
|
|
} else {
|
|
|
+ logError("DropIndexRequest failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), response.getReason());
|
|
|
return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
|
|
|
}
|
|
|
} catch (StatusRuntimeException e) {
|
|
|
- logError("dropIndex RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ logError("DropIndexRequest RPC failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("DropIndexRequest failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), e.getMessage());
|
|
|
return R.failed(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public R<DescribeIndexResponse> describeIndex(DescribeIndexParam requestParam) {
|
|
|
+ public R<DescribeIndexResponse> describeIndex(@NonNull DescribeIndexParam requestParam) {
|
|
|
if (!clientIsReady()) {
|
|
|
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
}
|
|
|
|
|
|
- DescribeIndexRequest describeIndexRequest = DescribeIndexRequest.newBuilder()
|
|
|
- .setCollectionName(requestParam.getCollectionName())
|
|
|
- .setFieldName(requestParam.getFieldName())
|
|
|
- .build();
|
|
|
+ logInfo(requestParam.toString());
|
|
|
|
|
|
- DescribeIndexResponse response;
|
|
|
try {
|
|
|
- response = blockingStub().describeIndex(describeIndexRequest);
|
|
|
+ DescribeIndexRequest describeIndexRequest = DescribeIndexRequest.newBuilder()
|
|
|
+ .setCollectionName(requestParam.getCollectionName())
|
|
|
+ .setFieldName(requestParam.getFieldName())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ DescribeIndexResponse response = blockingStub().describeIndex(describeIndexRequest);
|
|
|
|
|
|
if (response.getStatus().getErrorCode() == ErrorCode.Success) {
|
|
|
- logInfo("Describe index successfully!\n{}", requestParam.toString());
|
|
|
+ logInfo("DescribeIndexRequest successfully!");
|
|
|
return R.success(response);
|
|
|
} else {
|
|
|
+ logError("DescribeIndexRequest failed:\n{}", response.getStatus().getReason());
|
|
|
return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
|
|
|
response.getStatus().getReason());
|
|
|
}
|
|
|
} catch (StatusRuntimeException e) {
|
|
|
- logError("describeIndex RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ logError("DescribeIndexRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("DescribeIndexRequest failed:\n{}", e.getMessage());
|
|
|
return R.failed(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public R<GetIndexStateResponse> getIndexState(GetIndexStateParam requestParam) {
|
|
|
+ public R<GetIndexStateResponse> getIndexState(@NonNull GetIndexStateParam requestParam) {
|
|
|
if (!clientIsReady()) {
|
|
|
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
}
|
|
|
|
|
|
- GetIndexStateRequest getIndexStateRequest = GetIndexStateRequest.newBuilder()
|
|
|
- .setCollectionName(requestParam.getCollectionName())
|
|
|
- .setFieldName(requestParam.getFieldName())
|
|
|
- .build();
|
|
|
+ logInfo(requestParam.toString());
|
|
|
+
|
|
|
+ try {
|
|
|
+ GetIndexStateRequest getIndexStateRequest = GetIndexStateRequest.newBuilder()
|
|
|
+ .setCollectionName(requestParam.getCollectionName())
|
|
|
+ .setFieldName(requestParam.getFieldName())
|
|
|
+ .build();
|
|
|
|
|
|
- GetIndexStateResponse response;
|
|
|
+ GetIndexStateResponse response = blockingStub().getIndexState(getIndexStateRequest);
|
|
|
+
|
|
|
+ if (response.getStatus().getErrorCode() == ErrorCode.Success) {
|
|
|
+ logInfo("GetIndexStateRequest successfully!");
|
|
|
+ return R.success(response);
|
|
|
+ } else {
|
|
|
+ logError("GetIndexStateRequest failed:\n{}", response.getStatus().getReason());
|
|
|
+ return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
|
|
|
+ response.getStatus().getReason());
|
|
|
+ }
|
|
|
+ } catch (StatusRuntimeException e) {
|
|
|
+ logError("GetIndexStateRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("GetIndexStateRequest failed:\n{}", e.getMessage());
|
|
|
+ return R.failed(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public R<GetIndexBuildProgressResponse> getIndexBuildProgress(@NonNull GetIndexBuildProgressParam requestParam) {
|
|
|
+ if (!clientIsReady()) {
|
|
|
+ return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
+ }
|
|
|
+
|
|
|
+ logInfo(requestParam.toString());
|
|
|
|
|
|
try {
|
|
|
- response = blockingStub().getIndexState(getIndexStateRequest);
|
|
|
+ GetIndexBuildProgressRequest getIndexBuildProgressRequest = GetIndexBuildProgressRequest.newBuilder()
|
|
|
+ .setCollectionName(requestParam.getCollectionName())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ GetIndexBuildProgressResponse response = blockingStub().getIndexBuildProgress(getIndexBuildProgressRequest);
|
|
|
|
|
|
if (response.getStatus().getErrorCode() == ErrorCode.Success) {
|
|
|
- logInfo("Get index state successfully!\n{}", requestParam.toString());
|
|
|
+ logInfo("GetIndexBuildProgressRequest successfully!");
|
|
|
return R.success(response);
|
|
|
} else {
|
|
|
+ logError("GetIndexBuildProgressRequest failed:\n{}", response.getStatus().getReason());
|
|
|
return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
|
|
|
response.getStatus().getReason());
|
|
|
}
|
|
|
} catch (StatusRuntimeException e) {
|
|
|
- logError("getIndexState RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ logError("GetIndexBuildProgressRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("GetIndexBuildProgressRequest failed:\n{}", e.getMessage());
|
|
|
return R.failed(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public R<GetIndexBuildProgressResponse> getIndexBuildProgress(GetIndexBuildProgressParam requestParam) {
|
|
|
+ public R<MutationResult> delete(@NonNull DeleteParam requestParam) {
|
|
|
if (!clientIsReady()) {
|
|
|
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
}
|
|
|
|
|
|
- GetIndexBuildProgressRequest getIndexBuildProgressRequest = GetIndexBuildProgressRequest.newBuilder()
|
|
|
- .setCollectionName(requestParam.getCollectionName())
|
|
|
- .build();
|
|
|
+ logInfo(requestParam.toString());
|
|
|
|
|
|
- GetIndexBuildProgressResponse response;
|
|
|
+ try {
|
|
|
+ DeleteRequest deleteRequest = DeleteRequest.newBuilder()
|
|
|
+ .setBase(MsgBase.newBuilder().setMsgType(MsgType.Delete).build())
|
|
|
+ .setCollectionName(requestParam.getCollectionName())
|
|
|
+ .setPartitionName(requestParam.getPartitionName())
|
|
|
+ .setExpr(requestParam.getExpr())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ MutationResult response = blockingStub().delete(deleteRequest);
|
|
|
+
|
|
|
+ if (response.getStatus().getErrorCode() == ErrorCode.Success) {
|
|
|
+ logInfo("DeleteRequest successfully! Collection name:{}",
|
|
|
+ requestParam.getCollectionName());
|
|
|
+ return R.success(response);
|
|
|
+ } else {
|
|
|
+ logError("DeleteRequest failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), response.getStatus().getReason());
|
|
|
+ return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
|
|
|
+ response.getStatus().getReason());
|
|
|
+ }
|
|
|
+ } catch (StatusRuntimeException e) {
|
|
|
+ logError("DeleteRequest RPC failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), e.getMessage());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("DeleteRequest failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), e.getMessage());
|
|
|
+ return R.failed(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public R<MutationResult> insert(@NonNull InsertParam requestParam) {
|
|
|
+ if (!clientIsReady()) {
|
|
|
+ return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
+ }
|
|
|
+
|
|
|
+ logInfo(requestParam.toString());
|
|
|
|
|
|
try {
|
|
|
- response = blockingStub().getIndexBuildProgress(getIndexBuildProgressRequest);
|
|
|
+ String collectionName = requestParam.getCollectionName();
|
|
|
+ String partitionName = requestParam.getPartitionName();
|
|
|
+ List<InsertParam.Field> fields = requestParam.getFields();
|
|
|
+
|
|
|
+ //1. gen insert request
|
|
|
+ MsgBase msgBase = MsgBase.newBuilder().setMsgType(MsgType.Insert).build();
|
|
|
+ InsertRequest.Builder insertBuilder = InsertRequest.newBuilder()
|
|
|
+ .setCollectionName(collectionName)
|
|
|
+ .setPartitionName(partitionName)
|
|
|
+ .setBase(msgBase)
|
|
|
+ .setNumRows(requestParam.getRowCount());
|
|
|
+
|
|
|
+ //2. gen fieldData
|
|
|
+ // TODO: check field type(use DescribeCollection get schema to compare)
|
|
|
+ for (InsertParam.Field field : fields) {
|
|
|
+ insertBuilder.addFieldsData(genFieldData(field.getName(), field.getType(), field.getValues()));
|
|
|
+ }
|
|
|
+
|
|
|
+ //3. call insert
|
|
|
+ InsertRequest insertRequest = insertBuilder.build();
|
|
|
+ MutationResult response = blockingStub().insert(insertRequest);
|
|
|
|
|
|
if (response.getStatus().getErrorCode() == ErrorCode.Success) {
|
|
|
- logInfo("Get index build progress successfully!\n{}", requestParam.toString());
|
|
|
+ logInfo("InsertRequest successfully! Collection name:{}",
|
|
|
+ requestParam.getCollectionName());
|
|
|
return R.success(response);
|
|
|
} else {
|
|
|
+ logError("InsertRequest failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), response.getStatus().getReason());
|
|
|
return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
|
|
|
response.getStatus().getReason());
|
|
|
}
|
|
|
} catch (StatusRuntimeException e) {
|
|
|
- logError("getIndexBuildProgress RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ logError("InsertRequest RPC failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), e.getMessage());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("InsertRequest failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), e.getMessage());
|
|
|
return R.failed(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private List<KeyValuePair> assembleKvPair(Map<String, String> sourceMap) {
|
|
|
- List<KeyValuePair> result = new ArrayList<>();
|
|
|
- if (MapUtils.isNotEmpty(sourceMap)) {
|
|
|
- sourceMap.forEach((key, value) -> {
|
|
|
- KeyValuePair kv = KeyValuePair.newBuilder()
|
|
|
- .setKey(key)
|
|
|
- .setValue(value).build();
|
|
|
- result.add(kv);
|
|
|
- });
|
|
|
+ @Override
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ public R<SearchResults> search(@NonNull SearchParam requestParam) {
|
|
|
+ if (!clientIsReady()) {
|
|
|
+ return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
+ }
|
|
|
+
|
|
|
+ logInfo(requestParam.toString());
|
|
|
+
|
|
|
+ try {
|
|
|
+ SearchRequest.Builder builder = SearchRequest.newBuilder()
|
|
|
+ .setDbName("")
|
|
|
+ .setCollectionName(requestParam.getCollectionName());
|
|
|
+ if (!requestParam.getPartitionNames().isEmpty()) {
|
|
|
+ requestParam.getPartitionNames().forEach(builder::addPartitionNames);
|
|
|
+ }
|
|
|
+
|
|
|
+ // prepare target vectors
|
|
|
+ // TODO: check target vector dimension(use DescribeCollection get schema to compare)
|
|
|
+ PlaceholderType plType = PlaceholderType.None;
|
|
|
+ List<?> vectors = requestParam.getVectors();
|
|
|
+ List<ByteString> byteStrings = new ArrayList<>();
|
|
|
+ for (Object vector : vectors) {
|
|
|
+ if (vector instanceof List) {
|
|
|
+ plType = PlaceholderType.FloatVector;
|
|
|
+ List<Float> list = (List<Float>) vector;
|
|
|
+ ByteBuffer buf = ByteBuffer.allocate(Float.BYTES * list.size());
|
|
|
+ buf.order(ByteOrder.LITTLE_ENDIAN);
|
|
|
+ list.forEach(buf::putFloat);
|
|
|
+
|
|
|
+ byte[] array = buf.array();
|
|
|
+ ByteString bs = ByteString.copyFrom(array);
|
|
|
+ byteStrings.add(bs);
|
|
|
+ } else if (vector instanceof ByteBuffer) {
|
|
|
+ plType = PlaceholderType.BinaryVector;
|
|
|
+ ByteBuffer buf = (ByteBuffer) vector;
|
|
|
+ byte[] array = buf.array();
|
|
|
+ ByteString bs = ByteString.copyFrom(array);
|
|
|
+ byteStrings.add(bs);
|
|
|
+ } else {
|
|
|
+ String msg = "Search target vector type is illegal(Only allow List<Float> or ByteBuffer)";
|
|
|
+ logError(msg);
|
|
|
+ return R.failed(R.Status.UnexpectedError, msg);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ PlaceholderValue.Builder pldBuilder = PlaceholderValue.newBuilder()
|
|
|
+ .setTag(Constant.VECTOR_TAG)
|
|
|
+ .setType(plType);
|
|
|
+ byteStrings.forEach(pldBuilder::addValues);
|
|
|
+
|
|
|
+ PlaceholderValue plv = pldBuilder.build();
|
|
|
+ PlaceholderGroup placeholderGroup = PlaceholderGroup.newBuilder()
|
|
|
+ .addPlaceholders(plv)
|
|
|
+ .build();
|
|
|
+
|
|
|
+ ByteString byteStr = placeholderGroup.toByteString();
|
|
|
+ builder.setPlaceholderGroup(byteStr);
|
|
|
+
|
|
|
+ // search parameters
|
|
|
+ builder.addSearchParams(
|
|
|
+ KeyValuePair.newBuilder()
|
|
|
+ .setKey(Constant.VECTOR_FIELD)
|
|
|
+ .setValue(requestParam.getVectorFieldName())
|
|
|
+ .build())
|
|
|
+ .addSearchParams(
|
|
|
+ KeyValuePair.newBuilder()
|
|
|
+ .setKey(Constant.TOP_K)
|
|
|
+ .setValue(String.valueOf(requestParam.getTopK()))
|
|
|
+ .build())
|
|
|
+ .addSearchParams(
|
|
|
+ KeyValuePair.newBuilder()
|
|
|
+ .setKey(Constant.METRIC_TYPE)
|
|
|
+ .setValue(requestParam.getMetricType())
|
|
|
+ .build())
|
|
|
+ .addSearchParams(
|
|
|
+ KeyValuePair.newBuilder()
|
|
|
+ .setKey(Constant.ROUND_DECIMAL)
|
|
|
+ .setValue(String.valueOf(requestParam.getRoundDecimal()))
|
|
|
+ .build());
|
|
|
+
|
|
|
+ if (null != requestParam.getParams() && !requestParam.getParams().isEmpty()) {
|
|
|
+ builder.addSearchParams(
|
|
|
+ KeyValuePair.newBuilder()
|
|
|
+ .setKey(Constant.PARAMS)
|
|
|
+ .setValue(requestParam.getParams())
|
|
|
+ .build());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!requestParam.getOutFields().isEmpty()) {
|
|
|
+ requestParam.getOutFields().forEach(builder::addOutputFields);
|
|
|
+ }
|
|
|
+
|
|
|
+ // always use expression since dsl is discarded
|
|
|
+ builder.setDslType(DslType.BoolExprV1);
|
|
|
+ if (requestParam.getExpr() != null && !requestParam.getExpr().isEmpty()) {
|
|
|
+ builder.setDsl(requestParam.getExpr());
|
|
|
+ }
|
|
|
+
|
|
|
+ SearchRequest searchRequest = builder.build();
|
|
|
+ SearchResults response = this.blockingStub().search(searchRequest);
|
|
|
+
|
|
|
+ //TODO: truncate distance value by round decimal
|
|
|
+
|
|
|
+ if (response.getStatus().getErrorCode() == ErrorCode.Success) {
|
|
|
+ logInfo("SearchRequest successfully!");
|
|
|
+ return R.success(response);
|
|
|
+ } else {
|
|
|
+ logError("SearchRequest failed:\n{}", response.getStatus().getReason());
|
|
|
+ return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
|
|
|
+ response.getStatus().getReason());
|
|
|
+ }
|
|
|
+ } catch (StatusRuntimeException e) {
|
|
|
+ logError("SearchRequest RPC failed:{}", e.getMessage());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("SearchRequest failed:\n{}", e.getMessage());
|
|
|
+ return R.failed(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public R<QueryResults> query(@NonNull QueryParam requestParam) {
|
|
|
+ if (!clientIsReady()) {
|
|
|
+ return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
+ }
|
|
|
+
|
|
|
+ logInfo(requestParam.toString());
|
|
|
+
|
|
|
+ try {
|
|
|
+ QueryRequest queryRequest = QueryRequest.newBuilder()
|
|
|
+ .setDbName("")
|
|
|
+ .setCollectionName(requestParam.getCollectionName())
|
|
|
+ .addAllPartitionNames(requestParam.getPartitionNames())
|
|
|
+ .addAllOutputFields(requestParam.getOutFields())
|
|
|
+ .setExpr(requestParam.getExpr())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ QueryResults response = this.blockingStub().query(queryRequest);
|
|
|
+
|
|
|
+ if (response.getStatus().getErrorCode() == ErrorCode.Success) {
|
|
|
+ logInfo("QueryRequest successfully!");
|
|
|
+ return R.success(response);
|
|
|
+ } else {
|
|
|
+ logError("QueryRequest failed:\n{}", response.getStatus().getReason());
|
|
|
+ return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
|
|
|
+ response.getStatus().getReason());
|
|
|
+ }
|
|
|
+ } catch (StatusRuntimeException e) {
|
|
|
+// e.printStackTrace();
|
|
|
+ logError("QueryRequest RPC failed:{}", e.getMessage());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("QueryRequest failed:\n{}", e.getMessage());
|
|
|
+ return R.failed(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public R<CalcDistanceResults> calcDistance(@NonNull CalcDistanceParam requestParam) {
|
|
|
+ if (!clientIsReady()) {
|
|
|
+ return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
+ }
|
|
|
+
|
|
|
+ logInfo(requestParam.toString());
|
|
|
+
|
|
|
+ try {
|
|
|
+ List<List<Float>> vectors_left = requestParam.getVectorsLeft();
|
|
|
+ List<List<Float>> vectors_right = requestParam.getVectorsRight();
|
|
|
+
|
|
|
+ FloatArray.Builder left_float_array = FloatArray.newBuilder();
|
|
|
+ for (List<Float> vector : vectors_left) {
|
|
|
+ left_float_array.addAllData(vector);
|
|
|
+ }
|
|
|
+
|
|
|
+ FloatArray.Builder right_float_array = FloatArray.newBuilder();
|
|
|
+ for (List<Float> vector : vectors_right) {
|
|
|
+ right_float_array.addAllData(vector);
|
|
|
+ }
|
|
|
+
|
|
|
+ CalcDistanceRequest calcDistanceRequest = CalcDistanceRequest.newBuilder()
|
|
|
+ .setOpLeft(
|
|
|
+ VectorsArray.newBuilder()
|
|
|
+ .setDataArray(
|
|
|
+ VectorField.newBuilder()
|
|
|
+ .setFloatVector(left_float_array.build())
|
|
|
+ .setDim(vectors_left.get(0).size())
|
|
|
+ .build()
|
|
|
+ )
|
|
|
+ .build()
|
|
|
+ )
|
|
|
+ .setOpRight(
|
|
|
+ VectorsArray.newBuilder()
|
|
|
+ .setDataArray(
|
|
|
+ VectorField.newBuilder()
|
|
|
+ .setFloatVector(right_float_array.build())
|
|
|
+ .setDim(vectors_right.get(0).size())
|
|
|
+ .build()
|
|
|
+ )
|
|
|
+ .build()
|
|
|
+ )
|
|
|
+ .addParams(
|
|
|
+ KeyValuePair.newBuilder()
|
|
|
+ .setKey("metric")
|
|
|
+ .setValue(requestParam.getMetricType())
|
|
|
+ .build()
|
|
|
+ )
|
|
|
+ .build();
|
|
|
+
|
|
|
+ CalcDistanceResults response = blockingStub().calcDistance(calcDistanceRequest);
|
|
|
+
|
|
|
+ if (response.getStatus().getErrorCode() == ErrorCode.Success) {
|
|
|
+ logInfo("CalcDistanceRequest successfully!");
|
|
|
+ return R.success(response);
|
|
|
+ } else {
|
|
|
+ logError("CalcDistanceRequest failed:\n{}", response.getStatus().getReason());
|
|
|
+ return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
|
|
|
+ response.getStatus().getReason());
|
|
|
+ }
|
|
|
+ } catch (StatusRuntimeException e) {
|
|
|
+ logError("CalcDistanceRequest RPC failed:{}", e.getMessage());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("CalcDistanceRequest failed:\n{}", e.getMessage());
|
|
|
+ return R.failed(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public R<GetMetricsResponse> getMetrics(@NonNull GetMetricsParam requestParam) {
|
|
|
+ if (!clientIsReady()) {
|
|
|
+ return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
+ }
|
|
|
+
|
|
|
+ logInfo(requestParam.toString());
|
|
|
+
|
|
|
+ try {
|
|
|
+ GetMetricsRequest getMetricsRequest = GetMetricsRequest.newBuilder()
|
|
|
+ .setRequest(requestParam.getRequest())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ GetMetricsResponse response = blockingStub().getMetrics(getMetricsRequest);
|
|
|
+
|
|
|
+ if (response.getStatus().getErrorCode() == ErrorCode.Success) {
|
|
|
+ logInfo("GetMetricsRequest successfully!");
|
|
|
+ return R.success(response);
|
|
|
+ } else {
|
|
|
+ logError("GetMetricsRequest failed:\n{}", response.getStatus().getReason());
|
|
|
+ return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
|
|
|
+ response.getStatus().getReason());
|
|
|
+ }
|
|
|
+ } catch (StatusRuntimeException e) {
|
|
|
+ logError("GetMetricsRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("GetMetricsRequest failed:\n{}", e.getMessage());
|
|
|
+ return R.failed(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public R<GetPersistentSegmentInfoResponse> getPersistentSegmentInfo(@NonNull GetPersistentSegmentInfoParam requestParam) {
|
|
|
+ if (!clientIsReady()) {
|
|
|
+ return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
+ }
|
|
|
+
|
|
|
+ logInfo(requestParam.toString());
|
|
|
+
|
|
|
+ try {
|
|
|
+ GetPersistentSegmentInfoRequest getSegmentInfoRequest = GetPersistentSegmentInfoRequest.newBuilder()
|
|
|
+ .setCollectionName(requestParam.getCollectionName())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ GetPersistentSegmentInfoResponse response = blockingStub().getPersistentSegmentInfo(getSegmentInfoRequest);
|
|
|
+
|
|
|
+ if (response.getStatus().getErrorCode() == ErrorCode.Success) {
|
|
|
+ logInfo("GetPersistentSegmentInfoRequest successfully!");
|
|
|
+ return R.success(response);
|
|
|
+ } else {
|
|
|
+ logError("GetPersistentSegmentInfoRequest failed:\n{}", response.getStatus().getReason());
|
|
|
+ return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
|
|
|
+ response.getStatus().getReason());
|
|
|
+ }
|
|
|
+ } catch (StatusRuntimeException e) {
|
|
|
+ logError("GetPersistentSegmentInfoRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("GetPersistentSegmentInfoRequest failed:\n{}", e.getMessage());
|
|
|
+ return R.failed(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public R<GetQuerySegmentInfoResponse> getQuerySegmentInfo(@NonNull GetQuerySegmentInfoParam requestParam) {
|
|
|
+ if (!clientIsReady()) {
|
|
|
+ return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
+ }
|
|
|
+
|
|
|
+ logInfo(requestParam.toString());
|
|
|
+
|
|
|
+ try {
|
|
|
+ GetQuerySegmentInfoRequest getSegmentInfoRequest = GetQuerySegmentInfoRequest.newBuilder()
|
|
|
+ .setCollectionName(requestParam.getCollectionName())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ GetQuerySegmentInfoResponse response = blockingStub().getQuerySegmentInfo(getSegmentInfoRequest);
|
|
|
+
|
|
|
+ if (response.getStatus().getErrorCode() == ErrorCode.Success) {
|
|
|
+ logInfo("GetQuerySegmentInfoRequest successfully!");
|
|
|
+ return R.success(response);
|
|
|
+ } else {
|
|
|
+ logError("GetQuerySegmentInfoRequest failed:\n{}", response.getStatus().getReason());
|
|
|
+ return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
|
|
|
+ response.getStatus().getReason());
|
|
|
+ }
|
|
|
+ } catch (StatusRuntimeException e) {
|
|
|
+ logError("GetQuerySegmentInfoRequest RPC failed:\n{}", e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("GetQuerySegmentInfoRequest failed:\n{}", e.getMessage());
|
|
|
+ return R.failed(e);
|
|
|
}
|
|
|
- return result;
|
|
|
}
|
|
|
|
|
|
///////////////////// Log Functions//////////////////////
|