|
@@ -19,11 +19,19 @@
|
|
|
|
|
|
package io.milvus.client;
|
|
|
|
|
|
-import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.google.gson.*;
|
|
|
import com.google.common.collect.Lists;
|
|
|
import com.google.common.util.concurrent.ListenableFuture;
|
|
|
+import io.milvus.bulkwriter.LocalBulkWriter;
|
|
|
+import io.milvus.bulkwriter.LocalBulkWriterParam;
|
|
|
+import io.milvus.bulkwriter.common.clientenum.BulkFileType;
|
|
|
+import io.milvus.bulkwriter.common.utils.GeneratorUtils;
|
|
|
+import io.milvus.bulkwriter.common.utils.ParquetReaderUtils;
|
|
|
import io.milvus.common.clientenum.ConsistencyLevelEnum;
|
|
|
+import io.milvus.exception.ParamException;
|
|
|
import io.milvus.grpc.*;
|
|
|
+import io.milvus.orm.iterator.QueryIterator;
|
|
|
+import io.milvus.orm.iterator.SearchIterator;
|
|
|
import io.milvus.param.*;
|
|
|
import io.milvus.param.alias.AlterAliasParam;
|
|
|
import io.milvus.param.alias.CreateAliasParam;
|
|
@@ -31,7 +39,6 @@ import io.milvus.param.alias.DropAliasParam;
|
|
|
import io.milvus.param.alias.ListAliasesParam;
|
|
|
import io.milvus.param.collection.*;
|
|
|
import io.milvus.param.dml.*;
|
|
|
-import io.milvus.param.dml.ranker.RRFRanker;
|
|
|
import io.milvus.param.dml.ranker.WeightedRanker;
|
|
|
import io.milvus.param.highlevel.dml.DeleteIdsParam;
|
|
|
import io.milvus.param.highlevel.dml.GetIdsParam;
|
|
@@ -44,6 +51,7 @@ import io.milvus.param.index.GetIndexStateParam;
|
|
|
import io.milvus.param.partition.GetPartitionStatisticsParam;
|
|
|
import io.milvus.param.partition.ShowPartitionsParam;
|
|
|
import io.milvus.response.*;
|
|
|
+import org.apache.avro.generic.GenericData;
|
|
|
import org.apache.commons.text.RandomStringGenerator;
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
@@ -64,9 +72,11 @@ import java.util.concurrent.TimeUnit;
|
|
|
@Testcontainers(disabledWithoutDocker = true)
|
|
|
class MilvusClientDockerTest {
|
|
|
private static final Logger logger = LogManager.getLogger("MilvusClientTest");
|
|
|
- private static MilvusClient client;
|
|
|
- private static RandomStringGenerator generator;
|
|
|
- private static final int dimension = 128;
|
|
|
+ protected static MilvusClient client;
|
|
|
+ protected static RandomStringGenerator generator;
|
|
|
+ protected static final int dimension = 128;
|
|
|
+
|
|
|
+ protected static final Gson GSON_INSTANCE = new Gson();
|
|
|
|
|
|
@Container
|
|
|
private static final MilvusContainer milvus = new MilvusContainer("milvusdb/milvus:v2.4.0-20240416-ffb6edd4-amd64");
|
|
@@ -98,7 +108,7 @@ class MilvusClientDockerTest {
|
|
|
return ConnectParam.newBuilder().withUri(milvusUri);
|
|
|
}
|
|
|
|
|
|
- private List<List<Float>> generateFloatVectors(int count) {
|
|
|
+ protected List<List<Float>> generateFloatVectors(int count) {
|
|
|
Random ran = new Random();
|
|
|
List<List<Float>> vectors = new ArrayList<>();
|
|
|
for (int n = 0; n < count; ++n) {
|
|
@@ -112,7 +122,7 @@ class MilvusClientDockerTest {
|
|
|
return vectors;
|
|
|
}
|
|
|
|
|
|
- private List<List<Float>> normalizeFloatVectors(List<List<Float>> src) {
|
|
|
+ protected List<List<Float>> normalizeFloatVectors(List<List<Float>> src) {
|
|
|
for (List<Float> vector : src) {
|
|
|
double total = 0.0;
|
|
|
for (Float val : vector) {
|
|
@@ -127,7 +137,7 @@ class MilvusClientDockerTest {
|
|
|
return src;
|
|
|
}
|
|
|
|
|
|
- private List<ByteBuffer> generateBinaryVectors(int count) {
|
|
|
+ protected List<ByteBuffer> generateBinaryVectors(int count) {
|
|
|
Random ran = new Random();
|
|
|
List<ByteBuffer> vectors = new ArrayList<>();
|
|
|
int byteCount = dimension / 8;
|
|
@@ -142,7 +152,7 @@ class MilvusClientDockerTest {
|
|
|
|
|
|
}
|
|
|
|
|
|
- private List<SortedMap<Long, Float>> generateSparseVectors(int count) {
|
|
|
+ protected List<SortedMap<Long, Float>> generateSparseVectors(int count) {
|
|
|
Random ran = new Random();
|
|
|
List<SortedMap<Long, Float>> vectors = new ArrayList<>();
|
|
|
for (int n = 0; n < count; ++n) {
|
|
@@ -516,26 +526,43 @@ class MilvusClientDockerTest {
|
|
|
R<RpcStatus> createR = client.createCollection(createParam);
|
|
|
Assertions.assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
|
|
|
|
|
|
- // insert data
|
|
|
int rowCount = 10000;
|
|
|
List<ByteBuffer> vectors = generateBinaryVectors(rowCount);
|
|
|
-
|
|
|
+ // insert data by columns
|
|
|
List<InsertParam.Field> fields = new ArrayList<>();
|
|
|
- // no need to provide id here since this field is auto_id
|
|
|
- fields.add(new InsertParam.Field(field2Name, vectors));
|
|
|
+ fields.add(new InsertParam.Field(field2Name, vectors.subList(0, 5000))); // no need to provide id here since this field is auto_id
|
|
|
|
|
|
- InsertParam insertParam = InsertParam.newBuilder()
|
|
|
+ R<MutationResult> insertR1 = client.insert(InsertParam.newBuilder()
|
|
|
.withCollectionName(randomCollectionName)
|
|
|
.withFields(fields)
|
|
|
- .build();
|
|
|
+ .build());
|
|
|
+ Assertions.assertEquals(R.Status.Success.getCode(), insertR1.getStatus().intValue());
|
|
|
|
|
|
- R<MutationResult> insertR = client.insert(insertParam);
|
|
|
- Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
|
|
|
-// System.out.println(insertR.getData());
|
|
|
- MutationResultWrapper insertResultWrapper = new MutationResultWrapper(insertR.getData());
|
|
|
+ MutationResultWrapper insertResultWrapper = new MutationResultWrapper(insertR1.getData());
|
|
|
System.out.println(insertResultWrapper.getInsertCount() + " rows inserted");
|
|
|
- List<Long> ids = insertResultWrapper.getLongIDs();
|
|
|
-// System.out.println("Auto-generated ids: " + ids);
|
|
|
+ List<Long> ids1 = insertResultWrapper.getLongIDs(); // get returned IDs(generated by server-side)
|
|
|
+
|
|
|
+ // Insert entities by rows
|
|
|
+ List<JsonObject> rows = new ArrayList<>();
|
|
|
+ for (int i = 5000; i < rowCount; ++i) {
|
|
|
+ JsonObject row = new JsonObject();
|
|
|
+ row.add(field2Name, GSON_INSTANCE.toJsonTree(vectors.get(i).array()));
|
|
|
+ rows.add(row);
|
|
|
+ }
|
|
|
+
|
|
|
+ R<MutationResult> insertR2 = client.insert(InsertParam.newBuilder()
|
|
|
+ .withCollectionName(randomCollectionName)
|
|
|
+ .withRows(rows)
|
|
|
+ .build());
|
|
|
+ Assertions.assertEquals(R.Status.Success.getCode(), insertR2.getStatus().intValue());
|
|
|
+
|
|
|
+ insertResultWrapper = new MutationResultWrapper(insertR2.getData());
|
|
|
+ System.out.println(insertResultWrapper.getInsertCount() + " rows inserted");
|
|
|
+ List<Long> ids2 = insertResultWrapper.getLongIDs(); // get returned IDs(generated by server-side)
|
|
|
+ List<Long> ids = new ArrayList<>();
|
|
|
+ ids1.forEach((id) -> { ids.add(id);});
|
|
|
+ ids2.forEach((id) -> { ids.add(id);});
|
|
|
+ Assertions.assertEquals(rowCount, ids.size());
|
|
|
|
|
|
// get collection statistics
|
|
|
R<GetCollectionStatisticsResponse> statR = client.getCollectionStatistics(GetCollectionStatisticsParam
|
|
@@ -547,6 +574,7 @@ class MilvusClientDockerTest {
|
|
|
|
|
|
GetCollStatResponseWrapper stat = new GetCollStatResponseWrapper(statR.getData());
|
|
|
System.out.println("Collection row count: " + stat.getRowCount());
|
|
|
+ Assertions.assertEquals(rowCount, stat.getRowCount());
|
|
|
|
|
|
// create index
|
|
|
CreateIndexParam indexParam2 = CreateIndexParam.newBuilder()
|
|
@@ -933,6 +961,7 @@ class MilvusClientDockerTest {
|
|
|
.withRanker(WeightedRanker.newBuilder()
|
|
|
.withWeights(Lists.newArrayList(0.5f, 0.5f, 1.0f))
|
|
|
.build())
|
|
|
+ .withOutFields(Collections.singletonList("*"))
|
|
|
.build();
|
|
|
|
|
|
R<SearchResults> searchR = client.hybridSearch(searchParam);
|
|
@@ -941,8 +970,20 @@ class MilvusClientDockerTest {
|
|
|
// print search result
|
|
|
SearchResultsWrapper results = new SearchResultsWrapper(searchR.getData().getResults());
|
|
|
List<SearchResultsWrapper.IDScore> scores = results.getIDScore(0);
|
|
|
- for (int i = 0; i < scores.size(); ++i) {
|
|
|
- System.out.println(scores.get(i));
|
|
|
+ for (SearchResultsWrapper.IDScore score : scores) {
|
|
|
+ System.out.println(score);
|
|
|
+ Object id = score.get(idField);
|
|
|
+ Assertions.assertInstanceOf(Long.class, id);
|
|
|
+ Object fv = score.get(floatVectorField);
|
|
|
+ Assertions.assertInstanceOf(List.class, fv);
|
|
|
+ List<Float> fvec = (List<Float>)fv;
|
|
|
+ Assertions.assertEquals(dimension, fvec.size());
|
|
|
+ Object bv = score.get(binaryVectorField);
|
|
|
+ Assertions.assertInstanceOf(ByteBuffer.class, bv);
|
|
|
+ ByteBuffer bvec = (ByteBuffer)bv;
|
|
|
+ Assertions.assertEquals(dimension, bvec.limit()*8);
|
|
|
+ Object sv = score.get(sparseVectorField);
|
|
|
+ Assertions.assertInstanceOf(SortedMap.class, sv);
|
|
|
}
|
|
|
|
|
|
// drop collection
|
|
@@ -1583,20 +1624,21 @@ class MilvusClientDockerTest {
|
|
|
|
|
|
int rowCount = 10;
|
|
|
// insert data by row-based
|
|
|
- List<JSONObject> rows = new ArrayList<>();
|
|
|
+ List<JsonObject> rows = new ArrayList<>();
|
|
|
for (long i = 0L; i < rowCount; ++i) {
|
|
|
- JSONObject row = new JSONObject();
|
|
|
- row.put(field1Name, i);
|
|
|
- row.put(field2Name, generateFloatVectors(1).get(0));
|
|
|
+ JsonObject row = new JsonObject();
|
|
|
+ row.addProperty(field1Name, i);
|
|
|
+ List<Float> vector = generateFloatVectors(1).get(0);
|
|
|
+ row.add(field2Name, GSON_INSTANCE.toJsonTree(vector));
|
|
|
|
|
|
// JSON field
|
|
|
- JSONObject info = new JSONObject();
|
|
|
- info.put("row_based_info", i);
|
|
|
- row.put(field3Name, info);
|
|
|
+ JsonObject info = new JsonObject();
|
|
|
+ info.addProperty("row_based_info", i);
|
|
|
+ row.add(field3Name, info);
|
|
|
|
|
|
// extra meta is automatically stored in dynamic field
|
|
|
- row.put("row_based_extra", i % 3 == 0);
|
|
|
- row.put(generator.generate(5), 100);
|
|
|
+ row.addProperty("row_based_extra", i % 3 == 0);
|
|
|
+ row.addProperty(generator.generate(5), 100);
|
|
|
|
|
|
rows.add(row);
|
|
|
}
|
|
@@ -1612,17 +1654,17 @@ class MilvusClientDockerTest {
|
|
|
|
|
|
// insert data by column-based
|
|
|
List<Long> ids = new ArrayList<>();
|
|
|
- List<JSONObject> infos = new ArrayList<>();
|
|
|
- List<JSONObject> dynamics = new ArrayList<>();
|
|
|
+ List<JsonObject> infos = new ArrayList<>();
|
|
|
+ List<JsonObject> dynamics = new ArrayList<>();
|
|
|
for (long i = 0L; i < rowCount; ++i) {
|
|
|
ids.add(rowCount + i);
|
|
|
- JSONObject obj = new JSONObject();
|
|
|
- obj.put("column_based_info", i);
|
|
|
- obj.put(generator.generate(5), i);
|
|
|
+ JsonObject obj = new JsonObject();
|
|
|
+ obj.addProperty("column_based_info", i);
|
|
|
+ obj.addProperty(generator.generate(5), i);
|
|
|
infos.add(obj);
|
|
|
|
|
|
- JSONObject dynamic = new JSONObject();
|
|
|
- dynamic.put(String.format("column_based_extra_%d", i), i);
|
|
|
+ JsonObject dynamic = new JsonObject();
|
|
|
+ dynamic.addProperty(String.format("column_based_extra_%d", i), i);
|
|
|
dynamics.add(dynamic);
|
|
|
}
|
|
|
List<List<Float>> vectors = generateFloatVectors(rowCount);
|
|
@@ -1671,6 +1713,7 @@ class MilvusClientDockerTest {
|
|
|
for (QueryResultsWrapper.RowRecord record:records) {
|
|
|
System.out.println(record);
|
|
|
Object extraMeta = record.get("row_based_extra");
|
|
|
+ Assertions.assertInstanceOf(Boolean.class, extraMeta);
|
|
|
System.out.println("'row_based_extra' is from dynamic field, value: " + extraMeta);
|
|
|
}
|
|
|
|
|
@@ -1726,6 +1769,14 @@ class MilvusClientDockerTest {
|
|
|
System.out.println(record);
|
|
|
long id = (long)record.get(field1Name);
|
|
|
Assertions.assertEquals((long)rowCount+1L, id);
|
|
|
+ Object vec = record.get(field2Name);
|
|
|
+ Assertions.assertInstanceOf(List.class, vec);
|
|
|
+ List<Float> vector = (List<Float>)vec;
|
|
|
+ Assertions.assertEquals(dimension, vector.size());
|
|
|
+ Object j = record.get(field3Name);
|
|
|
+ Assertions.assertInstanceOf(JsonObject.class, j);
|
|
|
+ JsonObject jon = (JsonObject)j;
|
|
|
+ Assertions.assertTrue(jon.has("column_based_info"));
|
|
|
}
|
|
|
|
|
|
// drop collection
|
|
@@ -1847,11 +1898,12 @@ class MilvusClientDockerTest {
|
|
|
System.out.println(rowCount + " rows inserted");
|
|
|
|
|
|
// insert data by row-based
|
|
|
- List<JSONObject> rows = new ArrayList<>();
|
|
|
+ List<JsonObject> rows = new ArrayList<>();
|
|
|
for (int i = 0; i < rowCount; ++i) {
|
|
|
- JSONObject row = new JSONObject();
|
|
|
- row.put(field1Name, 10000L + (long)i);
|
|
|
- row.put(field2Name, generateFloatVectors(1).get(0));
|
|
|
+ JsonObject row = new JsonObject();
|
|
|
+ row.addProperty(field1Name, 10000L + (long)i);
|
|
|
+ List<Float> vector = generateFloatVectors(1).get(0);
|
|
|
+ row.add(field2Name, GSON_INSTANCE.toJsonTree(vector));
|
|
|
|
|
|
List<String> strArray = new ArrayList<>();
|
|
|
List<Integer> intArray = new ArrayList<>();
|
|
@@ -1861,9 +1913,9 @@ class MilvusClientDockerTest {
|
|
|
intArray.add(i*10000 + k);
|
|
|
floatArray.add((float)k/1000 + i);
|
|
|
}
|
|
|
- row.put(field3Name, strArray);
|
|
|
- row.put(field4Name, intArray);
|
|
|
- row.put(field5Name, floatArray);
|
|
|
+ row.add(field3Name, GSON_INSTANCE.toJsonTree(strArray));
|
|
|
+ row.add(field4Name, GSON_INSTANCE.toJsonTree(intArray));
|
|
|
+ row.add(field5Name, GSON_INSTANCE.toJsonTree(floatArray));
|
|
|
|
|
|
rows.add(row);
|
|
|
}
|
|
@@ -1976,6 +2028,7 @@ class MilvusClientDockerTest {
|
|
|
CreateCollectionParam createParam = CreateCollectionParam.newBuilder()
|
|
|
.withCollectionName(randomCollectionName)
|
|
|
.withFieldTypes(fieldsSchema)
|
|
|
+ .withEnableDynamicField(true)
|
|
|
.build();
|
|
|
|
|
|
R<RpcStatus> createR = client.createCollection(createParam);
|
|
@@ -1983,12 +2036,14 @@ class MilvusClientDockerTest {
|
|
|
|
|
|
// insert data by row-based with id from 0 ~ 9
|
|
|
int rowCount = 10;
|
|
|
- List<JSONObject> rows = new ArrayList<>();
|
|
|
+ List<JsonObject> rows = new ArrayList<>();
|
|
|
for (long i = 0L; i < rowCount; ++i) {
|
|
|
- JSONObject row = new JSONObject();
|
|
|
- row.put(field1Name, i);
|
|
|
- row.put(field2Name, generateFloatVectors(1).get(0));
|
|
|
- row.put(field3Name, String.format("name_%d", i));
|
|
|
+ JsonObject row = new JsonObject();
|
|
|
+ row.addProperty(field1Name, i);
|
|
|
+ List<Float> vector = generateFloatVectors(1).get(0);
|
|
|
+ row.add(field2Name, GSON_INSTANCE.toJsonTree(vector));
|
|
|
+ row.addProperty(field3Name, String.format("name_%d", i));
|
|
|
+ row.addProperty("dynamic_value", String.format("dynamic_%d", i));
|
|
|
rows.add(row);
|
|
|
}
|
|
|
|
|
@@ -2057,10 +2112,11 @@ class MilvusClientDockerTest {
|
|
|
// since the ids are not exist, the upsert call is equal to an insert call
|
|
|
rows.clear();
|
|
|
for (long i = 0L; i < rowCount; ++i) {
|
|
|
- JSONObject row = new JSONObject();
|
|
|
- row.put(field1Name, rowCount + i);
|
|
|
- row.put(field2Name, generateFloatVectors(1).get(0));
|
|
|
- row.put(field3Name, String.format("name_%d", rowCount + i));
|
|
|
+ JsonObject row = new JsonObject();
|
|
|
+ row.addProperty(field1Name, rowCount + i);
|
|
|
+ List<Float> vector = generateFloatVectors(1).get(0);
|
|
|
+ row.add(field2Name, GSON_INSTANCE.toJsonTree(vector));
|
|
|
+ row.addProperty(field3Name, String.format("name_%d", rowCount + i));
|
|
|
rows.add(row);
|
|
|
}
|
|
|
|
|
@@ -2078,6 +2134,7 @@ class MilvusClientDockerTest {
|
|
|
.withCollectionName(randomCollectionName)
|
|
|
.withExpr(String.format("%s == 18", field1Name))
|
|
|
.addOutField(field3Name)
|
|
|
+ .addOutField("dynamic_value")
|
|
|
.withConsistencyLevel(ConsistencyLevelEnum.STRONG)
|
|
|
.build();
|
|
|
|
|
@@ -2092,19 +2149,24 @@ class MilvusClientDockerTest {
|
|
|
Object name = record.get(field3Name);
|
|
|
Assertions.assertNotNull(name);
|
|
|
Assertions.assertEquals("name_18", name);
|
|
|
+ Assertions.assertThrows(ParamException.class, () -> record.get("dynamic_value")); // we didn't set dynamic_value for No.18 row
|
|
|
}
|
|
|
|
|
|
// upsert to change the no.5 and no.18 items
|
|
|
rows.clear();
|
|
|
- JSONObject row = new JSONObject();
|
|
|
- row.put(field1Name, 5L);
|
|
|
- row.put(field2Name, generateFloatVectors(1).get(0));
|
|
|
- row.put(field3Name, "updated_5");
|
|
|
+ JsonObject row = new JsonObject();
|
|
|
+ row.addProperty(field1Name, 5L);
|
|
|
+ List<Float> vector = generateFloatVectors(1).get(0);
|
|
|
+ row.add(field2Name, GSON_INSTANCE.toJsonTree(vector));
|
|
|
+ row.addProperty(field3Name, "updated_5");
|
|
|
+ row.addProperty("dynamic_value", String.format("dynamic_%d", 5));
|
|
|
rows.add(row);
|
|
|
- row = new JSONObject();
|
|
|
- row.put(field1Name, 18L);
|
|
|
- row.put(field2Name, generateFloatVectors(1).get(0));
|
|
|
- row.put(field3Name, "updated_18");
|
|
|
+ row = new JsonObject();
|
|
|
+ row.addProperty(field1Name, 18L);
|
|
|
+ vector = generateFloatVectors(1).get(0);
|
|
|
+ row.add(field2Name, GSON_INSTANCE.toJsonTree(vector));
|
|
|
+ row.addProperty(field3Name, "updated_18");
|
|
|
+ row.addProperty("dynamic_value", 18);
|
|
|
rows.add(row);
|
|
|
|
|
|
upsertParam = UpsertParam.newBuilder()
|
|
@@ -2120,6 +2182,7 @@ class MilvusClientDockerTest {
|
|
|
.withCollectionName(randomCollectionName)
|
|
|
.withExpr(String.format("%s == 5 || %s == 18", field1Name, field1Name))
|
|
|
.addOutField(field3Name)
|
|
|
+ .addOutField("dynamic_value")
|
|
|
.withConsistencyLevel(ConsistencyLevelEnum.STRONG)
|
|
|
.build();
|
|
|
|
|
@@ -2130,7 +2193,9 @@ class MilvusClientDockerTest {
|
|
|
records = queryResultsWrapper.getRowRecords();
|
|
|
Assertions.assertEquals(2, records.size());
|
|
|
Assertions.assertEquals("updated_5", records.get(0).get(field3Name));
|
|
|
+ Assertions.assertEquals("dynamic_5", records.get(0).get("dynamic_value"));
|
|
|
Assertions.assertEquals("updated_18", records.get(1).get(field3Name));
|
|
|
+ Assertions.assertEquals(18L, records.get(1).get("dynamic_value"));
|
|
|
|
|
|
// drop collection
|
|
|
R<RpcStatus> dropR = client.dropCollection(DropCollectionParam.newBuilder()
|
|
@@ -2288,11 +2353,16 @@ class MilvusClientDockerTest {
|
|
|
// insert data
|
|
|
List<String> primaryIds = new ArrayList<>();
|
|
|
int rowCount = 10;
|
|
|
- List<JSONObject> rows = new ArrayList<>();
|
|
|
+ List<JsonObject> rows = new ArrayList<>();
|
|
|
for (long i = 0L; i < rowCount; ++i) {
|
|
|
- JSONObject row = new JSONObject();
|
|
|
- row.put(primaryField.getName(), primaryField.getDataType() == DataType.Int64 ? i : String.valueOf(i));
|
|
|
- row.put(vectorField.getName(), generateFloatVectors(1).get(0));
|
|
|
+ JsonObject row = new JsonObject();
|
|
|
+ if (primaryField.getDataType() == DataType.Int64) {
|
|
|
+ row.addProperty(primaryField.getName(), i);
|
|
|
+ } else {
|
|
|
+ row.addProperty(primaryField.getName(), String.valueOf(i));
|
|
|
+ }
|
|
|
+ List<Float> vector = generateFloatVectors(1).get(0);
|
|
|
+ row.add(vectorField.getName(), GSON_INSTANCE.toJsonTree(vector));
|
|
|
rows.add(row);
|
|
|
primaryIds.add(String.valueOf(i));
|
|
|
}
|
|
@@ -2348,11 +2418,16 @@ class MilvusClientDockerTest {
|
|
|
// insert data
|
|
|
List<String> primaryIds = new ArrayList<>();
|
|
|
int rowCount = 10;
|
|
|
- List<JSONObject> rows = new ArrayList<>();
|
|
|
+ List<JsonObject> rows = new ArrayList<>();
|
|
|
for (long i = 0L; i < rowCount; ++i) {
|
|
|
- JSONObject row = new JSONObject();
|
|
|
- row.put(primaryField.getName(), primaryField.getDataType() == DataType.Int64 ? i : String.valueOf(i));
|
|
|
- row.put(vectorField.getName(), generateFloatVectors(1).get(0));
|
|
|
+ JsonObject row = new JsonObject();
|
|
|
+ if (primaryField.getDataType() == DataType.Int64) {
|
|
|
+ row.addProperty(primaryField.getName(), i);
|
|
|
+ } else {
|
|
|
+ row.addProperty(primaryField.getName(), String.valueOf(i));
|
|
|
+ }
|
|
|
+ List<Float> vector = generateFloatVectors(1).get(0);
|
|
|
+ row.add(vectorField.getName(), GSON_INSTANCE.toJsonTree(vector));
|
|
|
rows.add(row);
|
|
|
primaryIds.add(String.valueOf(i));
|
|
|
}
|
|
@@ -2393,4 +2468,280 @@ class MilvusClientDockerTest {
|
|
|
System.out.println(outPutStr);
|
|
|
Assertions.assertEquals(R.Status.Success.getCode(), deleteResponseR.getStatus().intValue());
|
|
|
}
|
|
|
+
|
|
|
+    /**
+     * Writes rows through a {@link LocalBulkWriter} configured for Parquet output and
+     * verifies the generated files.
+     *
+     * <p>The test creates a collection with eight fields (dynamic fields disabled), appends
+     * {@code rowCount} rows to the writer, commits synchronously, asserts the number of
+     * produced batches, and finally re-reads the first file of every batch to check that
+     * the total number of records equals {@code rowCount}.
+     */
+    @Test
+    public void testBulkWriter() {
+        String randomCollectionName = generator.generate(10);
+
+        // collection schema
+        FieldType field1 = FieldType.newBuilder()
+                .withPrimaryKey(true)
+                .withAutoID(false)
+                .withDataType(DataType.Int64)
+                .withName("id")
+                .build();
+
+        FieldType field2 = FieldType.newBuilder()
+                .withDataType(DataType.FloatVector)
+                .withName("vector")
+                .withDimension(dimension)
+                .build();
+
+        FieldType field3 = FieldType.newBuilder()
+                .withDataType(DataType.Bool)
+                .withName("bool")
+                .build();
+
+        FieldType field4 = FieldType.newBuilder()
+                .withDataType(DataType.Int16)
+                .withName("int16")
+                .build();
+
+        FieldType field5 = FieldType.newBuilder()
+                .withDataType(DataType.Float)
+                .withName("float")
+                .build();
+
+        FieldType field6 = FieldType.newBuilder()
+                .withDataType(DataType.VarChar)
+                .withName("varchar")
+                .withMaxLength(100)
+                .build();
+
+        FieldType field7 = FieldType.newBuilder()
+                .withDataType(DataType.JSON)
+                .withName("json")
+                .build();
+
+        FieldType field8 = FieldType.newBuilder()
+                .withDataType(DataType.Array)
+                .withElementType(DataType.Int32)
+                .withName("array")
+                .withMaxCapacity(100)
+                .build();
+
+        CollectionSchemaParam schema = CollectionSchemaParam.newBuilder()
+                .withFieldTypes(Lists.newArrayList(field1, field2, field3, field4, field5, field6, field7, field8))
+                .withEnableDynamicField(false)
+                .build();
+
+        // create collection
+        CreateCollectionParam createParam = CreateCollectionParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withSchema(schema)
+                .build();
+
+        R<RpcStatus> createR = client.createCollection(createParam);
+        Assertions.assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
+
+        // local bulkwriter
+        LocalBulkWriterParam localWriterParam = LocalBulkWriterParam.newBuilder()
+                .withCollectionSchema(schema)
+                .withLocalPath("/tmp/bulk_writer")
+                .withFileType(BulkFileType.PARQUET)
+                .withChunkSize(4 * 1024 * 1024)
+                .build();
+
+        int rowCount = 10000;
+        List<List<String>> batchFiles = new ArrayList<>();
+        try (LocalBulkWriter localBulkWriter = new LocalBulkWriter(localWriterParam)) {
+            for (int i = 0; i < rowCount; i++) {
+                JsonObject row = new JsonObject();
+                row.addProperty("id", i);
+                row.add("vector", GSON_INSTANCE.toJsonTree(GeneratorUtils.genFloatVector(dimension)));
+                row.addProperty("bool", i % 3 == 0);
+                // Int16 tops out at 32767, so wrap on Short.MAX_VALUE rather than 65535.
+                row.addProperty("int16", i % Short.MAX_VALUE);
+                // NOTE(review): i/3 is integer division, so only whole numbers are stored in the
+                // Float field - confirm whether i / 3.0f was intended.
+                row.addProperty("float", i/3);
+                row.addProperty("varchar", String.format("varchar_%d", i));
+                JsonObject obj = new JsonObject();
+                obj.addProperty("dummy", i);
+                row.add("json", obj);
+                // NOTE(review): the schema disables dynamic fields; presumably the writer ignores
+                // keys that are not part of the schema - confirm.
+                row.addProperty("dynamic_1", i);
+                row.addProperty("dynamic_2", String.format("dynamic_%d", i));
+                row.add("array", GSON_INSTANCE.toJsonTree(Lists.newArrayList(5, 6, 3, 2, 1)));
+
+                localBulkWriter.appendRow(row);
+            }
+
+            localBulkWriter.commit(false);
+            List<List<String>> files = localBulkWriter.getBatchFiles();
+            System.out.printf("LocalBulkWriter done! output local files: %s%n", files);
+            // JUnit expects (expected, actual); the reversed order produced misleading messages.
+            Assertions.assertEquals(2, files.size());
+            Assertions.assertEquals(1, files.get(0).size());
+            batchFiles.addAll(files);
+        } catch (Exception e) {
+            System.out.println("LocalBulkWriter catch exception: " + e);
+            // Keep the original exception as the cause so the failure is diagnosable.
+            Assertions.fail("LocalBulkWriter failed", e);
+        }
+
+        try {
+            // Single-element array so the anonymous reader below can mutate the count.
+            final int[] counter = {0};
+            for (List<String> files : batchFiles) {
+                new ParquetReaderUtils() {
+                    @Override
+                    public void readRecord(GenericData.Record record) {
+                        counter[0]++;
+                    }
+                }.readParquet(files.get(0));
+            }
+            Assertions.assertEquals(rowCount, counter[0]);
+        } catch (Exception e) {
+            System.out.println("Verify parquet file catch exception: " + e);
+            Assertions.fail("Failed to verify the generated parquet files", e);
+        }
+
+        // drop collection, as the other tests in this class do
+        R<RpcStatus> dropR = client.dropCollection(DropCollectionParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .build());
+        Assertions.assertEquals(R.Status.Success.getCode(), dropR.getStatus().intValue());
+    }
|
|
|
+
|
|
|
+    /**
+     * Exercises {@code queryIterator} and {@code searchIterator} against a freshly created
+     * collection.
+     *
+     * <p>The test inserts {@code rowCount} rows (VarChar primary key, float vector, JSON field
+     * and a dynamic {@code "dynamic"} value), builds a FLAT/L2 index, loads the collection and
+     * then:
+     * <ul>
+     *   <li>iterates a query with the filter {@code dynamic < 300} in batches of 100 and
+     *       expects 300 records in total;</li>
+     *   <li>iterates a search with top-k 50 in batches of 10 and expects 50 records in total.</li>
+     * </ul>
+     * Every returned record is validated by {@link #assertIteratorRecord}.
+     */
+    @Test
+    public void testIterator() {
+        String randomCollectionName = generator.generate(10);
+
+        String field1Name = "str_id";
+        String field2Name = "vec_field";
+        String field3Name = "json_field";
+        List<FieldType> fieldsSchema = new ArrayList<>();
+        fieldsSchema.add(FieldType.newBuilder()
+                .withPrimaryKey(true)
+                .withAutoID(false)
+                .withDataType(DataType.VarChar)
+                .withName(field1Name)
+                .withMaxLength(32)
+                .build());
+
+        fieldsSchema.add(FieldType.newBuilder()
+                .withDataType(DataType.FloatVector)
+                .withName(field2Name)
+                .withDimension(dimension)
+                .build());
+
+        fieldsSchema.add(FieldType.newBuilder()
+                .withDataType(DataType.JSON)
+                .withName(field3Name)
+                .build());
+
+        // create collection
+        CreateCollectionParam createParam = CreateCollectionParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withFieldTypes(fieldsSchema)
+                .withEnableDynamicField(true)
+                .build();
+
+        R<RpcStatus> createR = client.createCollection(createParam);
+        Assertions.assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
+
+        // insert data
+        int rowCount = 1000;
+        List<JsonObject> rows = new ArrayList<>();
+        for (long i = 0L; i < rowCount; ++i) {
+            JsonObject row = new JsonObject();
+            row.addProperty(field1Name, Long.toString(i));
+            // Reuse the shared Gson instance, as the other tests in this class do.
+            row.add(field2Name, GSON_INSTANCE.toJsonTree(generateFloatVectors(1).get(0)));
+            JsonObject json = new JsonObject();
+            if (i % 2 == 0) {
+                json.addProperty("even", true);
+            }
+            row.add(field3Name, json);
+            // Not part of the schema: stored through the dynamic field.
+            row.addProperty("dynamic", i);
+            rows.add(row);
+        }
+
+        InsertParam insertParam = InsertParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withRows(rows)
+                .build();
+
+        R<MutationResult> insertR = client.insert(insertParam);
+        Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+
+        // create index
+        CreateIndexParam indexParam = CreateIndexParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withFieldName(field2Name)
+                .withIndexType(IndexType.FLAT)
+                .withMetricType(MetricType.L2)
+                .build();
+
+        R<RpcStatus> createIndexR = client.createIndex(indexParam);
+        Assertions.assertEquals(R.Status.Success.getCode(), createIndexR.getStatus().intValue());
+
+        // load collection
+        R<RpcStatus> loadR = client.loadCollection(LoadCollectionParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .build());
+        Assertions.assertEquals(R.Status.Success.getCode(), loadR.getStatus().intValue());
+
+        // query iterator
+        // NOTE(review): BOUNDED consistency is used right after the insert while other tests in
+        // this class query with STRONG - confirm the expected count of 300 is always visible.
+        QueryIteratorParam.Builder queryIteratorParamBuilder = QueryIteratorParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withExpr("dynamic < 300")
+                .withOutFields(Lists.newArrayList("*"))
+                .withBatchSize(100L)
+                .withConsistencyLevel(ConsistencyLevelEnum.BOUNDED);
+
+        R<QueryIterator> qResponse = client.queryIterator(queryIteratorParamBuilder.build());
+        Assertions.assertEquals(R.Status.Success.getCode(), qResponse.getStatus().intValue());
+
+        QueryIterator queryIterator = qResponse.getData();
+        int counter = 0;
+        try {
+            while (true) {
+                List<QueryResultsWrapper.RowRecord> res = queryIterator.next();
+                if (res.isEmpty()) {
+                    System.out.println("query iteration finished, close");
+                    break;
+                }
+
+                for (QueryResultsWrapper.RowRecord record : res) {
+                    assertIteratorRecord(record, field1Name, field2Name, field3Name);
+                    counter++;
+                }
+            }
+        } finally {
+            // Close even when an assertion above fails.
+            queryIterator.close();
+        }
+        Assertions.assertEquals(300, counter);
+
+        // search iterator
+        List<List<Float>> vectors = generateFloatVectors(1);
+        SearchIteratorParam.Builder searchIteratorParamBuilder = SearchIteratorParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withOutFields(Lists.newArrayList("*"))
+                .withBatchSize(10L)
+                .withVectorFieldName(field2Name)
+                .withFloatVectors(vectors)
+                .withTopK(50)
+                .withMetricType(MetricType.L2);
+
+        R<SearchIterator> sResponse = client.searchIterator(searchIteratorParamBuilder.build());
+        Assertions.assertEquals(R.Status.Success.getCode(), sResponse.getStatus().intValue());
+
+        SearchIterator searchIterator = sResponse.getData();
+        counter = 0;
+        try {
+            while (true) {
+                List<QueryResultsWrapper.RowRecord> res = searchIterator.next();
+                if (res.isEmpty()) {
+                    System.out.println("search iteration finished, close");
+                    break;
+                }
+
+                for (QueryResultsWrapper.RowRecord record : res) {
+                    assertIteratorRecord(record, field1Name, field2Name, field3Name);
+                    counter++;
+                }
+            }
+        } finally {
+            // Close even when an assertion above fails.
+            searchIterator.close();
+        }
+        Assertions.assertEquals(50, counter);
+
+        // drop collection, as the other tests in this class do
+        R<RpcStatus> dropR = client.dropCollection(DropCollectionParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .build());
+        Assertions.assertEquals(R.Status.Success.getCode(), dropR.getStatus().intValue());
+    }
+
+    /**
+     * Checks the value types of one record returned by the query/search iterators in
+     * {@link #testIterator()}.
+     *
+     * @param record      the record to validate
+     * @param idField     name of the VarChar primary key field; its value must be a {@code String}
+     * @param vectorField name of the float vector field; its value must be a {@code List} with
+     *                    {@code dimension} elements
+     * @param jsonField   name of the JSON field; its value must be a {@code JsonElement}
+     */
+    private static void assertIteratorRecord(QueryResultsWrapper.RowRecord record,
+                                             String idField, String vectorField, String jsonField) {
+        Assertions.assertInstanceOf(Long.class, record.get("dynamic"));
+        Assertions.assertInstanceOf(String.class, record.get(idField));
+        // assertInstanceOf returns the checked value, which avoids an unchecked cast.
+        List<?> vector = Assertions.assertInstanceOf(List.class, record.get(vectorField));
+        Assertions.assertEquals(dimension, vector.size());
+        Assertions.assertInstanceOf(JsonElement.class, record.get(jsonField));
+    }
|
|
|
}
|