Преглед на файлове

Support Timestamptz field (#1625)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
groot преди 2 седмици
родител
ревизия
9efac7ec93

+ 2 - 2
docker-compose.yml

@@ -3,7 +3,7 @@ version: '3.5'
 services:
   standalone:
     container_name: milvus-javasdk-standalone-1
-    image: milvusdb/milvus:master-20250922-200ee4cb-amd64
+    image: milvusdb/milvus:master-20250924-20411e52-amd64
     command: [ "milvus", "run", "standalone" ]
     environment:
       - COMMON_STORAGETYPE=local
@@ -24,7 +24,7 @@ services:
 
   standaloneslave:
     container_name: milvus-javasdk-standalone-2
-    image: milvusdb/milvus:master-20250922-200ee4cb-amd64
+    image: milvusdb/milvus:master-20250924-20411e52-amd64
     command: [ "milvus", "run", "standalone" ]
     environment:
       - COMMON_STORAGETYPE=local

+ 2 - 1
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/BulkWriter.java

@@ -294,7 +294,8 @@ public abstract class BulkWriter implements AutoCloseable {
                     rowSize += objectAndSize.getRight();
                     break;
                 }
-                case VarChar: {
+                case VarChar:
+                case Timestamptz: {
                     Pair<Object, Integer> objectAndSize = verifyVarchar(obj, field);
                     rowValues.put(fieldName, objectAndSize.getLeft());
                     rowSize += objectAndSize.getRight();

+ 2 - 0
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/common/utils/ParquetUtils.java

@@ -94,6 +94,7 @@ public class ParquetUtils {
                     setMessageType(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.INT64, null, field, false);
                     break;
                 case VarChar:
+                case Timestamptz:
                 case JSON:
                 case SparseFloatVector: // sparse vector is parsed as JSON format string in the server side
                     setMessageType(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.BINARY,
@@ -136,6 +137,7 @@ public class ParquetUtils {
                 setMessageType(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.INT64, null, field, true);
                 break;
             case VarChar:
+            case Timestamptz:
                 setMessageType(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.BINARY,
                         LogicalTypeAnnotation.stringType(), field, true);
                 break;

+ 2 - 0
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/writer/ParquetFileWriter.java

@@ -135,6 +135,7 @@ public class ParquetFileWriter implements FormatFileWriter {
                 break;
             case VarChar:
             case String:
+            case Timestamptz:
             case JSON:
                 group.append(paramName, (String)value);
                 break;
@@ -169,6 +170,7 @@ public class ParquetFileWriter implements FormatFileWriter {
                         break;
                     case String:
                     case VarChar:
+                    case Timestamptz:
                         addStringArray(group, paramName, (List<String>) value);
                         break;
                     case Bool:

+ 1 - 0
sdk-bulkwriter/src/test/java/io/milvus/bulkwriter/BulkWriterTest.java

@@ -521,6 +521,7 @@ public class BulkWriterTest {
                 Assertions.assertEquals(element.getAsDouble(), obj);
                 break;
             case VarChar:
+            case Timestamptz:
             case JSON:
                 verifyJsonString(element.getAsString(), ((Utf8)obj).toString());
                 break;

+ 2 - 1
sdk-bulkwriter/src/test/java/io/milvus/bulkwriter/TestUtils.java

@@ -150,7 +150,8 @@ public class TestUtils {
                 }
                 return values;
             }
-            case VarChar: {
+            case VarChar:
+            case Timestamptz: {
                 List<String> values = new ArrayList<>();
                 for (int i = 0; i < maxCapacity; i++) {
                     values.add(String.format("varchar_arr_%d", i));

+ 7 - 1
sdk-core/src/main/java/io/milvus/param/ParamUtils.java

@@ -270,6 +270,7 @@ public class ParamUtils {
                 break;
             case VarChar:
             case String:
+            case Timestamptz:
                 for (Object value : values) {
                     if (checkNullableFieldData(fieldSchema, value, verifyElementType)) {
                         continue;
@@ -418,6 +419,7 @@ public class ParamUtils {
                 return value.getAsDouble(); // return double for genFieldData()
             case VarChar:
             case String:
+            case Timestamptz:
                 if (!(value.isJsonPrimitive())) {
                     throw new ParamException(String.format(errMsgs.get(dataType), fieldName));
                 }
@@ -464,6 +466,7 @@ public class ParamUtils {
                 case Double:
                     return JsonUtils.fromJson(jsonArray, new TypeToken<List<Double>>() {}.getType());
                 case VarChar:
+                case Timestamptz:
                     return JsonUtils.fromJson(jsonArray, new TypeToken<List<String>>() {}.getType());
                 default:
                     throw new ParamException(String.format("Unsupported element type of Array field '%s'", fieldName));
@@ -1367,7 +1370,8 @@ public class ParamUtils {
                 return ScalarField.newBuilder().setDoubleData(doubleArray).build();
             }
             case String:
-            case VarChar: {
+            case VarChar:
+            case Timestamptz: {
                 List<String> strings = objects.stream().map(p -> (p == null) ? null : (String) p).collect(Collectors.toList());
                 StringArray stringArray = StringArray.newBuilder().addAllData(strings).build();
                 return ScalarField.newBuilder().setStringData(stringArray).build();
@@ -1505,6 +1509,7 @@ public class ParamUtils {
                 break;
             case VarChar:
             case String:
+            case Timestamptz:
                 if (obj instanceof String) {
                     return builder.setStringData((String) obj).build();
                 }
@@ -1541,6 +1546,7 @@ public class ParamUtils {
                 return value.getBoolData();
             case VarChar:
             case String:
+            case Timestamptz:
                 return value.getStringData();
             case JSON:
                 return JsonUtils.fromJson(value.getStringData(), JsonObject.class);

+ 3 - 0
sdk-core/src/main/java/io/milvus/response/FieldDataWrapper.java

@@ -173,6 +173,7 @@ public class FieldDataWrapper {
                 return fieldData.getScalars().getDoubleData().getDataCount();
             case VarChar:
             case String:
+            case Timestamptz:
                 return fieldData.getScalars().getStringData().getDataCount();
             case JSON:
                 return fieldData.getScalars().getJsonData().getDataCount();
@@ -246,6 +247,7 @@ public class FieldDataWrapper {
             case Double:
             case VarChar:
             case String:
+            case Timestamptz:
             case JSON:
                 return getScalarData(dt, fieldData.getScalars(), fieldData.getValidDataList());
             case ArrayOfStruct:
@@ -341,6 +343,7 @@ public class FieldDataWrapper {
                 return setNoneData(scalar.getDoubleData().getDataList(), validData);
             case VarChar:
             case String:
+            case Timestamptz:
                 ProtocolStringList protoStrList = scalar.getStringData().getDataList();
                 return setNoneData(protoStrList.subList(0, protoStrList.size()), validData);
             case JSON:

+ 1 - 0
sdk-core/src/main/java/io/milvus/v2/common/DataType.java

@@ -39,6 +39,7 @@ public enum DataType {
     VarChar(21), // variable-length strings with a specified maximum length
     Array(22),
     JSON(23),
+    Timestamptz(26),
 
     BinaryVector(100),
     FloatVector(101),

+ 6 - 0
sdk-core/src/main/java/io/milvus/v2/service/vector/request/QueryReq.java

@@ -44,6 +44,12 @@ public class QueryReq {
     private long limit;
     private boolean ignoreGrowing;
 
+    // Extra query parameters, e.g. "timezone" and "time_fields".
+    // Every value is sent as the string produced by String.valueOf(), so it must convert cleanly.
+    // For example: {"timezone": "America/Chicago"}
+    @Builder.Default
+    private Map<String, Object> queryParams = new HashMap<>();
+
     // Expression template, to improve expression parsing performance in complicated list
     // Assume user has a filter = "pk > 3 and city in ["beijing", "shanghai", ......]
     // The long list of city will increase the time cost to parse this expression.

+ 10 - 0
sdk-core/src/main/java/io/milvus/v2/utils/VectorUtils.java

@@ -75,6 +75,16 @@ public class VectorUtils {
             builder.setConsistencyLevelValue(request.getConsistencyLevel().getCode());
         }
 
+        // extra parameters
+        Map<String, Object> queryParams = request.getQueryParams();
+        queryParams.forEach((key, value) -> {
+            // In new versions, every key is sent as a top-level query parameter.
+            builder.addQueryParams(KeyValuePair.newBuilder()
+                    .setKey(key)
+                    .setValue(String.valueOf(value))
+                    .build());
+        });
+
         // set offset and limit value.
         // directly pass the two values, the server will verify them.
         long offset = request.getOffset();

+ 1 - 1
sdk-core/src/test/java/io/milvus/TestUtils.java

@@ -11,7 +11,7 @@ public class TestUtils {
     private int dimension = 256;
     private static final Random RANDOM = new Random();
 
-    public static final String MilvusDockerImageID = "milvusdb/milvus:master-20250922-200ee4cb-amd64";
+    public static final String MilvusDockerImageID = "milvusdb/milvus:master-20250924-20411e52-amd64";
 
     public TestUtils(int dimension) {
         this.dimension = dimension;

+ 114 - 0
sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java

@@ -1219,6 +1219,120 @@ class MilvusClientV2DockerTest {
         Assertions.assertEquals(9L, (long)searchResults.get(1).get(0).getId());
     }
 
+    @Test
+    void testTimestamp() {
+        String randomCollectionName = generator.generate(10);
+        String pkField = "pk";
+        String vectorField = "vector";
+        String timestampField = "timestamp";
+        CreateCollectionReq.CollectionSchema collectionSchema = CreateCollectionReq.CollectionSchema.builder()
+                .build();
+        collectionSchema.addField(AddFieldReq.builder()
+                .fieldName(pkField)
+                .dataType(DataType.Int64)
+                .isPrimaryKey(Boolean.TRUE)
+                .build());
+        collectionSchema.addField(AddFieldReq.builder()
+                .fieldName(vectorField)
+                .dataType(DataType.FloatVector)
+                .dimension(DIMENSION)
+                .build());
+        collectionSchema.addField(AddFieldReq.builder()
+                .fieldName(timestampField)
+                .dataType(DataType.Timestamptz)
+                .build());
+
+        client.dropCollection(DropCollectionReq.builder()
+                .collectionName(randomCollectionName)
+                .build());
+
+        CreateCollectionReq requestCreate = CreateCollectionReq.builder()
+                .collectionName(randomCollectionName)
+                .collectionSchema(collectionSchema)
+                .build();
+        client.createCollection(requestCreate);
+
+        List<IndexParam> indexParams = new ArrayList<>();
+        indexParams.add(IndexParam.builder()
+                .fieldName(vectorField)
+                .indexType(IndexParam.IndexType.HNSW)
+                .metricType(IndexParam.MetricType.COSINE)
+                .build());
+        client.createIndex(CreateIndexReq.builder()
+                .collectionName(randomCollectionName)
+                .indexParams(indexParams)
+                .build());
+        client.loadCollection(LoadCollectionReq.builder()
+                .collectionName(randomCollectionName)
+                .build());
+
+        // describe
+        DescribeCollectionResp descResp = client.describeCollection(DescribeCollectionReq.builder()
+                .collectionName(randomCollectionName)
+                .build());
+        CreateCollectionReq.CollectionSchema descSchema = descResp.getCollectionSchema();
+        List<CreateCollectionReq.FieldSchema> fields = descSchema.getFieldSchemaList();
+        Assertions.assertEquals(collectionSchema.getFieldSchemaList().size(), fields.size());
+        Assertions.assertEquals(timestampField, fields.get(2).getName());
+        Assertions.assertEquals(DataType.Timestamptz, fields.get(2).getDataType());
+
+        // insert
+        List<JsonObject> rows = new ArrayList<>();
+        {
+            JsonObject row = new JsonObject();
+            row.addProperty(pkField, 1);
+            row.addProperty(timestampField, "2025-01-02T00:00:00+08:00"); // Shanghai time
+            row.add(vectorField, JsonUtils.toJsonTree(utils.generateFloatVector()));
+            rows.add(row);
+        }
+        {
+            JsonObject row = new JsonObject();
+            row.addProperty(pkField, 2);
+            row.addProperty(timestampField, "2025-01-02T00:00:00-06:00"); // Chicago time
+            row.add(vectorField, JsonUtils.toJsonTree(utils.generateFloatVector()));
+            rows.add(row);
+        }
+        InsertResp insertResp = client.insert(InsertReq.builder()
+                .collectionName(randomCollectionName)
+                .data(rows)
+                .build());
+        Assertions.assertEquals(rows.size(), insertResp.getInsertCnt());
+
+        // query
+        Map<String, Object> params = new HashMap<>();
+//        params.put("timezone", "America/Chicago");
+        QueryResp queryResp = client.query(QueryReq.builder()
+                .collectionName(randomCollectionName)
+                .limit(10)
+                .queryParams(params)
+                .consistencyLevel(ConsistencyLevel.STRONG)
+                .outputFields(Arrays.asList(pkField, timestampField))
+                .build());
+        List<QueryResp.QueryResult> queryResults = queryResp.getQueryResults();
+        Assertions.assertEquals(2, queryResults.size());
+        for (QueryResp.QueryResult res : queryResults) {
+            Assertions.assertTrue(res.getEntity().containsKey(timestampField));
+        }
+
+        // search
+        SearchResp searchResp = client.search(SearchReq.builder()
+                .collectionName(randomCollectionName)
+                .annsField(vectorField)
+                .data(Collections.singletonList(new FloatVec(utils.generateFloatVector())))
+                .limit(10)
+                .searchParams(params)
+                .outputFields(Arrays.asList(pkField, timestampField))
+                .build());
+        List<List<SearchResp.SearchResult>> searchResults = searchResp.getSearchResults();
+        Assertions.assertEquals(1, searchResults.size());
+        for (List<SearchResp.SearchResult> oneResults : searchResults) {
+            Assertions.assertEquals(2, oneResults.size());
+            for (SearchResp.SearchResult res : oneResults) {
+                Assertions.assertTrue(res.getEntity().containsKey(timestampField));
+            }
+        }
+    }
+
     @Test
     void testHybridSearch() {
         String randomCollectionName = generator.generate(10);