瀏覽代碼

Support Session consistency level for V1 and V2 (#1129)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
groot 6 月之前
父節點
當前提交
65dab55cff

+ 235 - 0
examples/main/java/io/milvus/v1/ConsistencyLevelExample.java

@@ -0,0 +1,235 @@
+package io.milvus.v1;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import io.milvus.client.MilvusClient;
+import io.milvus.client.MilvusServiceClient;
+import io.milvus.common.clientenum.ConsistencyLevelEnum;
+import io.milvus.grpc.DataType;
+import io.milvus.grpc.DescribeCollectionResponse;
+import io.milvus.grpc.MutationResult;
+import io.milvus.grpc.SearchResults;
+import io.milvus.param.*;
+import io.milvus.param.collection.*;
+import io.milvus.param.dml.InsertParam;
+import io.milvus.param.dml.SearchParam;
+import io.milvus.param.index.CreateIndexParam;
+import io.milvus.pool.MilvusClientV1Pool;
+import io.milvus.pool.PoolConfig;
+import io.milvus.response.DescCollResponseWrapper;
+import io.milvus.response.SearchResultsWrapper;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class ConsistencyLevelExample {
+    private static final MilvusClient milvusClient;
+
+    static {
+        ConnectParam connectParam = ConnectParam.newBuilder()
+                .withHost("localhost")
+                .withPort(19530)
+                .build();
+        milvusClient = new MilvusServiceClient(connectParam);
+    }
+
+    private static final String COLLECTION_NAME_PREFIX = "java_sdk_example_clevel_v1_";
+    private static final Integer VECTOR_DIM = 512;
+
+    private static String createCollection(ConsistencyLevelEnum level) {
+        String collectionName = COLLECTION_NAME_PREFIX + level.getName();
+
+        // Drop collection if exists
+        milvusClient.dropCollection(DropCollectionParam.newBuilder()
+                .withCollectionName(collectionName)
+                .build());
+
+        // Quickly create a collection with "id" field and "vector" field
+        List<FieldType> fieldsSchema = Arrays.asList(
+                FieldType.newBuilder()
+                        .withName("id")
+                        .withDataType(DataType.Int64)
+                        .withPrimaryKey(true)
+                        .withAutoID(false)
+                        .build(),
+                FieldType.newBuilder()
+                        .withName("vector")
+                        .withDataType(DataType.FloatVector)
+                        .withDimension(VECTOR_DIM)
+                        .build()
+        );
+
+        // Create the collection with 3 fields
+        R<RpcStatus> response = milvusClient.createCollection(CreateCollectionParam.newBuilder()
+                .withCollectionName(collectionName)
+                .withFieldTypes(fieldsSchema)
+                .withConsistencyLevel(level)
+                .build());
+        CommonUtils.handleResponseStatus(response);
+        System.out.printf("Collection '%s' created\n", collectionName);
+
+        response = milvusClient.createIndex(CreateIndexParam.newBuilder()
+                .withCollectionName(collectionName)
+                .withFieldName("vector")
+                .withIndexType(IndexType.FLAT)
+                .withMetricType(MetricType.L2)
+                .build());
+        CommonUtils.handleResponseStatus(response);
+
+        milvusClient.loadCollection(LoadCollectionParam.newBuilder()
+                .withCollectionName(collectionName)
+                .build());
+
+        return collectionName;
+    }
+
+    private static void showCollectionLevel(String collectionName) {
+        R<DescribeCollectionResponse> response = milvusClient.describeCollection(DescribeCollectionParam.newBuilder()
+                .withCollectionName(collectionName)
+                .build());
+        CommonUtils.handleResponseStatus(response);
+        DescCollResponseWrapper wrapper = new DescCollResponseWrapper(response.getData());
+        System.out.printf("Default consistency level: %s\n", wrapper.getConsistencyLevel().getName());
+    }
+
+    private static int insertData(String collectionName) {
+        Gson gson = new Gson();
+        int rowCount = 1000;
+        for (int i = 0; i < rowCount; i++) {
+            JsonObject row = new JsonObject();
+            row.addProperty("id", i);
+            row.add("vector", gson.toJsonTree(CommonUtils.generateFloatVector(VECTOR_DIM)));
+
+            R<MutationResult> response = milvusClient.insert(InsertParam.newBuilder()
+                    .withCollectionName(collectionName)
+                    .withRows(Collections.singletonList(row))
+                    .build());
+            CommonUtils.handleResponseStatus(response);
+        }
+
+        System.out.printf("%d rows inserted\n", rowCount);
+        return rowCount;
+    }
+
+    private static List<SearchResultsWrapper.IDScore> search(String collectionName, int topK) {
+        R<SearchResults> searchR = milvusClient.search(SearchParam.newBuilder()
+                .withCollectionName(collectionName)
+                .withVectorFieldName("vector")
+                .withFloatVectors(Collections.singletonList(CommonUtils.generateFloatVector(VECTOR_DIM)))
+                .withTopK(topK)
+                .withMetricType(MetricType.L2)
+                .build());
+        CommonUtils.handleResponseStatus(searchR);
+
+        SearchResultsWrapper resultsWrapper = new SearchResultsWrapper(searchR.getData().getResults());
+        return resultsWrapper.getIDScore(0);
+    }
+
+    private static void testStrongLevel() {
+        String collectionName = createCollection(ConsistencyLevelEnum.STRONG);
+        showCollectionLevel(collectionName);
+        int rowCount = insertData(collectionName);
+
+        // immediately search after insert, for Strong level, all the entities are visible
+        List<SearchResultsWrapper.IDScore> scores = search(collectionName, rowCount);
+        if (scores.size() != rowCount) {
+            throw new RuntimeException(String.format("All inserted entities should be visible with Strong" +
+                    " consistency level, but only %d returned", scores.size()));
+        }
+        System.out.printf("Strong level is working fine, %d results returned\n", scores.size());
+    }
+
+    private static void testSessionLevel() throws ClassNotFoundException, NoSuchMethodException {
+        String collectionName = createCollection(ConsistencyLevelEnum.SESSION);
+        showCollectionLevel(collectionName);
+
+        ConnectParam connectConfig = ConnectParam.newBuilder()
+                .withHost("localhost")
+                .withPort(19530)
+                .build();
+        PoolConfig poolConfig = PoolConfig.builder()
+                .maxIdlePerKey(10) // max idle clients per key
+                .maxTotalPerKey(20) // max total(idle + active) clients per key
+                .maxTotal(100) // max total clients for all keys
+                .maxBlockWaitDuration(Duration.ofSeconds(5L)) // getClient() will wait 5 seconds if no idle client available
+                .minEvictableIdleDuration(Duration.ofSeconds(10L)) // if number of idle clients is larger than maxIdlePerKey, redundant idle clients will be evicted after 10 seconds
+                .build();
+        MilvusClientV1Pool pool = new MilvusClientV1Pool(poolConfig, connectConfig);
+
+        // The same process, different MilvusClient object, insert and search with Session level.
+        // The Session level ensure that the newly inserted data instantaneously become searchable.
+        Gson gson = new Gson();
+        for (int i = 0; i < 100; i++) {
+            List<Float> vector = CommonUtils.generateFloatVector(VECTOR_DIM);
+            JsonObject row = new JsonObject();
+            row.addProperty("id", i);
+            row.add("vector", gson.toJsonTree(vector));
+
+            // insert by a MilvusClient
+            String clientName1 = String.format("client_%d", i%10);
+            MilvusClient client1 = pool.getClient(clientName1);
+            client1.insert(InsertParam.newBuilder()
+                    .withCollectionName(collectionName)
+                    .withRows(Collections.singletonList(row))
+                    .build());
+            pool.returnClient(clientName1, client1); // don't forget to return the client to pool
+            System.out.println("insert");
+
+            // search by another MilvusClient, use the just inserted vector to search
+            // the returned item is expected to be the just inserted item
+            String clientName2 = String.format("client_%d", i%10+1);
+            MilvusClient client2 = pool.getClient(clientName2);
+            R<SearchResults> searchR = client2.search(SearchParam.newBuilder()
+                    .withCollectionName(collectionName)
+                    .withVectorFieldName("vector")
+                    .withFloatVectors(Collections.singletonList(vector))
+                    .withTopK(1)
+                    .withMetricType(MetricType.L2)
+                    .build());
+            pool.returnClient(clientName2, client2); // don't forget to return the client to pool
+            SearchResultsWrapper resultsWrapper = new SearchResultsWrapper(searchR.getData().getResults());
+            List<SearchResultsWrapper.IDScore> scores = resultsWrapper.getIDScore(0);
+            if (scores.size() != 1) {
+                throw new RuntimeException("Search result is empty");
+            }
+            if (i != scores.get(0).getLongID()) {
+                throw new RuntimeException("The just inserted entity is not found");
+            }
+            System.out.println("search");
+        }
+
+        System.out.println("Session level is working fine");
+    }
+
+    private static void testBoundedLevel() {
+        String collectionName = createCollection(ConsistencyLevelEnum.BOUNDED);
+        showCollectionLevel(collectionName);
+        int rowCount = insertData(collectionName);
+
+        // immediately search after insert, for Bounded level, not all the entities are visible
+        List<SearchResultsWrapper.IDScore> scores = search(collectionName, rowCount);
+        System.out.printf("Bounded level is working fine, %d results returned\n", scores.size());
+    }
+
+    private static void testEventuallyLevel() {
+        String collectionName = createCollection(ConsistencyLevelEnum.EVENTUALLY);
+        showCollectionLevel(collectionName);
+        int rowCount = insertData(collectionName);
+
+        // immediately search after insert, for Bounded level, not all the entities are visible
+        List<SearchResultsWrapper.IDScore> scores = search(collectionName, rowCount);
+        System.out.printf("Eventually level is working fine, %d results returned\n", scores.size());
+    }
+
+    public static void main(String[] args) throws Exception {
+        testStrongLevel();
+        System.out.println("==============================================================");
+        testSessionLevel();
+        System.out.println("==============================================================");
+        testBoundedLevel();
+        System.out.println("==============================================================");
+        testEventuallyLevel();
+    }
+}

+ 192 - 0
examples/main/java/io/milvus/v2/ConsistencyLevelExample.java

@@ -0,0 +1,192 @@
+package io.milvus.v2;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import io.milvus.pool.MilvusClientV2Pool;
+import io.milvus.pool.PoolConfig;
+import io.milvus.v1.CommonUtils;
+import io.milvus.v2.client.ConnectConfig;
+import io.milvus.v2.client.MilvusClientV2;
+import io.milvus.v2.common.ConsistencyLevel;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
+import io.milvus.v2.service.collection.request.DescribeCollectionReq;
+import io.milvus.v2.service.collection.request.DropCollectionReq;
+import io.milvus.v2.service.collection.response.DescribeCollectionResp;
+import io.milvus.v2.service.vector.request.InsertReq;
+import io.milvus.v2.service.vector.request.SearchReq;
+import io.milvus.v2.service.vector.request.data.FloatVec;
+import io.milvus.v2.service.vector.response.SearchResp;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+
+public class ConsistencyLevelExample {
+    private static final MilvusClientV2 client;
+
+    static {
+        ConnectConfig config = ConnectConfig.builder()
+                .uri("http://localhost:19530")
+                .build();
+        client = new MilvusClientV2(config);
+    }
+
+    private static final String COLLECTION_NAME_PREFIX = "java_sdk_example_clevel_v2_";
+    private static final Integer VECTOR_DIM = 512;
+
+    private static String createCollection(ConsistencyLevel level) {
+        String collectionName = COLLECTION_NAME_PREFIX + level.getName();
+
+        // Drop collection if exists
+        client.dropCollection(DropCollectionReq.builder()
+                .collectionName(collectionName)
+                .build());
+
+        // Quickly create a collection with "id" field and "vector" field
+        client.createCollection(CreateCollectionReq.builder()
+                .collectionName(collectionName)
+                .dimension(VECTOR_DIM)
+                .consistencyLevel(level)
+                .build());
+        System.out.printf("Collection '%s' created\n", collectionName);
+        return collectionName;
+    }
+
+    private static void showCollectionLevel(String collectionName) {
+        DescribeCollectionResp resp = client.describeCollection(DescribeCollectionReq.builder()
+                .collectionName(collectionName)
+                .build());
+        System.out.printf("Default consistency level: %s\n", resp.getConsistencyLevel().getName());
+    }
+
+    private static int insertData(String collectionName) {
+        Gson gson = new Gson();
+        int rowCount = 1000;
+        for (int i = 0; i < rowCount; i++) {
+            JsonObject row = new JsonObject();
+            row.addProperty("id", i);
+            row.add("vector", gson.toJsonTree(CommonUtils.generateFloatVector(VECTOR_DIM)));
+
+            client.insert(InsertReq.builder()
+                    .collectionName(collectionName)
+                    .data(Collections.singletonList(row))
+                    .build());
+        }
+
+        System.out.printf("%d rows inserted\n", rowCount);
+        return rowCount;
+    }
+
+    private static List<SearchResp.SearchResult> search(String collectionName, int topK) {
+        SearchResp searchR = client.search(SearchReq.builder()
+                .collectionName(collectionName)
+                .data(Collections.singletonList(new FloatVec(CommonUtils.generateFloatVector(VECTOR_DIM))))
+                .topK(topK)
+                .build());
+        List<List<SearchResp.SearchResult>> searchResults = searchR.getSearchResults();
+        return searchResults.get(0);
+    }
+
+    private static void testStrongLevel() {
+        String collectionName = createCollection(ConsistencyLevel.STRONG);
+        showCollectionLevel(collectionName);
+        int rowCount = insertData(collectionName);
+
+        // immediately search after insert, for Strong level, all the entities are visible
+        List<SearchResp.SearchResult> results = search(collectionName, rowCount);
+        if (results.size() != rowCount) {
+            throw new RuntimeException(String.format("All inserted entities should be visible with Strong" +
+                    " consistency level, but only %d returned", results.size()));
+        }
+        System.out.printf("Strong level is working fine, %d results returned\n", results.size());
+    }
+
+    private static void testSessionLevel() throws Exception {
+        String collectionName = createCollection(ConsistencyLevel.SESSION);
+        showCollectionLevel(collectionName);
+
+        ConnectConfig connectConfig = ConnectConfig.builder()
+                .uri("http://localhost:19530")
+                .build();
+        PoolConfig poolConfig = PoolConfig.builder()
+                .maxIdlePerKey(10) // max idle clients per key
+                .maxTotalPerKey(20) // max total(idle + active) clients per key
+                .maxTotal(100) // max total clients for all keys
+                .maxBlockWaitDuration(Duration.ofSeconds(5L)) // getClient() will wait 5 seconds if no idle client available
+                .minEvictableIdleDuration(Duration.ofSeconds(10L)) // if number of idle clients is larger than maxIdlePerKey, redundant idle clients will be evicted after 10 seconds
+                .build();
+        MilvusClientV2Pool pool = new MilvusClientV2Pool(poolConfig, connectConfig);
+
+        // The same process, different MilvusClient object, insert and search with Session level.
+        // The Session level ensure that the newly inserted data instantaneously become searchable.
+        Gson gson = new Gson();
+        for (int i = 0; i < 100; i++) {
+            List<Float> vector = CommonUtils.generateFloatVector(VECTOR_DIM);
+            JsonObject row = new JsonObject();
+            row.addProperty("id", i);
+            row.add("vector", gson.toJsonTree(vector));
+
+            // insert by a MilvusClient
+            String clientName1 = String.format("client_%d", i%10);
+            MilvusClientV2 client1 = pool.getClient(clientName1);
+            client1.insert(InsertReq.builder()
+                    .collectionName(collectionName)
+                    .data(Collections.singletonList(row))
+                    .build());
+            pool.returnClient(clientName1, client1); // don't forget to return the client to pool
+            System.out.println("insert");
+
+            // search by another MilvusClient, use the just inserted vector to search
+            // the returned item is expected to be the just inserted item
+            String clientName2 = String.format("client_%d", i%10+1);
+            MilvusClientV2 client2 = pool.getClient(clientName2);
+            SearchResp searchR = client2.search(SearchReq.builder()
+                    .collectionName(collectionName)
+                    .data(Collections.singletonList(new FloatVec(vector)))
+                    .topK(1)
+                    .build());
+            pool.returnClient(clientName2, client2); // don't forget to return the client to pool
+            List<List<SearchResp.SearchResult>> searchResults = searchR.getSearchResults();
+            List<SearchResp.SearchResult> results = searchResults.get(0);
+            if (results.size() != 1) {
+                throw new RuntimeException("Search result is empty");
+            }
+            if (i != (Long)results.get(0).getId()) {
+                throw new RuntimeException("The just inserted entity is not found");
+            }
+            System.out.println("search");
+        }
+
+        System.out.println("Session level is working fine");
+    }
+
+    private static void testBoundedLevel() {
+        String collectionName = createCollection(ConsistencyLevel.BOUNDED);
+        showCollectionLevel(collectionName);
+        int rowCount = insertData(collectionName);
+
+        // immediately search after insert, for Bounded level, not all the entities are visible
+        List<SearchResp.SearchResult> results = search(collectionName, rowCount);
+        System.out.printf("Bounded level is working fine, %d results returned\n", results.size());
+    }
+
+    private static void testEventuallyLevel() {
+        String collectionName = createCollection(ConsistencyLevel.EVENTUALLY);
+        showCollectionLevel(collectionName);
+        int rowCount = insertData(collectionName);
+
+        // immediately search after insert, for Bounded level, not all the entities are visible
+        List<SearchResp.SearchResult> results = search(collectionName, rowCount);
+        System.out.printf("Eventually level is working fine, %d results returned\n", results.size());
+    }
+
+    public static void main(String[] args) throws Exception {
+        testStrongLevel();
+        System.out.println("==============================================================");
+        testSessionLevel();
+        System.out.println("==============================================================");
+        testBoundedLevel();
+        System.out.println("==============================================================");
+        testEventuallyLevel();
+    }
+}

+ 6 - 0
src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java

@@ -22,6 +22,7 @@ package io.milvus.client;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.*;
 import io.grpc.StatusRuntimeException;
+import io.milvus.common.utils.GTsDict;
 import io.milvus.common.utils.JsonUtils;
 import io.milvus.common.utils.VectorUtils;
 import io.milvus.exception.*;
@@ -1554,6 +1555,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
 
             MutationResult response = blockingStub().delete(builder.build());
             handleResponse(title, response.getStatus());
+            GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), response.getTimestamp());
             return R.success(response);
         } catch (StatusRuntimeException e) {
             logError("{} RPC failed! Exception:{}", title, e);
@@ -1581,6 +1583,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
             MutationResult response = blockingStub().insert(builderWraper.buildInsertRequest());
             cleanCacheIfFailed(response.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName());
             handleResponse(title, response.getStatus());
+            GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), response.getTimestamp());
             return R.success(response);
         } catch (StatusRuntimeException e) {
             logError("{} RPC failed! Exception:{}", title, e);
@@ -1616,6 +1619,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                         cleanCacheIfFailed(result.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName());
                         if (result.getStatus().getErrorCode() == ErrorCode.Success) {
                             logDebug("{} successfully!", title);
+                            GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), result.getTimestamp());
                         } else {
                             logError("{} failed:\n{}", title, result.getStatus().getReason());
                         }
