|
@@ -30,9 +30,6 @@ import io.milvus.v2.service.collection.response.DescribeCollectionResp;
|
|
|
import io.milvus.v2.service.vector.request.DeleteReq;
|
|
|
import io.milvus.v2.service.vector.request.InsertReq;
|
|
|
import io.milvus.v2.service.vector.request.UpsertReq;
|
|
|
-import lombok.Builder;
|
|
|
-import lombok.Getter;
|
|
|
-import lombok.NonNull;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
|
|
|
import java.util.*;
|
|
@@ -43,8 +40,7 @@ public class DataUtils {
|
|
|
private InsertRequest.Builder insertBuilder;
|
|
|
private UpsertRequest.Builder upsertBuilder;
|
|
|
|
|
|
- public InsertRequest convertGrpcInsertRequest(@NonNull InsertReq requestParam,
|
|
|
- DescribeCollectionResp descColl) {
|
|
|
+ public InsertRequest convertGrpcInsertRequest(InsertReq requestParam, DescribeCollectionResp descColl) {
|
|
|
String dbName = requestParam.getDatabaseName();
|
|
|
String collectionName = requestParam.getCollectionName();
|
|
|
|
|
@@ -62,8 +58,7 @@ public class DataUtils {
|
|
|
return insertBuilder.build();
|
|
|
}
|
|
|
|
|
|
- public UpsertRequest convertGrpcUpsertRequest(@NonNull UpsertReq requestParam,
|
|
|
- DescribeCollectionResp descColl) {
|
|
|
+ public UpsertRequest convertGrpcUpsertRequest(UpsertReq requestParam, DescribeCollectionResp descColl) {
|
|
|
String dbName = requestParam.getDatabaseName();
|
|
|
String collectionName = requestParam.getCollectionName();
|
|
|
|
|
@@ -82,7 +77,7 @@ public class DataUtils {
|
|
|
return upsertBuilder.build();
|
|
|
}
|
|
|
|
|
|
- private void addFieldsData(io.milvus.grpc.FieldData value) {
|
|
|
+ private void addFieldsData(FieldData value) {
|
|
|
if (insertBuilder != null) {
|
|
|
insertBuilder.addFieldsData(value);
|
|
|
} else if (upsertBuilder != null) {
|
|
@@ -151,93 +146,260 @@ public class DataUtils {
|
|
|
outputFieldNames.addAll(function.getOutputFieldNames());
|
|
|
}
|
|
|
|
|
|
- List<CreateCollectionReq.FieldSchema> fieldsList = collectionSchema.getFieldSchemaList();
|
|
|
- Map<String, InsertDataInfo> nameInsertInfo = new HashMap<>();
|
|
|
- InsertDataInfo insertDynamicDataInfo = InsertDataInfo.builder().field(
|
|
|
- CreateCollectionReq.FieldSchema.builder()
|
|
|
- .name(Constant.DYNAMIC_FIELD_NAME)
|
|
|
- .dataType(io.milvus.v2.common.DataType.JSON)
|
|
|
- .build())
|
|
|
- .data(new LinkedList<>()).build();
|
|
|
- for (JsonObject row : rows) {
|
|
|
- for (CreateCollectionReq.FieldSchema field : fieldsList) {
|
|
|
- String fieldName = field.getName();
|
|
|
- InsertDataInfo insertDataInfo = nameInsertInfo.getOrDefault(fieldName, InsertDataInfo.builder()
|
|
|
- .field(field).data(new LinkedList<>()).build());
|
|
|
-
|
|
|
- // check normalField
|
|
|
- JsonElement rowFieldData = row.get(fieldName);
|
|
|
- if (rowFieldData == null) {
|
|
|
- // if the field is auto-id, no need to provide value
|
|
|
- if (field.getAutoID() == Boolean.TRUE) {
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- // if the field is an output field of doc-in-doc-out, no need to provide value
|
|
|
- if (outputFieldNames.contains(field.getName())) {
|
|
|
- continue;
|
|
|
- }
|
|
|
+ List<CreateCollectionReq.FieldSchema> normalFields = collectionSchema.getFieldSchemaList();
|
|
|
+ List<CreateCollectionReq.StructFieldSchema> structFields = collectionSchema.getStructFields();
|
|
|
+ List<String> allFieldNames = new ArrayList<>();
|
|
|
+ normalFields.forEach((schema)-> allFieldNames.add(schema.getName()));
|
|
|
+ structFields.forEach((schema)-> allFieldNames.add(schema.getName()));
|
|
|
|
|
|
- // if the field doesn't have default value, require user provide the value
|
|
|
- // in v2.6.1 support partial update, user can input partial fields
|
|
|
- if (!field.getIsNullable() && field.getDefaultValue() == null && !partialUpdate) {
|
|
|
- String msg = String.format("The field: %s is not provided.", field.getName());
|
|
|
- throw new MilvusClientException(ErrorCode.INVALID_PARAMS, msg);
|
|
|
- }
|
|
|
-
|
|
|
- rowFieldData = JsonNull.INSTANCE;
|
|
|
- }
|
|
|
-
|
|
|
- // from v2.4.10, milvus allows upsert for auto-id pk, no need to check for upsert action
|
|
|
- if (field.getAutoID() == Boolean.TRUE && insertBuilder != null) {
|
|
|
- String msg = String.format("The primary key: %s is auto generated, no need to input.", fieldName);
|
|
|
- throw new MilvusClientException(ErrorCode.INVALID_PARAMS, msg);
|
|
|
- }
|
|
|
+ // 1. for normal fields, InsertDataInfo is a list of object or list of list, for example:
|
|
|
+ // Int64Field, InsertDataInfo is a List<Long>
|
|
|
+ // FloatVectorField, InsertDataInfo is a List<List<Float>>
|
|
|
+ // the normalInsertData typically looks like:
|
|
|
+ // {
|
|
|
+ // "id": List<Long>,
|
|
|
+ // "vector": List<List<Float>>
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ // 2. for struct fields, InsertDataInfo is list of list or 3-layer list, for example:
|
|
|
+ // A struct field named "struct1" has a sub-field "sub1" type is Varchar and a sub-field "sub2" type is FloatVector
|
|
|
+ // for the sub-field "sub1", InsertDataInfo is a List<List<String>>
|
|
|
+ // for the sub-field "sub2", InsertDataInfo is a List<List<List<Float>>>
|
|
|
+ // the structInsertData stores all sub-fields of all struct fields, typically looks like:
|
|
|
+ // {
|
|
|
+ // "sub1 of struct1": List<List<String>>,
|
|
|
+ // "sub2 of struct1": List<List<List<Float>>>,
|
|
|
+ // "sub3 of struct2": List<List<Integer>>,
|
|
|
+ // "sub4 of struct2": List<List<List<Float>>>
|
|
|
+ // }
|
|
|
+ Map<String, InsertDataInfo> normalInsertData = new HashMap<>();
|
|
|
+ Map<String, InsertDataInfo> structInsertData = new HashMap<>();
|
|
|
+ InsertDataInfo insertDynamicDataInfo = new InsertDataInfo(
|
|
|
+ CreateCollectionReq.FieldSchema.builder()
|
|
|
+ .name(Constant.DYNAMIC_FIELD_NAME)
|
|
|
+ .dataType(io.milvus.v2.common.DataType.JSON)
|
|
|
+ .build(),
|
|
|
+ new LinkedList<>());
|
|
|
+ for (JsonObject row : rows) {
|
|
|
+ // check and store value of normal fields into InsertDataInfo
|
|
|
+ for (CreateCollectionReq.FieldSchema field : normalFields) {
|
|
|
+ processNormalFieldValues(row, field, outputFieldNames, normalInsertData, partialUpdate);
|
|
|
+ }
|
|
|
|
|
|
- // here we convert the v2 FieldSchema to grpc.FieldSchema then to v1 FieldType
|
|
|
- // the reason is the logic in ParamUtils.checkFieldValue is complicated, we don't intend to
|
|
|
- // write duplicate code here
|
|
|
- FieldSchema grpcField = SchemaUtils.convertToGrpcFieldSchema(field);
|
|
|
- Object fieldValue = ParamUtils.checkFieldValue(ParamUtils.ConvertField(grpcField), rowFieldData);
|
|
|
- insertDataInfo.getData().add(fieldValue);
|
|
|
- nameInsertInfo.put(fieldName, insertDataInfo);
|
|
|
+ // check and store value of struct fields into InsertDataInfo
|
|
|
+ for (CreateCollectionReq.StructFieldSchema structField : structFields) {
|
|
|
+ processStructFieldValues(row, structField, structInsertData);
|
|
|
}
|
|
|
|
|
|
- // deal with dynamicField
|
|
|
+ // store dynamic fields into InsertDataInfo
|
|
|
if (collectionSchema.isEnableDynamicField()) {
|
|
|
JsonObject dynamicField = new JsonObject();
|
|
|
for (String rowFieldName : row.keySet()) {
|
|
|
- if (!nameInsertInfo.containsKey(rowFieldName)) {
|
|
|
+ if (!allFieldNames.contains(rowFieldName)) {
|
|
|
dynamicField.add(rowFieldName, row.get(rowFieldName));
|
|
|
}
|
|
|
}
|
|
|
- insertDynamicDataInfo.getData().add(dynamicField);
|
|
|
+ insertDynamicDataInfo.data.add(dynamicField);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- for (String fieldNameKey : nameInsertInfo.keySet()) {
|
|
|
- InsertDataInfo insertDataInfo = nameInsertInfo.get(fieldNameKey);
|
|
|
- // here we convert the v2 FieldSchema to grpc.FieldSchema then to v1 FieldType
|
|
|
- // the reason is the logic in ParamUtils.genFieldData is complicated, we don't intend to
|
|
|
- // write duplicate code here
|
|
|
- FieldSchema grpcField = SchemaUtils.convertToGrpcFieldSchema(insertDataInfo.getField());
|
|
|
- this.addFieldsData(ParamUtils.genFieldData(ParamUtils.ConvertField(grpcField), insertDataInfo.getData()));
|
|
|
+ // convert normal fields data from InsertDataInfo into grpc FieldData
|
|
|
+ for (String fieldNameKey : normalInsertData.keySet()) {
|
|
|
+ InsertDataInfo insertDataInfo = normalInsertData.get(fieldNameKey);
|
|
|
+ this.addFieldsData(DataUtils.genFieldData(insertDataInfo.field, insertDataInfo.data, false));
|
|
|
}
|
|
|
+
|
|
|
+ // convert struct fields data from InsertDataInfo into grpc FieldData
|
|
|
+ for (CreateCollectionReq.StructFieldSchema structField : structFields) {
|
|
|
+ StructArrayField.Builder structBuilder = StructArrayField.newBuilder();
|
|
|
+ for (CreateCollectionReq.FieldSchema field : structField.getFields()) {
|
|
|
+ InsertDataInfo insertDataInfo = structInsertData.get(field.getName());
|
|
|
+ FieldData grpcField = DataUtils.genStructSubFieldData(field, insertDataInfo.data);
|
|
|
+ structBuilder.addFields(grpcField);
|
|
|
+ }
|
|
|
+
|
|
|
+ FieldData.Builder fieldDataBuilder = FieldData.newBuilder();
|
|
|
+ this.addFieldsData(fieldDataBuilder
|
|
|
+ .setFieldName(structField.getName())
|
|
|
+ .setType(DataType.ArrayOfStruct)
|
|
|
+ .setStructArrays(structBuilder.build())
|
|
|
+ .build());
|
|
|
+ }
|
|
|
+
|
|
|
+ // convert dynamic field data from InsertDataInfo into grpc FieldData
|
|
|
if (collectionSchema.isEnableDynamicField()) {
|
|
|
- // here we convert the v2 FieldSchema to grpc.FieldSchema then to v1 FieldType
|
|
|
- // the reason is the logic in ParamUtils.genFieldData is complicated, we don't intend to
|
|
|
- // write duplicate code here
|
|
|
- FieldSchema grpcField = SchemaUtils.convertToGrpcFieldSchema(insertDynamicDataInfo.getField());
|
|
|
- this.addFieldsData(ParamUtils.genFieldData(ParamUtils.ConvertField(grpcField), insertDynamicDataInfo.getData(), Boolean.TRUE));
|
|
|
+ this.addFieldsData(DataUtils.genFieldData(insertDynamicDataInfo.field, insertDynamicDataInfo.data, true));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void processNormalFieldValues(JsonObject row, CreateCollectionReq.FieldSchema field,
|
|
|
+ List<String> outputFieldNames,
|
|
|
+ Map<String, InsertDataInfo> nameInsertInfo, boolean partialUpdate) {
|
|
|
+ String fieldName = field.getName();
|
|
|
+ InsertDataInfo insertDataInfo = nameInsertInfo.getOrDefault(fieldName, new InsertDataInfo(field, new LinkedList<>()));
|
|
|
+
|
|
|
+ JsonElement fieldData = row.get(fieldName);
|
|
|
+ if (fieldData == null) {
|
|
|
+ // if the field is auto-id, no need to provide value
|
|
|
+ if (field.getAutoID() == Boolean.TRUE) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // if the field is an output field of doc-in-doc-out, no need to provide value
|
|
|
+ if (outputFieldNames.contains(fieldName)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // if the field doesn't have default value, require user provide the value
|
|
|
+ // in v2.6.1 support partial update, user can input partial fields
|
|
|
+ if (!field.getIsNullable() && field.getDefaultValue() == null && !partialUpdate) {
|
|
|
+ String msg = String.format("The field: %s is not provided.", fieldName);
|
|
|
+ throw new MilvusClientException(ErrorCode.INVALID_PARAMS, msg);
|
|
|
+ }
|
|
|
+
|
|
|
+ fieldData = JsonNull.INSTANCE;
|
|
|
+ }
|
|
|
+
|
|
|
+ // from v2.4.10, milvus allows upsert for auto-id pk, no need to check for upsert action
|
|
|
+ if (field.getAutoID() == Boolean.TRUE && insertBuilder != null) {
|
|
|
+ String msg = String.format("The primary key: %s is auto generated, no need to input.", fieldName);
|
|
|
+ throw new MilvusClientException(ErrorCode.INVALID_PARAMS, msg);
|
|
|
+ }
|
|
|
+
|
|
|
+ // store the value into InsertDataInfo
|
|
|
+ Object fieldValue = DataUtils.checkFieldValue(field, fieldData);
|
|
|
+ insertDataInfo.data.add(fieldValue);
|
|
|
+ nameInsertInfo.put(fieldName, insertDataInfo);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void processStructFieldValues(JsonObject row, CreateCollectionReq.StructFieldSchema structField,
|
|
|
+ Map<String, InsertDataInfo> nameInsertInfo) {
|
|
|
+ String structName = structField.getName();
|
|
|
+ JsonElement rowFieldData = row.get(structName);
|
|
|
+ if (rowFieldData == null) {
|
|
|
+ String msg = String.format("The struct field: %s is not provided.", structName);
|
|
|
+ throw new MilvusClientException(ErrorCode.INVALID_PARAMS, msg);
|
|
|
+ }
|
|
|
+ if (!rowFieldData.isJsonArray()) {
|
|
|
+ String msg = String.format("The value of struct field: %s is not a JSON array.", structName);
|
|
|
+ throw new MilvusClientException(ErrorCode.INVALID_PARAMS, msg);
|
|
|
+ }
|
|
|
+
|
|
|
+ for (CreateCollectionReq.FieldSchema field : structField.getFields()) {
|
|
|
+ InsertDataInfo insertDataInfo = nameInsertInfo.getOrDefault(field.getName(), new InsertDataInfo(field, new LinkedList<>()));
|
|
|
+ nameInsertInfo.put(field.getName(), insertDataInfo);
|
|
|
+ }
|
|
|
+
|
|
|
+ JsonArray structs = rowFieldData.getAsJsonArray();
|
|
|
+ for (CreateCollectionReq.FieldSchema field : structField.getFields()) {
|
|
|
+ String subFieldName = field.getName();
|
|
|
+ InsertDataInfo insertDataInfo = nameInsertInfo.get(subFieldName);
|
|
|
+ List<Object> columnData = new ArrayList<>();
|
|
|
+ structs.forEach((element)->{
|
|
|
+ if (!element.isJsonObject()) {
|
|
|
+ String msg = String.format("The element of struct field: %s is not a JSON dict.", structName);
|
|
|
+ throw new MilvusClientException(ErrorCode.INVALID_PARAMS, msg);
|
|
|
+ }
|
|
|
+
|
|
|
+ JsonObject struct = element.getAsJsonObject();
|
|
|
+ JsonElement fieldData = struct.get(subFieldName);
|
|
|
+ if (fieldData == null) {
|
|
|
+ String msg = String.format("The %s of struct field: %s is not provided.", subFieldName, structName);
|
|
|
+ throw new MilvusClientException(ErrorCode.INVALID_PARAMS, msg);
|
|
|
+ }
|
|
|
+
|
|
|
+ Object fieldValue = DataUtils.checkFieldValue(field, fieldData);
|
|
|
+ columnData.add(fieldValue);
|
|
|
+ });
|
|
|
+ insertDataInfo.data.add(columnData);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Builder
|
|
|
- @Getter
|
|
|
public static class InsertDataInfo {
|
|
|
- private final CreateCollectionReq.FieldSchema field;
|
|
|
- private final LinkedList<Object> data;
|
|
|
+ public CreateCollectionReq.FieldSchema field;
|
|
|
+ public LinkedList<Object> data;
|
|
|
+
|
|
|
+ public InsertDataInfo(CreateCollectionReq.FieldSchema field, LinkedList<Object> data) {
|
|
|
+ this.field = field;
|
|
|
+ this.data = data;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static FieldData genStructSubFieldData(CreateCollectionReq.FieldSchema fieldSchema, List<?> objects) {
|
|
|
+ if (objects == null) {
|
|
|
+ throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "Cannot generate FieldData from null object");
|
|
|
+ }
|
|
|
+ DataType dataType = ConvertUtils.toProtoDataType(fieldSchema.getDataType());
|
|
|
+ String fieldName = fieldSchema.getName();
|
|
|
+ FieldData.Builder builder = FieldData.newBuilder().setFieldName(fieldName);
|
|
|
+
|
|
|
+ if (ParamUtils.isVectorDataType(dataType)) {
|
|
|
+ VectorArray vectorArr = genVectorArray(dataType, objects);
|
|
|
+ if (vectorArr.getDim() > 0 && vectorArr.getDim() != fieldSchema.getDimension()) {
|
|
|
+ String msg = String.format("Dimension mismatch for field %s, expected: %d, actual: %d",
|
|
|
+ fieldName, fieldSchema.getDimension(), vectorArr.getDim());
|
|
|
+ throw new MilvusClientException(ErrorCode.INVALID_PARAMS, msg);
|
|
|
+ }
|
|
|
+ return builder.setType(DataType.ArrayOfVector)
|
|
|
+ .setVectors(VectorField.newBuilder()
|
|
|
+ .setVectorArray(vectorArr)
|
|
|
+ .setDim(fieldSchema.getDimension())
|
|
|
+ .build())
|
|
|
+ .build();
|
|
|
+ } else {
|
|
|
+ if (fieldSchema.getIsNullable() || fieldSchema.getDefaultValue() != null) {
|
|
|
+ List<Object> tempObjects = new ArrayList<>();
|
|
|
+ for (Object obj : objects) {
|
|
|
+ builder.addValidData(obj != null);
|
|
|
+ if (obj != null) {
|
|
|
+ tempObjects.add(obj);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ objects = tempObjects;
|
|
|
+ }
|
|
|
+
|
|
|
+ ScalarField scalarField = ParamUtils.genScalarField(DataType.Array, dataType, objects);
|
|
|
+ return builder.setType(DataType.Array).setScalars(scalarField).build();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ public static VectorArray genVectorArray(DataType dataType, List<?> objects) {
|
|
|
+ VectorArray.Builder builder = VectorArray.newBuilder().setElementType(dataType);
|
|
|
+ if (dataType == DataType.FloatVector) {
|
|
|
+ // each object is List<List<Float>>
|
|
|
+ for (Object object : objects) {
|
|
|
+ if (object instanceof List) {
|
|
|
+ List<?> listOfList = (List<?>) object;
|
|
|
+ if (listOfList.isEmpty()) {
|
|
|
+ // struct field value is empty, fill the VectorArray with zero-dim vectors?
|
|
|
+ builder.addData(VectorField.newBuilder().build());
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ VectorField vf = ParamUtils.genVectorField(dataType, listOfList);
|
|
|
+ if (vf.getDim() == 0) {
|
|
|
+ throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "Vector cannot be empty list");
|
|
|
+ }
|
|
|
+ if (builder.getDataCount() == 0) {
|
|
|
+ builder.setDim(vf.getDim());
|
|
|
+ } else if (builder.getDim() != vf.getDim()) {
|
|
|
+ String msg = String.format("Dimension mismatch for vector field, the first dimension: %d, mismatched: %d",
|
|
|
+ builder.getDim(), vf.getDim());
|
|
|
+ throw new MilvusClientException(ErrorCode.INVALID_PARAMS, msg);
|
|
|
+ }
|
|
|
+ builder.addData(vf);
|
|
|
+ } else {
|
|
|
+ throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "The type of FloatVector must be List<>");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return builder.build();
|
|
|
+ }
|
|
|
+ // so far, struct field only supports FloatVector
|
|
|
+ String msg = String.format("Illegal vector dataType %s for struct field", dataType.name());
|
|
|
+ throw new MilvusClientException(ErrorCode.INVALID_PARAMS, msg);
|
|
|
}
|
|
|
|
|
|
public DeleteRequest ConvertToGrpcDeleteRequest(DeleteReq request) {
|
|
@@ -257,4 +419,23 @@ public class DataUtils {
|
|
|
}
|
|
|
return builder.build();
|
|
|
}
|
|
|
+
|
|
|
+ private static FieldData genFieldData(CreateCollectionReq.FieldSchema field, List<?> objects, boolean isDynamic) {
|
|
|
+ String fieldName = field.getName();
|
|
|
+ DataType dataType = ConvertUtils.toProtoDataType(field.getDataType());
|
|
|
+ DataType elementType = ConvertUtils.toProtoDataType(field.getElementType());
|
|
|
+ boolean isNullable = field.getIsNullable();
|
|
|
+ Object defaultVal = field.getDefaultValue();
|
|
|
+ return ParamUtils.genFieldData(fieldName, dataType, elementType, isNullable, defaultVal, objects, isDynamic);
|
|
|
+ }
|
|
|
+
|
|
|
+ public static Object checkFieldValue(CreateCollectionReq.FieldSchema field, JsonElement fieldData) {
|
|
|
+ DataType dataType = ConvertUtils.toProtoDataType(field.getDataType());
|
|
|
+ DataType elementType = ConvertUtils.toProtoDataType(field.getElementType());
|
|
|
+ int dim = field.getDimension() == null ? 0 : field.getDimension();
|
|
|
+ int maxLength = field.getMaxLength() == null ? 0 : field.getMaxLength();
|
|
|
+ int maxCapacity = field.getMaxCapacity() == null ? 0 : field.getMaxCapacity();
|
|
|
+ return ParamUtils.checkFieldValue(field.getName(), dataType, elementType, dim,
|
|
|
+ maxLength, maxCapacity, field.getIsNullable(), field.getDefaultValue(), fieldData);
|
|
|
+ }
|
|
|
}
|