|
|
@@ -19,16 +19,15 @@
|
|
|
|
|
|
package io.milvus.bulkwriter;
|
|
|
|
|
|
-import com.google.gson.JsonElement;
|
|
|
-import com.google.gson.JsonNull;
|
|
|
-import com.google.gson.JsonObject;
|
|
|
-import com.google.gson.JsonPrimitive;
|
|
|
+import com.google.gson.*;
|
|
|
+import com.google.gson.reflect.TypeToken;
|
|
|
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
|
|
|
import io.milvus.bulkwriter.common.utils.GeneratorUtils;
|
|
|
import io.milvus.bulkwriter.common.utils.ParquetReaderUtils;
|
|
|
import io.milvus.bulkwriter.common.utils.V2AdapterUtils;
|
|
|
import io.milvus.common.utils.JsonUtils;
|
|
|
import io.milvus.exception.MilvusException;
|
|
|
+import io.milvus.param.Constant;
|
|
|
import io.milvus.param.collection.CollectionSchemaParam;
|
|
|
import io.milvus.param.collection.FieldType;
|
|
|
import io.milvus.v2.common.DataType;
|
|
|
@@ -39,7 +38,13 @@ import org.apache.avro.util.Utf8;
|
|
|
import org.junit.jupiter.api.Assertions;
|
|
|
import org.junit.jupiter.api.Test;
|
|
|
|
|
|
+import java.io.BufferedReader;
|
|
|
+import java.io.FileReader;
|
|
|
import java.io.IOException;
|
|
|
+import java.io.Reader;
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
+import java.nio.file.Files;
|
|
|
+import java.nio.file.Paths;
|
|
|
import java.util.*;
|
|
|
|
|
|
public class BulkWriterTest {
|
|
|
@@ -127,7 +132,7 @@ public class BulkWriterTest {
|
|
|
schemaV2.addField(AddFieldReq.builder()
|
|
|
.fieldName("int8_field")
|
|
|
.dataType(DataType.Int8)
|
|
|
- .defaultValue((short)8)
|
|
|
+ .defaultValue((short) 8)
|
|
|
.build());
|
|
|
schemaV2.addField(AddFieldReq.builder()
|
|
|
.fieldName("int16_field")
|
|
|
@@ -207,6 +212,50 @@ public class BulkWriterTest {
|
|
|
.dataType(DataType.Int8Vector)
|
|
|
.dimension(DIMENSION)
|
|
|
.build());
|
|
|
+ schemaV2.addField(AddFieldReq.builder()
|
|
|
+ .fieldName("struct_field")
|
|
|
+ .dataType(DataType.Array)
|
|
|
+ .elementType(DataType.Struct)
|
|
|
+ .maxCapacity(100)
|
|
|
+ .addStructField(AddFieldReq.builder()
|
|
|
+ .fieldName("st_bool")
|
|
|
+ .dataType(DataType.Bool)
|
|
|
+ .build())
|
|
|
+ .addStructField(AddFieldReq.builder()
|
|
|
+ .fieldName("st_int8")
|
|
|
+ .dataType(DataType.Int8)
|
|
|
+ .build())
|
|
|
+ .addStructField(AddFieldReq.builder()
|
|
|
+ .fieldName("st_int16")
|
|
|
+ .dataType(DataType.Int16)
|
|
|
+ .build())
|
|
|
+ .addStructField(AddFieldReq.builder()
|
|
|
+ .fieldName("st_int32")
|
|
|
+ .dataType(DataType.Int32)
|
|
|
+ .build())
|
|
|
+ .addStructField(AddFieldReq.builder()
|
|
|
+ .fieldName("st_int64")
|
|
|
+ .dataType(DataType.Int64)
|
|
|
+ .build())
|
|
|
+ .addStructField(AddFieldReq.builder()
|
|
|
+ .fieldName("st_float")
|
|
|
+ .dataType(DataType.Float)
|
|
|
+ .build())
|
|
|
+ .addStructField(AddFieldReq.builder()
|
|
|
+ .fieldName("st_double")
|
|
|
+ .dataType(DataType.Double)
|
|
|
+ .build())
|
|
|
+ .addStructField(AddFieldReq.builder()
|
|
|
+ .fieldName("st_string")
|
|
|
+ .dataType(DataType.VarChar)
|
|
|
+ .maxLength(100)
|
|
|
+ .build())
|
|
|
+ .addStructField(AddFieldReq.builder()
|
|
|
+ .fieldName("st_float_vector")
|
|
|
+ .dataType(DataType.FloatVector)
|
|
|
+ .dimension(DIMENSION)
|
|
|
+ .build())
|
|
|
+ .build());
|
|
|
return schemaV2;
|
|
|
}
|
|
|
|
|
|
@@ -220,7 +269,7 @@ public class BulkWriterTest {
|
|
|
}
|
|
|
|
|
|
// some rows contains null values
|
|
|
- if (i%5 == 0) {
|
|
|
+ if (i % 5 == 0) {
|
|
|
// scalar field
|
|
|
// rowObject.addProperty("bool_field", true); // nullable, no need to provide
|
|
|
// rowObject.add("int8_field", null); // has default value, no need to provide
|
|
|
@@ -235,8 +284,8 @@ public class BulkWriterTest {
|
|
|
// array field
|
|
|
rowObject.add("arr_int32_field", JsonUtils.toJsonTree(GeneratorUtils.generatorInt32Value(random.nextInt(4)))); // not nullable, must provide
|
|
|
// rowObject.add("arr_float_field", null); // nullable, no need to provide
|
|
|
-// rowObject.add("arr_varchar_field", null); // nullable, no need to provide
|
|
|
- } else if (i%3 == 0) {
|
|
|
+ rowObject.add("arr_varchar_field", JsonUtils.toJsonTree(new ArrayList<>())); //empty array
|
|
|
+ } else if (i % 3 == 0) {
|
|
|
// scalar field
|
|
|
rowObject.add("bool_field", null); // nullable, set null is ok
|
|
|
rowObject.add("int8_field", null); // has default value, set null to get default
|
|
|
@@ -281,6 +330,23 @@ public class BulkWriterTest {
|
|
|
rowObject.add("sparse_vector_field", JsonUtils.toJsonTree(utils.generateSparseVector()));
|
|
|
rowObject.add("int8_vector_field", JsonUtils.toJsonTree(utils.generateInt8Vector().array()));
|
|
|
|
|
|
+ // struct field
|
|
|
+ List<JsonObject> structList = new ArrayList<>();
|
|
|
+ for (int k = 0; k < i % 4 + 1; k++) {
|
|
|
+ JsonObject st = new JsonObject();
|
|
|
+ st.addProperty("st_bool", (i + k) % 3 == 0);
|
|
|
+ st.addProperty("st_int8", (i + k) % 128);
|
|
|
+ st.addProperty("st_int16", (i + k) % 16384);
|
|
|
+ st.addProperty("st_int32", (i + k) % 65536);
|
|
|
+ st.addProperty("st_int64", i + k);
|
|
|
+ st.addProperty("st_float", (float) (i + k) / 4);
|
|
|
+ st.addProperty("st_double", (i + k) / 3);
|
|
|
+ st.addProperty("st_string", String.format("dummy_%d", i + k));
|
|
|
+ st.add("st_float_vector", JsonUtils.toJsonTree(utils.generateFloatVector()));
|
|
|
+ structList.add(st);
|
|
|
+ }
|
|
|
+ rowObject.add("struct_field", JsonUtils.toJsonTree(structList));
|
|
|
+
|
|
|
rows.add(rowObject);
|
|
|
}
|
|
|
return rows;
|
|
|
@@ -293,15 +359,15 @@ public class BulkWriterTest {
|
|
|
}
|
|
|
|
|
|
private static JsonElement constructJsonPrimitive(Object obj) {
|
|
|
- if (obj == null ) {
|
|
|
+ if (obj == null) {
|
|
|
return JsonNull.INSTANCE;
|
|
|
} else if (obj instanceof Boolean) {
|
|
|
- return new JsonPrimitive((Boolean)obj);
|
|
|
+ return new JsonPrimitive((Boolean) obj);
|
|
|
} else if (obj instanceof Short || obj instanceof Integer || obj instanceof Long ||
|
|
|
obj instanceof Float || obj instanceof Double) {
|
|
|
return new JsonPrimitive((Number) obj);
|
|
|
} else if (obj instanceof String) {
|
|
|
- return new JsonPrimitive((String)obj);
|
|
|
+ return new JsonPrimitive((String) obj);
|
|
|
}
|
|
|
|
|
|
Assertions.fail("Default value is illegal");
|
|
|
@@ -334,7 +400,7 @@ public class BulkWriterTest {
|
|
|
if (fieldSchemaV2.getDimension() != null) {
|
|
|
Assertions.assertEquals(fieldSchemaV2.getDimension(), fieldSchemaV1.getDimension());
|
|
|
}
|
|
|
- if(fieldSchemaV2.getDataType() == DataType.VarChar) {
|
|
|
+ if (fieldSchemaV2.getDataType() == DataType.VarChar) {
|
|
|
Assertions.assertEquals(fieldSchemaV2.getMaxLength(), fieldSchemaV1.getMaxLength());
|
|
|
}
|
|
|
|
|
|
@@ -360,7 +426,7 @@ public class BulkWriterTest {
|
|
|
.withLocalPath("/tmp/bulk_writer")
|
|
|
.withFileType(fileType)
|
|
|
.build();
|
|
|
- try(LocalBulkWriter localBulkWriter = new LocalBulkWriter(bulkWriterParam)) {
|
|
|
+ try (LocalBulkWriter localBulkWriter = new LocalBulkWriter(bulkWriterParam)) {
|
|
|
JsonObject rowObject = new JsonObject();
|
|
|
rowObject.addProperty("bool_field", true);
|
|
|
rowObject.addProperty("int8_field", 1);
|
|
|
@@ -378,22 +444,23 @@ public class BulkWriterTest {
|
|
|
rowObject.add("arr_int32_field", JsonUtils.toJsonTree(GeneratorUtils.generatorInt32Value(2)));
|
|
|
rowObject.add("arr_float_field", JsonUtils.toJsonTree(GeneratorUtils.generatorFloatValue(3)));
|
|
|
rowObject.add("arr_varchar_field", JsonUtils.toJsonTree(GeneratorUtils.generatorVarcharValue(4, 5)));
|
|
|
+ rowObject.add("struct_field", JsonUtils.toJsonTree(new ArrayList<>()));
|
|
|
|
|
|
// a field missed, expect throwing an exception
|
|
|
// localBulkWriter.appendRow(rowObject);
|
|
|
- Assertions.assertThrows(MilvusException.class, ()->localBulkWriter.appendRow(rowObject));
|
|
|
+ Assertions.assertThrows(MilvusException.class, () -> localBulkWriter.appendRow(rowObject));
|
|
|
|
|
|
// id is auto_id, no need to input, expect throwing an exception
|
|
|
rowObject.addProperty("id", 1);
|
|
|
rowObject.addProperty("int16_field", 2);
|
|
|
// localBulkWriter.appendRow(rowObject);
|
|
|
- Assertions.assertThrows(MilvusException.class, ()->localBulkWriter.appendRow(rowObject));
|
|
|
+ Assertions.assertThrows(MilvusException.class, () -> localBulkWriter.appendRow(rowObject));
|
|
|
|
|
|
// set null value for non-nullable field, expect throwing an exception
|
|
|
rowObject.remove("id");
|
|
|
rowObject.add("int16_field", null);
|
|
|
// localBulkWriter.appendRow(rowObject);
|
|
|
- Assertions.assertThrows(MilvusException.class, ()->localBulkWriter.appendRow(rowObject));
|
|
|
+ Assertions.assertThrows(MilvusException.class, () -> localBulkWriter.appendRow(rowObject));
|
|
|
|
|
|
// set valid value for dynamic field
|
|
|
rowObject.addProperty("int16_field", 16);
|
|
|
@@ -405,48 +472,84 @@ public class BulkWriterTest {
|
|
|
// set invalid value for dynamic field, expect throwing an exception
|
|
|
rowObject.addProperty("$meta", 6);
|
|
|
// localBulkWriter.appendRow(rowObject);
|
|
|
- Assertions.assertThrows(MilvusException.class, ()->localBulkWriter.appendRow(rowObject));
|
|
|
+ Assertions.assertThrows(MilvusException.class, () -> localBulkWriter.appendRow(rowObject));
|
|
|
|
|
|
// set incorrect dimension vector, expect throwing an exception
|
|
|
rowObject.remove("$meta");
|
|
|
- rowObject.add("float_vector_field", JsonUtils.toJsonTree(utils.generateFloatVector(DIMENSION-1)));
|
|
|
+ rowObject.add("float_vector_field", JsonUtils.toJsonTree(utils.generateFloatVector(DIMENSION - 1)));
|
|
|
// localBulkWriter.appendRow(rowObject);
|
|
|
- Assertions.assertThrows(MilvusException.class, ()->localBulkWriter.appendRow(rowObject));
|
|
|
+ Assertions.assertThrows(MilvusException.class, () -> localBulkWriter.appendRow(rowObject));
|
|
|
|
|
|
// set incorrect sparse vector, expect throwing an exception
|
|
|
rowObject.add("float_vector_field", JsonUtils.toJsonTree(utils.generateFloatVector()));
|
|
|
rowObject.add("sparse_vector_field", JsonUtils.toJsonTree(utils.generateFloatVector()));
|
|
|
// localBulkWriter.appendRow(rowObject);
|
|
|
- Assertions.assertThrows(MilvusException.class, ()->localBulkWriter.appendRow(rowObject));
|
|
|
+ Assertions.assertThrows(MilvusException.class, () -> localBulkWriter.appendRow(rowObject));
|
|
|
|
|
|
// set incorrect value type for scalar field, expect throwing an exception
|
|
|
rowObject.add("sparse_vector_field", JsonUtils.toJsonTree(utils.generateSparseVector()));
|
|
|
rowObject.addProperty("float_field", Boolean.TRUE);
|
|
|
// localBulkWriter.appendRow(rowObject);
|
|
|
- Assertions.assertThrows(MilvusException.class, ()->localBulkWriter.appendRow(rowObject));
|
|
|
+ Assertions.assertThrows(MilvusException.class, () -> localBulkWriter.appendRow(rowObject));
|
|
|
|
|
|
// set incorrect type for varchar field, expect throwing an exception
|
|
|
rowObject.addProperty("float_field", 2.5);
|
|
|
rowObject.addProperty("varchar_field", 2.5);
|
|
|
// localBulkWriter.appendRow(rowObject);
|
|
|
- Assertions.assertThrows(MilvusException.class, ()->localBulkWriter.appendRow(rowObject));
|
|
|
+ Assertions.assertThrows(MilvusException.class, () -> localBulkWriter.appendRow(rowObject));
|
|
|
|
|
|
// set incorrect value type for int8 vector field, expect throwing an exception
|
|
|
rowObject.addProperty("varchar_field", "dummy");
|
|
|
rowObject.addProperty("int8_vector_field", Boolean.TRUE);
|
|
|
// localBulkWriter.appendRow(rowObject);
|
|
|
- Assertions.assertThrows(MilvusException.class, ()->localBulkWriter.appendRow(rowObject));
|
|
|
+ Assertions.assertThrows(MilvusException.class, () -> localBulkWriter.appendRow(rowObject));
|
|
|
} catch (Exception e) {
|
|
|
Assertions.fail(e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static void compareJsonArray(JsonElement j1, JsonElement j2, DataType dt) {
|
|
|
+ if (j1.isJsonNull()) {
|
|
|
+ Assertions.assertTrue(j2.isJsonNull());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ Assertions.assertTrue(j1.isJsonArray());
|
|
|
+ Assertions.assertTrue(j2.isJsonArray());
|
|
|
+ List<JsonElement> a1 = j1.getAsJsonArray().asList();
|
|
|
+ List<JsonElement> a2 = j2.getAsJsonArray().asList();
|
|
|
+ Assertions.assertEquals(a1.size(), a2.size());
|
|
|
+ for (int i = 0; i < a1.size(); i++) {
|
|
|
+ switch (dt) {
|
|
|
+ case Bool:
|
|
|
+ Assertions.assertEquals(a1.get(i).getAsBoolean(), a2.get(i).getAsBoolean());
|
|
|
+ break;
|
|
|
+ case Int8:
|
|
|
+ case Int16:
|
|
|
+ case Int32:
|
|
|
+ Assertions.assertEquals(a1.get(i).getAsInt(), a2.get(i).getAsInt());
|
|
|
+ break;
|
|
|
+ case Float:
|
|
|
+ Assertions.assertEquals(a1.get(i).getAsFloat(), a2.get(i).getAsFloat());
|
|
|
+ break;
|
|
|
+ case Double:
|
|
|
+ Assertions.assertEquals(a1.get(i).getAsDouble(), a2.get(i).getAsDouble());
|
|
|
+ break;
|
|
|
+ case VarChar:
|
|
|
+ Assertions.assertEquals(a1.get(i).getAsString(), a2.get(i).getAsString());
|
|
|
+ default:
|
|
|
+ Assertions.assertEquals(a1.get(i), a2.get(i));
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
void testWriteJson() {
|
|
|
try {
|
|
|
- boolean autoID = true;
|
|
|
- boolean enableDynamicField = true;
|
|
|
+ final boolean autoID = true;
|
|
|
+ final boolean enableDynamicField = true;
|
|
|
+ final int row_count = 10;
|
|
|
CreateCollectionReq.CollectionSchema schemaV2 = buildV2Schema(enableDynamicField, autoID);
|
|
|
LocalBulkWriterParam bulkWriterParam = LocalBulkWriterParam.newBuilder()
|
|
|
.withCollectionSchema(schemaV2)
|
|
|
@@ -454,7 +557,7 @@ public class BulkWriterTest {
|
|
|
.withFileType(BulkFileType.JSON)
|
|
|
.build();
|
|
|
LocalBulkWriter localBulkWriter = new LocalBulkWriter(bulkWriterParam);
|
|
|
- List<JsonObject> rows = buildData(10, enableDynamicField, autoID);
|
|
|
+ List<JsonObject> rows = buildData(row_count, enableDynamicField, autoID);
|
|
|
writeData(localBulkWriter, rows);
|
|
|
|
|
|
System.out.printf("%s rows appends%n", localBulkWriter.getTotalRowCount());
|
|
|
@@ -463,6 +566,66 @@ public class BulkWriterTest {
|
|
|
System.out.println(filePaths);
|
|
|
Assertions.assertEquals(1, filePaths.size());
|
|
|
Assertions.assertEquals(1, filePaths.get(0).size());
|
|
|
+
|
|
|
+ Gson gson = new Gson();
|
|
|
+ Reader reader = Files.newBufferedReader(Paths.get(filePaths.get(0).get(0)));
|
|
|
+ List<JsonObject> lines = gson.fromJson(reader, new TypeToken<List<JsonObject>>() {
|
|
|
+ }.getType());
|
|
|
+ Assertions.assertEquals(row_count, lines.size());
|
|
|
+ for (int i = 0; i < lines.size(); i++) {
|
|
|
+ JsonObject expectedDict = rows.get(i);
|
|
|
+ JsonObject readDict = lines.get(i);
|
|
|
+ Assertions.assertTrue(readDict.has(Constant.DYNAMIC_FIELD_NAME));
|
|
|
+ Assertions.assertTrue(expectedDict.keySet().size() == readDict.keySet().size() ||
|
|
|
+ expectedDict.keySet().size() + 1 == readDict.keySet().size());
|
|
|
+ for (String key : readDict.keySet()) {
|
|
|
+ if (key.equals(Constant.DYNAMIC_FIELD_NAME)) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ JsonElement expectVal = expectedDict.get(key);
|
|
|
+ JsonElement readVal = readDict.get(key);
|
|
|
+ if (key.equals("sparse_vector_field")) {
|
|
|
+ JsonObject expectSparse = (JsonObject) expectVal;
|
|
|
+ JsonObject readSparse = (JsonObject) readVal;
|
|
|
+ Assertions.assertEquals(expectSparse.keySet().size(), readSparse.keySet().size());
|
|
|
+ for (String id : expectSparse.keySet()) {
|
|
|
+ Assertions.assertTrue(readSparse.has(id));
|
|
|
+ Assertions.assertEquals(expectSparse.get(id).getAsFloat(), readSparse.get(id).getAsFloat());
|
|
|
+ }
|
|
|
+ } else if (key.equals("float_vector_field") || key.equals("arr_float_field")) {
|
|
|
+ compareJsonArray(expectVal, readVal, DataType.Float);
|
|
|
+ } else if (key.equals("json_field")) {
|
|
|
+ if (expectVal.isJsonNull()) {
|
|
|
+ Assertions.assertTrue(readVal.isJsonNull());
|
|
|
+ } else {
|
|
|
+ String str = expectVal.toString();
|
|
|
+ Assertions.assertEquals(str, readVal.getAsString());
|
|
|
+ }
|
|
|
+ } else if (key.equals("arr_varchar_field")) {
|
|
|
+ compareJsonArray(expectVal, readVal, DataType.VarChar);
|
|
|
+ } else if (key.equals("struct_field")) {
|
|
|
+ JsonArray expectStructs = (JsonArray) expectVal;
|
|
|
+ JsonArray readStructs = (JsonArray) readVal;
|
|
|
+ Assertions.assertEquals(expectStructs.size(), readStructs.size());
|
|
|
+ for (int k = 0; k < expectStructs.size(); k++) {
|
|
|
+ JsonObject expectStruct = (JsonObject) expectStructs.get(k);
|
|
|
+ JsonObject readStruct = (JsonObject) readStructs.get(k);
|
|
|
+ Assertions.assertEquals(expectStruct.keySet().size(), readStruct.keySet().size());
|
|
|
+ for (String id : expectStruct.keySet()) {
|
|
|
+ Assertions.assertTrue(readStruct.has(id));
|
|
|
+ if (id.equals("st_float_vector")) {
|
|
|
+ compareJsonArray(expectStruct.get(id), readStruct.get(id), DataType.Float);
|
|
|
+ } else {
|
|
|
+ Assertions.assertEquals(expectStruct.get(id), readStruct.get(id));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ Assertions.assertEquals(expectVal, readVal);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
} catch (Exception e) {
|
|
|
Assertions.fail(e.getMessage());
|
|
|
}
|
|
|
@@ -471,18 +634,20 @@ public class BulkWriterTest {
|
|
|
@Test
|
|
|
void testWriteCSV() {
|
|
|
try {
|
|
|
- boolean autoID = true;
|
|
|
- boolean enableDynamicField = true;
|
|
|
+ final boolean autoID = true;
|
|
|
+ final boolean enableDynamicField = true;
|
|
|
+ final int row_count = 10;
|
|
|
+ final String nullKey = "XXX";
|
|
|
CreateCollectionReq.CollectionSchema schemaV2 = buildV2Schema(enableDynamicField, autoID);
|
|
|
LocalBulkWriterParam bulkWriterParam = LocalBulkWriterParam.newBuilder()
|
|
|
.withCollectionSchema(schemaV2)
|
|
|
.withLocalPath("/tmp/bulk_writer")
|
|
|
.withFileType(BulkFileType.CSV)
|
|
|
.withConfig("sep", "|")
|
|
|
- .withConfig("nullkey", "XXX")
|
|
|
+ .withConfig("nullkey", nullKey)
|
|
|
.build();
|
|
|
LocalBulkWriter localBulkWriter = new LocalBulkWriter(bulkWriterParam);
|
|
|
- List<JsonObject> rows = buildData(10, enableDynamicField, autoID);
|
|
|
+ List<JsonObject> rows = buildData(row_count, enableDynamicField, autoID);
|
|
|
writeData(localBulkWriter, rows);
|
|
|
|
|
|
System.out.printf("%s rows appends%n", localBulkWriter.getTotalRowCount());
|
|
|
@@ -491,6 +656,75 @@ public class BulkWriterTest {
|
|
|
System.out.println(filePaths);
|
|
|
Assertions.assertEquals(1, filePaths.size());
|
|
|
Assertions.assertEquals(1, filePaths.get(0).size());
|
|
|
+
|
|
|
+ try (BufferedReader br = new BufferedReader(new FileReader(filePaths.get(0).get(0)))) {
|
|
|
+ String line;
|
|
|
+ List<String> header = new ArrayList<>();
|
|
|
+ int num_line = 0;
|
|
|
+ while ((line = br.readLine()) != null) {
|
|
|
+ if (header.isEmpty()) {
|
|
|
+ header = Arrays.asList(line.split("\\|"));
|
|
|
+ Assertions.assertTrue(header.contains(Constant.DYNAMIC_FIELD_NAME));
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ JsonObject expectedRow = rows.get(num_line++);
|
|
|
+ String[] values = line.split("\\|");
|
|
|
+ Assertions.assertTrue(values.length == expectedRow.size() || values.length == expectedRow.size() + 1);
|
|
|
+ for (int i = 0; i < header.size(); i++) {
|
|
|
+ String field = header.get(i);
|
|
|
+ if (field.equals(Constant.DYNAMIC_FIELD_NAME)) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ Assertions.assertTrue(expectedRow.has(field));
|
|
|
+ JsonElement expectEle = expectedRow.get(field);
|
|
|
+ String readStr = values[i];
|
|
|
+ if (expectEle.isJsonNull()) {
|
|
|
+ Assertions.assertEquals(String.format("\"%s\"", nullKey), readStr);
|
|
|
+ } else if (expectEle.isJsonArray()) {
|
|
|
+ if (field.equals("struct_field")) {
|
|
|
+ Gson gson = new Gson();
|
|
|
+ readStr = readStr.substring(1, readStr.length() - 1);
|
|
|
+ readStr = readStr.replace("\"\"", "\"");
|
|
|
+ List<JsonObject> readStructs = gson.fromJson(readStr, new TypeToken<List<JsonObject>>() {
|
|
|
+ }.getType());
|
|
|
+ List<JsonElement> expectStructs = expectEle.getAsJsonArray().asList();
|
|
|
+ Assertions.assertEquals(expectStructs.size(), readStructs.size());
|
|
|
+ for (int k = 0; k < expectStructs.size(); k++) {
|
|
|
+ JsonObject expectStruct = expectStructs.get(k).getAsJsonObject();
|
|
|
+ JsonObject readStruct = readStructs.get(k);
|
|
|
+ for (String key : expectStruct.keySet()) {
|
|
|
+ Assertions.assertTrue(readStruct.has(key));
|
|
|
+ if (expectStruct.get(key).isJsonArray()) {
|
|
|
+ Assertions.assertTrue(readStruct.get(key).isJsonArray());
|
|
|
+ List<JsonElement> expectVals = expectStruct.get(key).getAsJsonArray().asList();
|
|
|
+ List<JsonElement> readVals = readStruct.get(key).getAsJsonArray().asList();
|
|
|
+ Assertions.assertEquals(expectVals.size(), readVals.size());
|
|
|
+ for (int j = 0; j < expectVals.size(); j++) {
|
|
|
+ Assertions.assertEquals(expectVals.get(j).getAsFloat(), readVals.get(j).getAsFloat());
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ Assertions.assertEquals(expectStruct.get(key), readStruct.get(key));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ String expectStr = String.format("\"%s\"", expectEle.getAsJsonArray().toString());
|
|
|
+ readStr = readStr.replace(" ", "");
|
|
|
+ readStr = readStr.replace("\"\"", "\"");
|
|
|
+ Assertions.assertEquals(expectStr, readStr);
|
|
|
+ }
|
|
|
+ } else if (expectEle.isJsonObject()) {
|
|
|
+ String expectStr = String.format("\"%s\"", expectEle.getAsJsonObject().toString());
|
|
|
+ expectStr = expectStr.replace("\"", "\"\"");
|
|
|
+ readStr = String.format("\"%s\"", readStr);
|
|
|
+ Assertions.assertEquals(expectStr, readStr);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
} catch (Exception e) {
|
|
|
Assertions.fail(e.getMessage());
|
|
|
}
|
|
|
@@ -501,6 +735,7 @@ public class BulkWriterTest {
|
|
|
String ss2 = s2.replace("\\\"", "\"").replaceAll("^\"|\"$", "");
|
|
|
Assertions.assertEquals(ss1, ss2);
|
|
|
}
|
|
|
+
|
|
|
private static void verifyElement(DataType dtype, JsonElement element, Object obj) {
|
|
|
switch (dtype) {
|
|
|
case Bool:
|
|
|
@@ -524,19 +759,20 @@ public class BulkWriterTest {
|
|
|
case Geometry:
|
|
|
case Timestamptz:
|
|
|
case JSON:
|
|
|
- verifyJsonString(element.getAsString(), ((Utf8)obj).toString());
|
|
|
+ verifyJsonString(element.getAsString(), ((Utf8) obj).toString());
|
|
|
break;
|
|
|
case SparseFloatVector:
|
|
|
- verifyJsonString(element.toString(), ((Utf8)obj).toString());
|
|
|
+ verifyJsonString(element.toString(), ((Utf8) obj).toString());
|
|
|
break;
|
|
|
default:
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static void verifyRow(List<CreateCollectionReq.FieldSchema> fieldsList, List<JsonObject> originalData, GenericData.Record readRow) {
|
|
|
- long id = (long)readRow.get("id");
|
|
|
- JsonObject expectedRow = originalData.get((int)id);
|
|
|
+ private static void verifyParquetRow(CreateCollectionReq.CollectionSchema schema, List<JsonObject> originalData, GenericData.Record readRow) {
|
|
|
+ long id = (long) readRow.get("id");
|
|
|
+ JsonObject expectedRow = originalData.get((int) id);
|
|
|
+ List<CreateCollectionReq.FieldSchema> fieldsList = schema.getFieldSchemaList();
|
|
|
for (CreateCollectionReq.FieldSchema field : fieldsList) {
|
|
|
String fieldName = field.getName();
|
|
|
Object readValue = readRow.get(fieldName);
|
|
|
@@ -563,7 +799,7 @@ public class BulkWriterTest {
|
|
|
Assertions.fail("Array field type unmatched");
|
|
|
}
|
|
|
List<JsonElement> jsonArr = expectedEle.getAsJsonArray().asList();
|
|
|
- List<GenericData.Record> objArr = (List<GenericData.Record>)readValue;
|
|
|
+ List<GenericData.Record> objArr = (List<GenericData.Record>) readValue;
|
|
|
if (jsonArr.size() != objArr.size()) {
|
|
|
Assertions.fail("Array field length unmatched");
|
|
|
}
|
|
|
@@ -590,14 +826,80 @@ public class BulkWriterTest {
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ List<CreateCollectionReq.StructFieldSchema> structList = schema.getStructFields();
|
|
|
+ for (CreateCollectionReq.StructFieldSchema struct : structList) {
|
|
|
+ String fieldName = struct.getName();
|
|
|
+ Object readValue = readRow.get(fieldName);
|
|
|
+ Assertions.assertInstanceOf(List.class, readValue);
|
|
|
+ List<?> readStructs = (List<?>) readValue;
|
|
|
+ JsonElement expectedEle = expectedRow.get(fieldName);
|
|
|
+ Assertions.assertNotNull(expectedEle);
|
|
|
+ JsonArray jsonArr = expectedEle.getAsJsonArray();
|
|
|
+ Assertions.assertNotNull(jsonArr);
|
|
|
+ List<JsonElement> jsonList = jsonArr.asList();
|
|
|
+ Assertions.assertEquals(jsonList.size(), readStructs.size());
|
|
|
+
|
|
|
+ for (int i = 0; i < jsonList.size(); i++) {
|
|
|
+ JsonObject expectedDict = (JsonObject) jsonList.get(i);
|
|
|
+ GenericData.Record readDict = (GenericData.Record) readStructs.get(i);
|
|
|
+ readDict = (GenericData.Record) readDict.get("element");
|
|
|
+ for (String key : expectedDict.keySet()) {
|
|
|
+ Assertions.assertTrue(readDict.hasField(key));
|
|
|
+ }
|
|
|
+
|
|
|
+ Object expectedVal = expectedDict.get("st_bool").getAsBoolean();
|
|
|
+ Object readVal = readDict.get("st_bool");
|
|
|
+ Assertions.assertEquals(expectedVal, readVal);
|
|
|
+
|
|
|
+ expectedVal = expectedDict.get("st_int8").getAsInt();
|
|
|
+ readVal = readDict.get("st_int8");
|
|
|
+ Assertions.assertEquals(expectedVal, readVal);
|
|
|
+
|
|
|
+ expectedVal = expectedDict.get("st_int16").getAsInt();
|
|
|
+ readVal = readDict.get("st_int16");
|
|
|
+ Assertions.assertEquals(expectedVal, readVal);
|
|
|
+
|
|
|
+ expectedVal = expectedDict.get("st_int32").getAsInt();
|
|
|
+ readVal = readDict.get("st_int32");
|
|
|
+ Assertions.assertEquals(expectedVal, readVal);
|
|
|
+
|
|
|
+ expectedVal = expectedDict.get("st_int64").getAsLong();
|
|
|
+ readVal = readDict.get("st_int64");
|
|
|
+ Assertions.assertEquals(expectedVal, readVal);
|
|
|
+
|
|
|
+ expectedVal = expectedDict.get("st_float").getAsFloat();
|
|
|
+ readVal = readDict.get("st_float");
|
|
|
+ Assertions.assertEquals(expectedVal, readVal);
|
|
|
+
|
|
|
+ expectedVal = expectedDict.get("st_double").getAsDouble();
|
|
|
+ readVal = readDict.get("st_double");
|
|
|
+ Assertions.assertEquals(expectedVal, readVal);
|
|
|
+
|
|
|
+ expectedVal = expectedDict.get("st_string").getAsString();
|
|
|
+ Utf8 utf = (Utf8) readDict.get("st_string");
|
|
|
+ String decodedString = new String(utf.getBytes(), StandardCharsets.UTF_8);
|
|
|
+ Assertions.assertEquals(expectedVal, decodedString);
|
|
|
+
|
|
|
+ List<JsonElement> expectedArr = expectedDict.get("st_float_vector").getAsJsonArray().asList();
|
|
|
+ List<?> readArr = (List<?>) readDict.get("st_float_vector");
|
|
|
+ Assertions.assertEquals(expectedArr.size(), readArr.size());
|
|
|
+ for (int k = 0; k < readArr.size(); k++) {
|
|
|
+ expectedVal = expectedArr.get(k).getAsFloat();
|
|
|
+ readVal = ((GenericData.Record) readArr.get(k)).get("element");
|
|
|
+ Assertions.assertEquals(expectedVal, readVal);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
System.out.printf("The row of id=%d is correct%n", id);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
public void testWriteParquet() {
|
|
|
// collection schema
|
|
|
- boolean autoID = false;
|
|
|
- boolean enableDynamicField = false;
|
|
|
+ final boolean autoID = false;
|
|
|
+ final boolean enableDynamicField = false;
|
|
|
+ final int rowCount = 10;
|
|
|
CreateCollectionReq.CollectionSchema schemaV2 = buildV2Schema(enableDynamicField, autoID);
|
|
|
|
|
|
// local bulkwriter
|
|
|
@@ -608,18 +910,17 @@ public class BulkWriterTest {
|
|
|
.withChunkSize(100 * 1024)
|
|
|
.build();
|
|
|
|
|
|
- int rowCount = 10;
|
|
|
List<JsonObject> originalData = new ArrayList<>();
|
|
|
List<List<String>> batchFiles = new ArrayList<>();
|
|
|
try (LocalBulkWriter bulkWriter = new LocalBulkWriter(writerParam)) {
|
|
|
- originalData = buildData(10, enableDynamicField, autoID);
|
|
|
+ originalData = buildData(rowCount, enableDynamicField, autoID);
|
|
|
writeData(bulkWriter, originalData);
|
|
|
|
|
|
bulkWriter.commit(false);
|
|
|
List<List<String>> files = bulkWriter.getBatchFiles();
|
|
|
System.out.printf("LocalBulkWriter done! output local files: %s%n", files);
|
|
|
Assertions.assertEquals(1, files.size());
|
|
|
- Assertions.assertEquals(files.get(0).size(), 1);
|
|
|
+ Assertions.assertEquals(1, files.get(0).size());
|
|
|
batchFiles.addAll(files);
|
|
|
} catch (Exception e) {
|
|
|
System.out.println("LocalBulkWriter catch exception: " + e);
|
|
|
@@ -632,12 +933,11 @@ public class BulkWriterTest {
|
|
|
final int[] counter = {0};
|
|
|
for (List<String> files : batchFiles) {
|
|
|
List<JsonObject> finalOriginalData = originalData;
|
|
|
- List<CreateCollectionReq.FieldSchema> fieldsList = schemaV2.getFieldSchemaList();
|
|
|
new ParquetReaderUtils() {
|
|
|
@Override
|
|
|
public void readRecord(GenericData.Record record) {
|
|
|
counter[0]++;
|
|
|
- verifyRow(fieldsList, finalOriginalData, record);
|
|
|
+ verifyParquetRow(schemaV2, finalOriginalData, record);
|
|
|
}
|
|
|
}.readParquet(files.get(0));
|
|
|
}
|