@@ -1658,6 +1662,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
             MutationResult response = blockingStub().upsert(builderWraper.buildUpsertRequest());
             cleanCacheIfFailed(response.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName());
             handleResponse(title, response.getStatus());
+            GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), response.getTimestamp());
             return R.success(response);
         } catch (StatusRuntimeException e) {
             logError("{} RPC failed! Exception:{}", title, e);
@@ -1692,6 +1697,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                         cleanCacheIfFailed(result.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName());
                         if (result.getStatus().getErrorCode() == ErrorCode.Success) {
                             logDebug("{} successfully!", title);
+                            GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), result.getTimestamp());
                         } else {
                             logError("{} failed:\n{}", title, result.getStatus().getReason());
                         }

+ 1 - 2
src/main/java/io/milvus/common/clientenum/ConsistencyLevelEnum.java

@@ -24,8 +24,7 @@ import lombok.Getter;
 public enum ConsistencyLevelEnum {
 
     STRONG("Strong", 0),
-    // Session level is not allowed here because no ORM is implemented
-//    SESSION("Session", 1),
+    SESSION("Session", 1),
     BOUNDED("Bounded", 2),
     EVENTUALLY("Eventually",3),
     ;

+ 52 - 0
src/main/java/io/milvus/common/utils/GTsDict.java

@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.common.utils;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class GTsDict {
+    // GTsDict stores the last write timestamp for ConsistencyLevel.Session
+    // It is a Map<String, Long>, key is the name of a collection, value is the last write timestamp of the collection.
+    // It only takes effect when consistency level is Session.
+    // For each dml action, the GTsDict is updated, the last write timestamp is returned from server-side.
+    // When search/query/hybridSearch is called, and the consistency level is Session, the ts of the collection will
+    // be passed to construct a guarantee_ts to the server.
+    private final static GTsDict TS_DICT = new GTsDict();
+
+    private GTsDict(){}
+
+    public static GTsDict getInstance() {
+        return TS_DICT;
+    }
+
+    private ConcurrentMap<String, Long> tsDict = new ConcurrentHashMap<>();
+
+    public void updateCollectionTs(String collectionName, long ts) {
+        // If the collection name exists, use its value to compare to the input ts,
+        // only when the input ts is larger than the existing value, replace it with the input ts.
+        // If the collection name doesn't exist, directly set the input value.
+        tsDict.compute(collectionName, (key, value) -> (value == null) ? ts : ((ts > value) ? ts : value));
+    }
+
+    public Long getCollectionTs(String collectionName) {
+        return tsDict.get(collectionName);
+    }
+}

+ 17 - 16
src/main/java/io/milvus/param/ParamUtils.java

@@ -23,6 +23,7 @@ import com.google.gson.*;
 import com.google.gson.reflect.TypeToken;
 import com.google.protobuf.ByteString;
 import io.milvus.common.clientenum.ConsistencyLevelEnum;
+import io.milvus.common.utils.GTsDict;
 import io.milvus.common.utils.JsonUtils;
 import io.milvus.exception.ParamException;
 import io.milvus.grpc.*;
@@ -855,9 +856,8 @@ public class ParamUtils {
             builder.setDsl(requestParam.getExpr());
         }
 
-        long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(),
-                requestParam.getGuaranteeTimestamp(), requestParam.getGracefulTime());
-        builder.setTravelTimestamp(requestParam.getTravelTimestamp());
+        long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), requestParam.getCollectionName());
+        builder.setTravelTimestamp(requestParam.getTravelTimestamp()); // deprecated
         builder.setGuaranteeTimestamp(guaranteeTimestamp);
 
         // a new parameter from v2.2.9, if user didn't specify consistency level, set this parameter to true
