Pārlūkot izejas kodu

Modify upsert example (#1648)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
groot 3 mēneši atpakaļ
vecāks
revīzija
e1d032b624
1 mainītis faili ar 182 papildinājumiem un 73 dzēšanām
  1. 182 73
      examples/src/main/java/io/milvus/v2/UpsertExample.java

+ 182 - 73
examples/src/main/java/io/milvus/v2/UpsertExample.java

@@ -50,8 +50,9 @@ public class UpsertExample {
     private static final String COLLECTION_NAME = "java_sdk_example_upsert_v2";
     private static final String ID_FIELD = "pk";
     private static final String VECTOR_FIELD = "vector";
-    private static final String TEXT_FIELD = "text";
-    private static final String NULLABLE_FIELD = "nullable";
+    private static final String TEXT_FIELD = "text_field";
+    private static final String JSON_FIELD = "json_field";
+    private static final String NULLABLE_FIELD = "nullable_field";
     private static final Integer VECTOR_DIM = 4;
 
     private static List<Object> createCollection(boolean autoID) {
@@ -62,6 +63,7 @@ public class UpsertExample {
 
         // Create collection
         CreateCollectionReq.CollectionSchema collectionSchema = CreateCollectionReq.CollectionSchema.builder()
+                .enableDynamicField(true)
                 .build();
         collectionSchema.addField(AddFieldReq.builder()
                 .fieldName(ID_FIELD)
@@ -79,6 +81,10 @@ public class UpsertExample {
                 .dataType(DataType.VarChar)
                 .maxLength(100)
                 .build());
+        collectionSchema.addField(AddFieldReq.builder()
+                .fieldName(JSON_FIELD)
+                .dataType(DataType.JSON)
+                .build());
         collectionSchema.addField(AddFieldReq.builder()
                 .fieldName(NULLABLE_FIELD)
                 .dataType(DataType.Int32)
@@ -112,7 +118,12 @@ public class UpsertExample {
             List<Float> vector = CommonUtils.generateFloatVector(VECTOR_DIM);
             row.add(VECTOR_FIELD, gson.toJsonTree(vector));
             row.addProperty(TEXT_FIELD, String.format("text_%d", i));
+            JsonObject metadata = new JsonObject();
+            metadata.addProperty("foo", i);
+            metadata.addProperty("bar", i);
+            row.add(JSON_FIELD, metadata);
             row.addProperty(NULLABLE_FIELD, i);
+            row.addProperty("dynamic", String.format("dynamic_%d", i)); // this is dynamic field
             rows.add(row);
         }
         InsertResp resp = client.insert(InsertReq.builder()
@@ -122,94 +133,192 @@ public class UpsertExample {
         return resp.getPrimaryKeys();
     }
 
-    private static void queryWithExpr(String expr) {
+    private static List<QueryResp.QueryResult> queryWithExpr(String expr) {
         QueryResp queryRet = client.query(QueryReq.builder()
                 .collectionName(COLLECTION_NAME)
                 .filter(expr)
-                .outputFields(Arrays.asList(ID_FIELD, VECTOR_FIELD, TEXT_FIELD, NULLABLE_FIELD))
+                .outputFields(Collections.singletonList("*"))
                 .consistencyLevel(ConsistencyLevel.STRONG)
                 .build());
-        System.out.println("\nQuery with expression: " + expr);
+        System.out.println("Query with expression: " + expr);
         List<QueryResp.QueryResult> records = queryRet.getQueryResults();
         for (QueryResp.QueryResult record : records) {
             System.out.println(record.getEntity());
         }
+        return records;
     }
 
-    private static void doUpsert(boolean autoID) {
-        System.out.printf("\n============================= autoID = %s =============================", autoID ? "true" : "false");
-        // If autoID is true, the collection primary key is auto-generated by server
-        List<Object> ids = createCollection(autoID);
+    // update the entire row
+    private static void fullUpsert(Object id) {
+        System.out.println("------------------------------ full upsert ------------------------------");
+        Gson gson = new Gson();
+        // Query before upsert, get the No.2 primary key
+        String filter = String.format("%s == %s", ID_FIELD, id);
+        queryWithExpr(filter);
+
+        // Upsert, update all fields value
+        // If autoID is true, the server will return a new primary key for the updated entity
+        JsonObject row = new JsonObject();
+        row.addProperty(ID_FIELD, (Long)id); // primary key must be input so that it can know which entity to be updated
+        List<Float> vectorUpdated = Arrays.asList(1.0f, 1.0f, 1.0f, 1.0f);
+        row.add(VECTOR_FIELD, gson.toJsonTree(vectorUpdated));
+        String textUpdated = "this field has been updated";
+        row.addProperty(TEXT_FIELD, textUpdated);
+        JsonObject metadata = new JsonObject();
+        metadata.addProperty("updated", "yes");
+        row.add(JSON_FIELD, metadata); // the json field will be overridden
+        row.add(NULLABLE_FIELD, null); // update nullable field to null
+        UpsertResp upsertResp = client.upsert(UpsertReq.builder()
+                .collectionName(COLLECTION_NAME)
+                .data(Collections.singletonList(row))
+                .build());
+        List<Object> newIds = upsertResp.getPrimaryKeys();
+        System.out.printf("\nUpsert done, primary key %s has been updated to %s%n", id, newIds.get(0));
+
+        // Query after upsert, you will see the vector field is [1.0f, 1.0f, 1.0f, 1.0f],
+        // text field is "this field has been updated", nullable field is null
+        filter = String.format("%s == %s", ID_FIELD, newIds.get(0));
+        List<QueryResp.QueryResult> results = queryWithExpr(filter);
+
+        // Verify the result
+        if (results.size() != newIds.size()) {
+            throw new RuntimeException("Incorrect query result for filter: " + filter);
+        }
+        Map<String, Object> entity = results.get(0).getEntity();
+        if (!vectorUpdated.equals(entity.get(VECTOR_FIELD))) {
+            throw new RuntimeException("Vector field is not correctly updated for filter: " + filter);
+        }
+        if (!textUpdated.equals(entity.get(TEXT_FIELD))) {
+            throw new RuntimeException("Text field is not correctly updated for filter: " + filter);
+        }
+        // In full upsert, JSON field is overridden
+        if (!entity.get(JSON_FIELD).equals(metadata)) {
+            throw new RuntimeException("JSON field is not correctly updated for filter: " + filter);
+        }
+        if (entity.get(NULLABLE_FIELD) != null) {
+            throw new RuntimeException("Nullable field is not correctly updated for filter: " + filter);
+        }
+        // Note that we didn't input the dynamic field for full update, so it will treat it as removed
+        if (entity.containsKey("dynamic")) {
+            throw new RuntimeException("Dynamic field is not removed for filter: " + filter);
+        }
+    }
 
+    // update the specified field, other fields will keep old values
+    private static void partialUpsert(List<Object> ids, boolean updateVector) {
+        System.out.printf("\n------------------------------ partial upsert %s ------------------------------%n",
+                updateVector ? "vector" : "scalars");
         Gson gson = new Gson();
-        {
-            // Query before upsert, get the No.2 primary key
-            Long oldID = (Long) ids.get(1);
-            String filter = String.format("%s == %d", ID_FIELD, oldID);
-            queryWithExpr(filter);
-
-            // Upsert, update all fields value
-            // If autoID is true, the server will return a new primary key for the updated entity
+        // Query before upsert
+        String filter = String.format("%s in %s", ID_FIELD, ids);
+        List<QueryResp.QueryResult> oldResults = queryWithExpr(filter);
+
+        // Partial upsert, only update the specified field, other fields will keep old values
+        // If autoID is true, the server will return a new primary key for the updated entity
+        // Note: for the case to do partial upsert for multi entities, it only allows updating
+        // the same fields for all rows.
+        // For example, assume a collection has 2 fields: A and B
+        // it is legal to update the same fields as: client.upsert(data = [ {"A": 1}, {"A": 3}])
+        // it is illegal to update different fields as: client.upsert(data = [ {"A": 1}, {"B": 3}])
+        // Read the doc for more info: https://milvus.io/docs/upsert-entities.md
+        List<Float> vectorUpdated = Arrays.asList(1.0f, 1.0f, 1.0f, 1.0f);
+        String textUpdated = "this row has been partially updated";
+        List<JsonObject> rows = new ArrayList<>();
+        for (Object id : ids) {
             JsonObject row = new JsonObject();
-            row.addProperty(ID_FIELD, oldID);
-            List<Float> vector = Arrays.asList(1.0f, 1.0f, 1.0f, 1.0f);
-            row.add(VECTOR_FIELD, gson.toJsonTree(vector));
-            row.addProperty(TEXT_FIELD, "this field has been updated");
-            row.add(NULLABLE_FIELD, null); // update nullable field to null
-            UpsertResp upsertResp = client.upsert(UpsertReq.builder()
-                    .collectionName(COLLECTION_NAME)
-                    .data(Collections.singletonList(row))
-                    .build());
-            List<Object> newIds = upsertResp.getPrimaryKeys();
-            Long newID = (Long) newIds.get(0);
-            System.out.printf("\nUpsert done, primary key %d has been updated to %d%n", oldID, newID);
-
-            // Query after upsert, you will see the vector field is [1.0f, 1.0f, 1.0f, 1.0f],
-            // text field is "this field has been updated", nullable field is null
-            filter = String.format("%s == %d", ID_FIELD, newID);
-            queryWithExpr(filter);
+            row.addProperty(ID_FIELD, (Long) id); // primary key must be input so that it can know which entity to be updated
+            if (updateVector) {
+                row.add(VECTOR_FIELD, gson.toJsonTree(vectorUpdated));
+            } else {
+                row.addProperty(TEXT_FIELD, textUpdated);
+                row.add(NULLABLE_FIELD, null);
+                JsonObject metadata = new JsonObject();
+                metadata.addProperty("updated", "yes");
+                row.add(JSON_FIELD, metadata); // the json field will be merged in partial upsert
+            }
+            row.addProperty("new_dynamic", "new"); // add a new dynamic field
+            rows.add(row);
         }
 
-        {
-            // Query before upsert, get the No.5 and No.6 primary key
-            Long oldID1 = (Long)ids.get(4);
-            Long oldID2 = (Long)ids.get(5);
-            String filter = String.format("%s in [%d, %d]", ID_FIELD, oldID1, oldID2);
-            queryWithExpr(filter);
-
-            // Partial upsert, only update the specified field, other fields will keep old values
-            // If autoID is true, the server will return a new primary key for the updated entity
-            // Note: for the case to do partial upsert for multi entities, it only allows updating
-            // the same fields for all rows.
-            // For example, assume a collection has 2 fields: A and B
-            // it is legal to update the same fields as: client.upsert(data = [ {"A": 1}, {"A": 3}])
-            // it is illegal to update different fields as: client.upsert(data = [ {"A": 1}, {"B": 3}])
-            // Read the doc for more info: https://milvus.io/docs/upsert-entities.md
-            // Here we update the same field "text" for the two rows.
-            JsonObject row1 = new JsonObject();
-            row1.addProperty(ID_FIELD, oldID1);
-            row1.addProperty(TEXT_FIELD, "this row has been partially updated");
-
-            JsonObject row2 = new JsonObject();
-            row2.addProperty(ID_FIELD, oldID2);
-            row2.addProperty(TEXT_FIELD, "this row has been partially updated");
-
-            UpsertResp upsertResp = client.upsert(UpsertReq.builder()
-                    .collectionName(COLLECTION_NAME)
-                    .data(Arrays.asList(row1, row2))
-                    .partialUpdate(true)
-                    .build());
-            List<Object> newIds = upsertResp.getPrimaryKeys();
-            Long newID1 = (Long) newIds.get(0);
-            Long newID2 = (Long) newIds.get(1);
-            System.out.printf("\nPartial upsert done, primary key %d has been updated to %d, %d has been updated to %d%n",
-                    oldID1, newID1, oldID2, newID2);
-
-            // query after upsert, you will see the text field is "this row has been partially updated"
-            // the other fields keep old values
-            filter = String.format("%s in [%d, %d]", ID_FIELD, newID1, newID2);
-            queryWithExpr(filter);
+        UpsertResp upsertResp = client.upsert(UpsertReq.builder()
+                .collectionName(COLLECTION_NAME)
+                .data(rows)
+                .partialUpdate(true)
+                .build());
+        List<Object> newIds = upsertResp.getPrimaryKeys();
+        System.out.printf("\nPartial upsert done, primary keys %s has been updated to %s%n", ids, newIds);
+
+        // query after upsert, you will see the text field is "this row has been partially updated"
+        // the other fields keep old values
+        filter = String.format("%s in %s", ID_FIELD, newIds);
+        List<QueryResp.QueryResult> results = queryWithExpr(filter);
+
+        // Verify the result
+        if (results.size() != newIds.size()) {
+            throw new RuntimeException("Incorrect query result for filter: " + filter);
         }
+
+        for (int i = 0; i < results.size(); i++) {
+            Map<String, Object> oldEntity = oldResults.get(i).getEntity();
+            Map<String, Object> entity = results.get(i).getEntity();
+            if (updateVector) {
+                // only vector field is updated
+                if (!vectorUpdated.equals(entity.get(VECTOR_FIELD))) {
+                    throw new RuntimeException("Vector field is not correctly updated for filter: " + filter);
+                }
+
+                // the other fields keep old values
+                if (!entity.get(TEXT_FIELD).equals(oldEntity.get(TEXT_FIELD))) {
+                    throw new RuntimeException("Text field should not be updated for filter: " + filter);
+                }
+                if (!entity.get(JSON_FIELD).equals(oldEntity.get(JSON_FIELD))) {
+                    throw new RuntimeException("JSON field should not be updated for filter: " + filter);
+                }
+                if (!entity.get(NULLABLE_FIELD).equals(oldEntity.get(NULLABLE_FIELD))) {
+                    throw new RuntimeException("Nullable field should not be updated for filter: " + filter);
+                }
+            } else {
+                // scalar fields are updated
+                if (!textUpdated.equals(entity.get(TEXT_FIELD))) {
+                    throw new RuntimeException("Text field is not correctly updated for filter: " + filter);
+                }
+                if (entity.get(NULLABLE_FIELD) != null) {
+                    throw new RuntimeException("Nullable field is not correctly updated for filter: " + filter);
+                }
+                JsonObject newJson = (JsonObject)entity.get(JSON_FIELD);
+                if (!newJson.has("updated") && !newJson.get("updated").equals("yes")) {
+                    throw new RuntimeException("JSON field is not correctly updated for filter: " + filter);
+                }
+
+                // vector field keep old value
+                if (!entity.get(VECTOR_FIELD).equals(oldEntity.get(VECTOR_FIELD))) {
+                    throw new RuntimeException("Vector field should not be updated for filter: " + filter);
+                }
+            }
+            // Note that we didn't input the dynamic field for partial update, it will keep old value
+            if (!entity.get("dynamic").equals(oldEntity.get("dynamic"))) {
+                throw new RuntimeException("Dynamic field should not be updated for filter: " + filter);
+            }
+            // Verify the new dynamic field is merged
+            if (!entity.containsKey("new_dynamic") && !entity.get("new_dynamic").equals("new")) {
+                throw new RuntimeException("New dynamic field is not merged for filter: " + filter);
+            }
+        }
+    }
+
+    private static void doUpsert(boolean autoID) {
+        System.out.printf("\n================================== autoID = %s ==================================", autoID ? "true" : "false");
+        // If autoID is true, the collection primary key is auto-generated by server
+        List<Object> ids = createCollection(autoID);
+
+        // Update the entire row of the No.2 entity
+        fullUpsert((Long)ids.get(1));
+
+        // Partially update the vectors of No.5 and No.6 entities
+        partialUpsert(ids.subList(4, 6), true);
+
+        // Partially update the scalar fields of No.10 entity
+        partialUpsert(ids.subList(9, 10), false);
     }
 
     public static void main(String[] args) {