|
@@ -19,43 +19,46 @@
|
|
|
|
|
|
package io.milvus.client;
|
|
|
|
|
|
+import com.google.common.collect.Lists;
|
|
|
import com.google.common.util.concurrent.FutureCallback;
|
|
|
import com.google.common.util.concurrent.Futures;
|
|
|
import com.google.common.util.concurrent.ListenableFuture;
|
|
|
import com.google.common.util.concurrent.MoreExecutors;
|
|
|
import io.grpc.StatusRuntimeException;
|
|
|
+import io.milvus.common.clientenum.ConsistencyLevelEnum;
|
|
|
+import io.milvus.common.utils.JacksonUtils;
|
|
|
+import io.milvus.common.utils.VectorUtils;
|
|
|
import io.milvus.exception.*;
|
|
|
import io.milvus.grpc.*;
|
|
|
import io.milvus.grpc.ObjectEntity;
|
|
|
+import io.milvus.param.Constant;
|
|
|
import io.milvus.param.ParamUtils;
|
|
|
import io.milvus.param.R;
|
|
|
import io.milvus.param.RpcStatus;
|
|
|
import io.milvus.param.alias.*;
|
|
|
import io.milvus.param.bulkinsert.*;
|
|
|
import io.milvus.param.collection.*;
|
|
|
+import io.milvus.param.highlevel.collection.response.ListCollectionsResponse;
|
|
|
import io.milvus.param.control.*;
|
|
|
import io.milvus.param.credential.*;
|
|
|
import io.milvus.param.dml.*;
|
|
|
+import io.milvus.param.highlevel.collection.CreateSimpleCollectionParam;
|
|
|
+import io.milvus.param.highlevel.collection.ListCollectionsParam;
|
|
|
+import io.milvus.param.highlevel.dml.*;
|
|
|
+import io.milvus.param.highlevel.dml.response.*;
|
|
|
import io.milvus.param.index.*;
|
|
|
import io.milvus.param.partition.*;
|
|
|
import io.milvus.param.role.*;
|
|
|
-import io.milvus.response.DescCollResponseWrapper;
|
|
|
+import io.milvus.response.*;
|
|
|
import lombok.NonNull;
|
|
|
import org.apache.commons.collections4.CollectionUtils;
|
|
|
-import org.apache.commons.collections4.MapUtils;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import javax.annotation.Nonnull;
|
|
|
import java.nio.charset.StandardCharsets;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.Base64;
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.Objects;
|
|
|
-import java.util.Optional;
|
|
|
+import java.util.*;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.function.Function;
|
|
|
|
|
@@ -2719,6 +2722,287 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ ///////////////////// High Level API//////////////////////
|
|
|
+ @Override
|
|
|
+ public R<RpcStatus> createCollection(CreateSimpleCollectionParam requestParam) {
|
|
|
+ if (!clientIsReady()) {
|
|
|
+ return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
+ }
|
|
|
+ logInfo(requestParam.toString());
|
|
|
+
|
|
|
+ try {
|
|
|
+ // step1: create collection
|
|
|
+ R<RpcStatus> createCollectionStatus = createCollection(requestParam.getCreateCollectionParam());
|
|
|
+ if(!Objects.equals(createCollectionStatus.getStatus(), R.success().getStatus())){
|
|
|
+ logError("CreateCollection failed: {}", createCollectionStatus.getException().getMessage());
|
|
|
+ return R.failed(createCollectionStatus.getException());
|
|
|
+ }
|
|
|
+
|
|
|
+ // step2: create index
|
|
|
+ R<RpcStatus> createIndexStatus = createIndex(requestParam.getCreateIndexParam());
|
|
|
+ if(!Objects.equals(createIndexStatus.getStatus(), R.success().getStatus())){
|
|
|
+ logError("CreateIndex failed: {}", createIndexStatus.getException().getMessage());
|
|
|
+ return R.failed(createIndexStatus.getException());
|
|
|
+ }
|
|
|
+
|
|
|
+ // step3: load collection
|
|
|
+ R<RpcStatus> loadCollectionStatus = loadCollection(requestParam.getLoadCollectionParam());
|
|
|
+ if(!Objects.equals(loadCollectionStatus.getStatus(), R.success().getStatus())){
|
|
|
+ logError("LoadCollection failed: {}", loadCollectionStatus.getException().getMessage());
|
|
|
+ return R.failed(loadCollectionStatus.getException());
|
|
|
+ }
|
|
|
+
|
|
|
+ logDebug("CreateCollection successfully!");
|
|
|
+ return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
|
|
|
+ } catch (StatusRuntimeException e) {
|
|
|
+ logError("CreateCollection RPC failed!", e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("CreateCollection failed! ", e.getMessage());
|
|
|
+ return R.failed(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public R<ListCollectionsResponse> listCollections(ListCollectionsParam requestParam) {
|
|
|
+ if (!clientIsReady()) {
|
|
|
+ return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
+ }
|
|
|
+ logInfo(requestParam.toString());
|
|
|
+
|
|
|
+ try {
|
|
|
+ R<ShowCollectionsResponse> response = showCollections(requestParam.getShowCollectionsParam());
|
|
|
+ if(!Objects.equals(response.getStatus(), R.success().getStatus())){
|
|
|
+ logError("ListCollections failed: {}", response.getException().getMessage());
|
|
|
+ return R.failed(response.getException());
|
|
|
+ }
|
|
|
+
|
|
|
+ ShowCollResponseWrapper showCollResponseWrapper = new ShowCollResponseWrapper(response.getData());
|
|
|
+ return R.success(ListCollectionsResponse.builder()
|
|
|
+ .collectionNames(showCollResponseWrapper.getCollectionNames()).build());
|
|
|
+ } catch (StatusRuntimeException e) {
|
|
|
+ logError("ListCollections RPC failed!", e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("ListCollections failed! ", e.getMessage());
|
|
|
+ return R.failed(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public R<InsertResponse> insert(InsertRowsParam requestParam) {
|
|
|
+ if (!clientIsReady()) {
|
|
|
+ return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
+ }
|
|
|
+ logInfo(requestParam.toString());
|
|
|
+
|
|
|
+ try {
|
|
|
+ R<MutationResult> response = insert(requestParam.getInsertParam());
|
|
|
+ if(!Objects.equals(response.getStatus(), R.success().getStatus())){
|
|
|
+ logError("Insert failed: {}", response.getException().getMessage());
|
|
|
+ return R.failed(response.getException());
|
|
|
+ }
|
|
|
+
|
|
|
+ logDebug("Insert successfully!");
|
|
|
+ MutationResultWrapper wrapper = new MutationResultWrapper(response.getData());
|
|
|
+ return R.success(InsertResponse.builder().insertIds(wrapper.getInsertIDs()).insertCount(wrapper.getInsertCount()).build());
|
|
|
+ } catch (StatusRuntimeException e) {
|
|
|
+ logError("Insert RPC failed!", e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("Insert failed! ", e.getMessage());
|
|
|
+ return R.failed(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public R<DeleteResponse> delete(DeleteIdsParam requestParam) {
|
|
|
+ if (!clientIsReady()) {
|
|
|
+ return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
+ }
|
|
|
+ logInfo(requestParam.toString());
|
|
|
+
|
|
|
+ try {
|
|
|
+ DescribeCollectionParam.Builder builder = DescribeCollectionParam.newBuilder()
|
|
|
+ .withCollectionName(requestParam.getCollectionName());
|
|
|
+ R<DescribeCollectionResponse> descResp = describeCollection(builder.build());
|
|
|
+ if (descResp.getStatus() != R.Status.Success.getCode()) {
|
|
|
+ logError("Failed to describe collection: {}", requestParam.getCollectionName());
|
|
|
+ return R.failed(R.Status.valueOf(descResp.getStatus()), descResp.getMessage());
|
|
|
+ }
|
|
|
+ DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp.getData());
|
|
|
+
|
|
|
+ String expr = VectorUtils.convertPksExpr(requestParam.getPrimaryIds(), wrapper);
|
|
|
+ DeleteParam deleteParam = DeleteParam.newBuilder()
|
|
|
+ .withCollectionName(requestParam.getCollectionName())
|
|
|
+ .withExpr(expr)
|
|
|
+ .build();
|
|
|
+ R<MutationResult> resultR = delete(deleteParam);
|
|
|
+ MutationResultWrapper resultWrapper = new MutationResultWrapper(resultR.getData());
|
|
|
+ return R.success(DeleteResponse.builder().deleteIds(resultWrapper.getInsertIDs()).build());
|
|
|
+ } catch (StatusRuntimeException e) {
|
|
|
+ logError("Delete RPC failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), e.getMessage());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("Delete failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), e.getMessage());
|
|
|
+ return R.failed(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public R<GetResponse> get(GetIdsParam requestParam) {
|
|
|
+ if (!clientIsReady()) {
|
|
|
+ return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
+ }
|
|
|
+ logInfo(requestParam.toString());
|
|
|
+
|
|
|
+ try {
|
|
|
+ DescribeCollectionParam.Builder builder = DescribeCollectionParam.newBuilder()
|
|
|
+ .withCollectionName(requestParam.getCollectionName());
|
|
|
+ R<DescribeCollectionResponse> descResp = describeCollection(builder.build());
|
|
|
+
|
|
|
+ if (descResp.getStatus() != R.Status.Success.getCode()) {
|
|
|
+ logError("Failed to describe collection: {}", requestParam.getCollectionName());
|
|
|
+ return R.failed(R.Status.valueOf(descResp.getStatus()), descResp.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp.getData());
|
|
|
+ FieldType primaryField = wrapper.getPrimaryField();
|
|
|
+
|
|
|
+ if (CollectionUtils.isEmpty(requestParam.getOutputFields())) {
|
|
|
+ FieldType vectorField = wrapper.getVectorField();
|
|
|
+ requestParam.getOutputFields().addAll(Lists.newArrayList(Constant.ALL_OUTPUT_FIELDS, vectorField.getName()));
|
|
|
+ }
|
|
|
+
|
|
|
+ String expr = VectorUtils.convertPksExpr(requestParam.getPrimaryIds(), primaryField.getName());
|
|
|
+ QueryParam queryParam = QueryParam.newBuilder()
|
|
|
+ .withCollectionName(requestParam.getCollectionName())
|
|
|
+ .withExpr(expr)
|
|
|
+ .withOutFields(requestParam.getOutputFields())
|
|
|
+ .withConsistencyLevel(ConsistencyLevelEnum.BOUNDED)
|
|
|
+ .build();
|
|
|
+ R<QueryResults> queryResp = query(queryParam);
|
|
|
+ QueryResultsWrapper queryResultsWrapper = new QueryResultsWrapper(queryResp.getData());
|
|
|
+ return R.success(GetResponse.builder().rowRecords(queryResultsWrapper.getRowRecords()).build());
|
|
|
+ } catch (StatusRuntimeException e) {
|
|
|
+ logError("Get RPC failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), e.getMessage());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("Get failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), e.getMessage());
|
|
|
+ return R.failed(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public R<QueryResponse> query(QuerySimpleParam requestParam) {
|
|
|
+ if (!clientIsReady()) {
|
|
|
+ return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
+ }
|
|
|
+ logInfo(requestParam.toString());
|
|
|
+
|
|
|
+ try {
|
|
|
+ DescribeCollectionParam.Builder builder = DescribeCollectionParam.newBuilder()
|
|
|
+ .withCollectionName(requestParam.getCollectionName());
|
|
|
+ R<DescribeCollectionResponse> descResp = describeCollection(builder.build());
|
|
|
+ if (descResp.getStatus() != R.Status.Success.getCode()) {
|
|
|
+ logError("Failed to describe collection: {}", requestParam.getCollectionName());
|
|
|
+ return R.failed(R.Status.valueOf(descResp.getStatus()), descResp.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ DescCollResponseWrapper descCollWrapper = new DescCollResponseWrapper(descResp.getData());
|
|
|
+ if (CollectionUtils.isEmpty(requestParam.getOutputFields())) {
|
|
|
+ FieldType vectorField = descCollWrapper.getVectorField();
|
|
|
+ requestParam.getOutputFields().addAll(Lists.newArrayList(Constant.ALL_OUTPUT_FIELDS, vectorField.getName()));
|
|
|
+ }
|
|
|
+
|
|
|
+ QueryParam queryParam = QueryParam.newBuilder()
|
|
|
+ .withCollectionName(requestParam.getCollectionName())
|
|
|
+ .withExpr(requestParam.getFilter())
|
|
|
+ .withOutFields(requestParam.getOutputFields())
|
|
|
+ .withOffset(requestParam.getOffset())
|
|
|
+ .withLimit(requestParam.getLimit())
|
|
|
+ .withConsistencyLevel(ConsistencyLevelEnum.BOUNDED)
|
|
|
+ .build();
|
|
|
+ R<QueryResults> response = query(queryParam);
|
|
|
+ if(!Objects.equals(response.getStatus(), R.success().getStatus())){
|
|
|
+ logError("Query failed: {}", response.getException().getMessage());
|
|
|
+ return R.failed(response.getException());
|
|
|
+ }
|
|
|
+
|
|
|
+ QueryResultsWrapper queryWrapper = new QueryResultsWrapper(response.getData());
|
|
|
+ return R.success(QueryResponse.builder().rowRecords(queryWrapper.getRowRecords()).build());
|
|
|
+ } catch (StatusRuntimeException e) {
|
|
|
+ logError("Query RPC failed!", e.getStatus().toString());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("Query failed! ", e.getMessage());
|
|
|
+ return R.failed(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public R<SearchResponse> search(SearchSimpleParam requestParam) {
|
|
|
+ if (!clientIsReady()) {
|
|
|
+ return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
|
|
|
+ }
|
|
|
+ logInfo(requestParam.toString());
|
|
|
+
|
|
|
+ try {
|
|
|
+ DescribeCollectionParam.Builder builder = DescribeCollectionParam.newBuilder()
|
|
|
+ .withCollectionName(requestParam.getCollectionName());
|
|
|
+ R<DescribeCollectionResponse> descResp = describeCollection(builder.build());
|
|
|
+
|
|
|
+ if (descResp.getStatus() != R.Status.Success.getCode()) {
|
|
|
+ logError("Failed to describe collection: {}", requestParam.getCollectionName());
|
|
|
+ return R.failed(R.Status.valueOf(descResp.getStatus()), descResp.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp.getData());
|
|
|
+ FieldType vectorField = wrapper.getVectorField();
|
|
|
+
|
|
|
+ // fill in vectorData
|
|
|
+ List<List<?>> vectors = new ArrayList<>();
|
|
|
+ if (requestParam.getVectors().get(0) instanceof List) {
|
|
|
+ vectors = (List<List<?>>) requestParam.getVectors();
|
|
|
+ } else {
|
|
|
+ vectors.add(requestParam.getVectors());
|
|
|
+ }
|
|
|
+
|
|
|
+ SearchParam searchParam = SearchParam.newBuilder()
|
|
|
+ .withCollectionName(requestParam.getCollectionName())
|
|
|
+ .withVectors(vectors)
|
|
|
+ .withVectorFieldName(vectorField.getName())
|
|
|
+ .withOutFields(requestParam.getOutputFields())
|
|
|
+ .withExpr(requestParam.getFilter())
|
|
|
+ .withTopK(requestParam.getLimit())
|
|
|
+ .withParams(JacksonUtils.toJsonString(requestParam.getParams()))
|
|
|
+ .withConsistencyLevel(ConsistencyLevelEnum.BOUNDED)
|
|
|
+ .build();
|
|
|
+
|
|
|
+ // search
|
|
|
+ R<SearchResults> response = search(searchParam);
|
|
|
+ if(!Objects.equals(response.getStatus(), R.success().getStatus())){
|
|
|
+ logError("Search failed: {}", response.getException().getMessage());
|
|
|
+ return R.failed(response.getException());
|
|
|
+ }
|
|
|
+
|
|
|
+ SearchResultsWrapper searchResultsWrapper = new SearchResultsWrapper(response.getData().getResults());
|
|
|
+ return R.success(SearchResponse.builder().rowRecords(searchResultsWrapper.getRowRecords()).build());
|
|
|
+ } catch (StatusRuntimeException e) {
|
|
|
+ logError("Search RPC failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), e.getMessage());
|
|
|
+ return R.failed(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logError("Search failed! Collection name:{}\n{}",
|
|
|
+ requestParam.getCollectionName(), e.getMessage());
|
|
|
+ return R.failed(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
///////////////////// Log Functions//////////////////////
|
|
|
private void logDebug(String msg, Object... params) {
|
|
|
logger.debug(msg, params);
|