@@ -952,6 +952,9 @@ public class ParamUtils {
             requestParam.getOutFields().forEach(builder::addOutputFields);
         }
 
+        long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), requestParam.getCollectionName());
+        builder.setGuaranteeTimestamp(guaranteeTimestamp);
+
         if (requestParam.getConsistencyLevel() == null) {
             builder.setUseDefaultConsistency(true);
         } else {
@@ -962,8 +965,7 @@ public class ParamUtils {
     }
 
     public static QueryRequest convertQueryParam(@NonNull QueryParam requestParam) {
-        long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(),
-                requestParam.getGuaranteeTimestamp(), requestParam.getGracefulTime());
+        long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), requestParam.getCollectionName());
         QueryRequest.Builder builder = QueryRequest.newBuilder()
                 .setCollectionName(requestParam.getCollectionName())
                 .addAllPartitionNames(requestParam.getPartitionNames())
@@ -1022,23 +1024,22 @@ public class ParamUtils {
         return builder.build();
     }
 
-    private static long getGuaranteeTimestamp(ConsistencyLevelEnum consistencyLevel,
-                                              long guaranteeTimestamp, Long gracefulTime){
+    private static long getGuaranteeTimestamp(ConsistencyLevelEnum consistencyLevel, String collectionName){
         if(consistencyLevel == null){
-            return 1L;
+            Long ts = GTsDict.getInstance().getCollectionTs(collectionName);
+            return  (ts == null) ? 1L : ts;
         }
         switch (consistencyLevel){
             case STRONG:
-                guaranteeTimestamp = 0L;
-                break;
+                return 0L;
+            case SESSION:
+                Long ts = GTsDict.getInstance().getCollectionTs(collectionName);
+                return (ts == null) ? 1L : ts;
             case BOUNDED:
-                guaranteeTimestamp = (new Date()).getTime() - gracefulTime;
-                break;
-            case EVENTUALLY:
-                guaranteeTimestamp = 1L;
-                break;
+                return 2L; // let server side to determine the bounded time
+            default:
+                return 1L; // EVENTUALLY and others
         }
-        return guaranteeTimestamp;
     }
 
     public static boolean isVectorDataType(DataType dataType) {

+ 3 - 3
src/main/java/io/milvus/param/dml/QueryIteratorParam.java

@@ -46,9 +46,9 @@ public class QueryIteratorParam {
     private final List<String> partitionNames;
     private final List<String> outFields;
     private final String expr;
-    private final long travelTimestamp;
-    private final long guaranteeTimestamp;
-    private final long gracefulTime;
+    private final long travelTimestamp; // deprecated
+    private final long guaranteeTimestamp; // deprecated
+    private final long gracefulTime; // deprecated
     private final ConsistencyLevelEnum consistencyLevel;
     private final long offset;
     private final long limit;

+ 3 - 3
src/main/java/io/milvus/param/dml/QueryParam.java

@@ -82,9 +82,9 @@ public class QueryParam {
         private final List<String> partitionNames = Lists.newArrayList();
         private final List<String> outFields = new ArrayList<>();
         private String expr = "";
-        private Long travelTimestamp = 0L;
-        private Long gracefulTime = 5000L;
-        private Long guaranteeTimestamp = Constant.GUARANTEE_EVENTUALLY_TS;
+        private Long travelTimestamp = 0L; // deprecated
+        private Long gracefulTime = 5000L; // deprecated
+        private Long guaranteeTimestamp = Constant.GUARANTEE_EVENTUALLY_TS; // deprecated
         private ConsistencyLevelEnum consistencyLevel = null;
         private Long offset = 0L;
         private Long limit = 0L;

+ 3 - 3
src/main/java/io/milvus/param/dml/SearchIteratorParam.java

@@ -55,9 +55,9 @@ public class SearchIteratorParam {
     private final Long NQ;
     private final int roundDecimal;
     private final String params;
-    private final long travelTimestamp;
-    private final long guaranteeTimestamp;
-    private final Long gracefulTime;
+    private final long travelTimestamp; // deprecated
+    private final long guaranteeTimestamp; // deprecated
+    private final Long gracefulTime; // deprecated
     private final ConsistencyLevelEnum consistencyLevel;
     private final boolean ignoreGrowing;
     private final String groupByFieldName;

+ 3 - 3
src/main/java/io/milvus/param/dml/SearchParam.java

@@ -102,9 +102,9 @@ public class SearchParam {
         private Long NQ;
         private Integer roundDecimal = -1;
         private String params = "{}";
-        private Long travelTimestamp = 0L;
-        private Long guaranteeTimestamp = Constant.GUARANTEE_EVENTUALLY_TS;
-        private Long gracefulTime = 5000L;
+        private Long travelTimestamp = 0L; // deprecated
+        private Long guaranteeTimestamp = Constant.GUARANTEE_EVENTUALLY_TS; // deprecated
+        private Long gracefulTime = 5000L; // deprecated
         private ConsistencyLevelEnum consistencyLevel = null;
         private Boolean ignoreGrowing = Boolean.FALSE;
         private String groupByFieldName;

+ 1 - 0
src/main/java/io/milvus/v2/common/ConsistencyLevel.java

@@ -23,6 +23,7 @@ import lombok.Getter;
 @Getter
 public enum ConsistencyLevel{
     STRONG("Strong", 0),
+    SESSION("Session", 1),
     BOUNDED("Bounded", 2),
     EVENTUALLY("Eventually",3),
     ;

+ 4 - 0
src/main/java/io/milvus/v2/service/vector/VectorService.java

@@ -19,6 +19,7 @@
 
 package io.milvus.v2.service.vector;
 
+import io.milvus.common.utils.GTsDict;
 import io.milvus.exception.ParamException;
 import io.milvus.grpc.*;
 import io.milvus.orm.iterator.*;
@@ -108,6 +109,7 @@ public class VectorService extends BaseService {
         MutationResult response = blockingStub.insert(requestBuilder.convertGrpcInsertRequest(request, new DescCollResponseWrapper(descResp)));
         cleanCacheIfFailed(response.getStatus(), "", request.getCollectionName());
         rpcUtils.handleResponse(title, response.getStatus());
+        GTsDict.getInstance().updateCollectionTs(request.getCollectionName(), response.getTimestamp());
         return InsertResp.builder()
                 .InsertCnt(response.getInsertCnt())
                 .build();
@@ -122,6 +124,7 @@ public class VectorService extends BaseService {
         MutationResult response = blockingStub.upsert(requestBuilder.convertGrpcUpsertRequest(request, new DescCollResponseWrapper(descResp)));
         cleanCacheIfFailed(response.getStatus(), "", request.getCollectionName());
         rpcUtils.handleResponse(title, response.getStatus());
+        GTsDict.getInstance().updateCollectionTs(request.getCollectionName(), response.getTimestamp());
         return UpsertResp.builder()
                 .upsertCnt(response.getInsertCnt())
                 .build();
@@ -216,6 +219,7 @@ public class VectorService extends BaseService {
                 .build();
         MutationResult response = blockingStub.delete(deleteRequest);
         rpcUtils.handleResponse(title, response.getStatus());
+        GTsDict.getInstance().updateCollectionTs(request.getCollectionName(), response.getTimestamp());
         return DeleteResp.builder()
                 .deleteCnt(response.getDeleteCnt())
                 .build();

+ 2 - 1
src/main/java/io/milvus/v2/service/vector/request/HybridSearchReq.java

@@ -40,5 +40,6 @@ public class HybridSearchReq
     private List<String> outFields;
     @Builder.Default
     private int roundDecimal = -1;
-    private ConsistencyLevel consistencyLevel;
+    @Builder.Default
+    private ConsistencyLevel consistencyLevel = null;
 }

+ 1 - 1
src/main/java/io/milvus/v2/service/vector/request/QueryIteratorReq.java

@@ -20,7 +20,7 @@ public class QueryIteratorReq {
     @Builder.Default
     private String expr = "";
     @Builder.Default
-    private ConsistencyLevel consistencyLevel = ConsistencyLevel.BOUNDED;
+    private ConsistencyLevel consistencyLevel = null;
     @Builder.Default
     private long offset = 0;
     @Builder.Default

+ 1 - 1
src/main/java/io/milvus/v2/service/vector/request/QueryReq.java

@@ -39,7 +39,7 @@ public class QueryReq {
     private List<Object> ids;
     private String filter;
     @Builder.Default
-    private ConsistencyLevel consistencyLevel = ConsistencyLevel.BOUNDED;
+    private ConsistencyLevel consistencyLevel = null;
     private long offset;
     private long limit;
 }

+ 1 - 1
src/main/java/io/milvus/v2/service/vector/request/SearchIteratorReq.java

@@ -33,7 +33,7 @@ public class SearchIteratorReq {
     @Builder.Default
     private String params = "{}";
     @Builder.Default
-    private ConsistencyLevel consistencyLevel = ConsistencyLevel.BOUNDED;
+    private ConsistencyLevel consistencyLevel = null;
     @Builder.Default
     private boolean ignoreGrowing = false;
     @Builder.Default

+ 3 - 3
src/main/java/io/milvus/v2/service/vector/request/SearchReq.java

@@ -50,11 +50,11 @@ public class SearchReq {
     private int roundDecimal = -1;
     @Builder.Default
     private Map<String, Object> searchParams = new HashMap<>();
-    private long guaranteeTimestamp;
+    private long guaranteeTimestamp; // deprecated
     @Builder.Default
-    private Long gracefulTime = 5000L;
+    private Long gracefulTime = 5000L; // deprecated
     @Builder.Default
-    private ConsistencyLevel consistencyLevel = ConsistencyLevel.BOUNDED;
+    private ConsistencyLevel consistencyLevel = null;
     private boolean ignoreGrowing;
     private String groupByFieldName;
 }

+ 12 - 14
src/main/java/io/milvus/v2/utils/VectorUtils.java

@@ -20,6 +20,7 @@
 package io.milvus.v2.utils;
 
 import com.google.protobuf.ByteString;
+import io.milvus.common.utils.GTsDict;
 import io.milvus.common.utils.JsonUtils;
 import io.milvus.v2.common.ConsistencyLevel;
 import io.milvus.exception.ParamException;
@@ -79,23 +80,22 @@ public class VectorUtils {
 
     }
 
-    private static long getGuaranteeTimestamp(ConsistencyLevel consistencyLevel,
-                                              long guaranteeTimestamp, Long gracefulTime){
+    private static long getGuaranteeTimestamp(ConsistencyLevel consistencyLevel, String collectionName){
         if(consistencyLevel == null){
-            return 1L;
+            Long ts = GTsDict.getInstance().getCollectionTs(collectionName);
+            return  (ts == null) ? 1L : ts;
         }
         switch (consistencyLevel){
             case STRONG:
-                guaranteeTimestamp = 0L;
-                break;
+                return 0L;
+            case SESSION:
+                Long ts = GTsDict.getInstance().getCollectionTs(collectionName);
+                return  (ts == null) ? 1L : ts;
             case BOUNDED:
-                guaranteeTimestamp = (new Date()).getTime() - gracefulTime;
-                break;
-            case EVENTUALLY:
-                guaranteeTimestamp = 1L;
-                break;
+                return 2L; // let server side to determine the bounded time
+            default:
+                return 1L; // EVENTUALLY and others
         }
-        return guaranteeTimestamp;
     }
 
     public SearchRequest ConvertToGrpcSearchRequest(SearchReq request) {
@@ -182,9 +182,7 @@ public class VectorUtils {
             builder.setDsl(request.getFilter());
         }
 
-        long guaranteeTimestamp = getGuaranteeTimestamp(request.getConsistencyLevel(),
-                request.getGuaranteeTimestamp(), request.getGracefulTime());
-        //builder.setTravelTimestamp(request.getTravelTimestamp());
+        long guaranteeTimestamp = getGuaranteeTimestamp(request.getConsistencyLevel(), request.getCollectionName());
         builder.setGuaranteeTimestamp(guaranteeTimestamp);
 
         // a new parameter from v2.2.9, if user didn't specify consistency level, set this parameter to true

+ 23 - 1
src/test/java/io/milvus/client/MilvusServiceClientTest.java

@@ -22,6 +22,7 @@ package io.milvus.client;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.ByteString;
 import io.milvus.common.clientenum.ConsistencyLevelEnum;
+import io.milvus.common.utils.GTsDict;
 import io.milvus.exception.IllegalResponseException;
 import io.milvus.exception.ParamException;
 import io.milvus.grpc.*;
@@ -40,6 +41,7 @@ import io.milvus.param.partition.*;
 import io.milvus.response.*;
 import io.milvus.server.MockMilvusServer;
 import io.milvus.server.MockMilvusServerImpl;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.lang.reflect.InvocationTargetException;
@@ -3116,4 +3118,24 @@ class MilvusServiceClientTest {
 
         assertFalse(wrapper.toString().isEmpty());
     }
-}
+
+    @Test
+    void testGTsDict() {
+        GTsDict dict = GTsDict.getInstance();
+        dict.updateCollectionTs("aaa", 0L);
+        dict.updateCollectionTs("bbb", 999L);
+        dict.updateCollectionTs("ccc", -10L);
+        Assertions.assertEquals(0L, dict.getCollectionTs("aaa"));
+        Assertions.assertEquals(999L, dict.getCollectionTs("bbb"));
+        Assertions.assertEquals(-10L, dict.getCollectionTs("ccc"));
+
+        dict.updateCollectionTs("aaa", 20L);
+        Assertions.assertEquals(20L, dict.getCollectionTs("aaa"));
+        dict.updateCollectionTs("bbb", 200L);
+        Assertions.assertEquals(999L, dict.getCollectionTs("bbb"));
+        dict.updateCollectionTs("ccc", -50L);
+        Assertions.assertEquals(-10L, dict.getCollectionTs("ccc"));
+        dict.updateCollectionTs("ccc", 50L);
+        Assertions.assertEquals(50L, dict.getCollectionTs("ccc"));
+    }
+}