瀏覽代碼

support searchIterator && queryIterator (#851)

Signed-off-by: lentitude2tk <xushuang.hu@zilliz.com>
xushuang.hu 1 年之前
父節點
當前提交
8290be2057

+ 17 - 0
examples/main/java/io/milvus/CommonUtils.java

@@ -44,6 +44,14 @@ public class CommonUtils {
         return vector;
     }
 
+    public static List<Float> generateFloatVector(int dimension, Float initValue) {
+        List<Float> vector = new ArrayList<>();
+        for (int i = 0; i < dimension; ++i) {
+            vector.add(initValue);
+        }
+        return vector;
+    }
+
     public static List<List<Float>> generateFloatVectors(int dimension, int count) {
         List<List<Float>> vectors = new ArrayList<>();
         for (int n = 0; n < count; ++n) {
@@ -53,6 +61,15 @@ public class CommonUtils {
         return vectors;
     }
 
+    public static List<List<Float>> generateFixFloatVectors(int dimension, int count) {
+        List<List<Float>> vectors = new ArrayList<>();
+        for (int n = 0; n < count; ++n) {
+            List<Float> vector = generateFloatVector(dimension, (float)n);
+            vectors.add(vector);
+        }
+        return vectors;
+    }
+
     public static ByteBuffer generateBinaryVector(int dimension) {
         Random ran = new Random();
         int byteCount = dimension / 8;

+ 334 - 0
examples/main/java/io/milvus/IteratorExample.java

@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus;
+
+import com.google.common.collect.Lists;
+import io.milvus.client.MilvusClient;
+import io.milvus.client.MilvusServiceClient;
+import io.milvus.common.clientenum.ConsistencyLevelEnum;
+import io.milvus.grpc.DataType;
+import io.milvus.grpc.FlushResponse;
+import io.milvus.grpc.GetCollectionStatisticsResponse;
+import io.milvus.grpc.MutationResult;
+import io.milvus.param.ConnectParam;
+import io.milvus.param.IndexType;
+import io.milvus.param.MetricType;
+import io.milvus.param.R;
+import io.milvus.param.RetryParam;
+import io.milvus.param.RpcStatus;
+import io.milvus.param.collection.*;
+import io.milvus.param.dml.InsertParam;
+import io.milvus.param.dml.QueryIteratorParam;
+import io.milvus.param.dml.SearchIteratorParam;
+import io.milvus.param.index.CreateIndexParam;
+import io.milvus.orm.iterator.QueryIterator;
+import io.milvus.orm.iterator.SearchIterator;
+import io.milvus.response.GetCollStatResponseWrapper;
+import io.milvus.response.QueryResultsWrapper;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class IteratorExample {
+    private static final MilvusClient milvusClient;
+
+    // Builds the client shared by every step of the example: localhost:19530 with the
+    // secure flag enabled, root credentials, and a retry setting of at most 3 retry times.
+    static {
+        ConnectParam connection = ConnectParam.newBuilder()
+                .withHost("localhost")
+                .withPort(19530)
+                .withSecure(true)
+                .withAuthorization("root","Milvus")
+                .build();
+        RetryParam retryPolicy = RetryParam.newBuilder()
+                .withMaxRetryTimes(3)
+                .build();
+        MilvusServiceClient serviceClient = new MilvusServiceClient(connection);
+        milvusClient = serviceClient.withRetry(retryPolicy);
+    }
+
+    private static final String COLLECTION_NAME = "test_iterator";
+    private static final String ID_FIELD = "userID";
+    private static final String VECTOR_FIELD = "userFace";
+    private static final Integer VECTOR_DIM = 8;
+    private static final String AGE_FIELD = "userAge";
+
+    private static final String INDEX_NAME = "userFaceIndex";
+    private static final IndexType INDEX_TYPE = IndexType.IVF_FLAT;
+    private static final String INDEX_PARAM = "{\"nlist\":128}";
+    private static final boolean CLEAR_EXIST = false;
+    private static final Integer NUM_ENTITIES = 1000;
+
+    /**
+     * Creates the example collection with three fields: an Int64 primary key without
+     * auto-ID, a float vector of {@code VECTOR_DIM} dimensions and an Int64 age field.
+     *
+     * @param timeoutMilliseconds timeout handed to the client for the create call, in milliseconds
+     */
+    private void createCollection(long timeoutMilliseconds) {
+        FieldType idField = FieldType.newBuilder()
+                .withName(ID_FIELD)
+                .withDataType(DataType.Int64)
+                .withPrimaryKey(true)
+                .withAutoID(false)
+                .build();
+
+        FieldType vectorField = FieldType.newBuilder()
+                .withName(VECTOR_FIELD)
+                .withDataType(DataType.FloatVector)
+                .withDimension(VECTOR_DIM)
+                .build();
+
+        FieldType ageField = FieldType.newBuilder()
+                .withName(AGE_FIELD)
+                .withDataType(DataType.Int64)
+                .build();
+
+        CollectionSchemaParam schema = CollectionSchemaParam.newBuilder()
+                .withEnableDynamicField(false)
+                .addFieldType(idField)
+                .addFieldType(vectorField)
+                .addFieldType(ageField)
+                .build();
+
+        CreateCollectionParam request = CreateCollectionParam.newBuilder()
+                .withCollectionName(COLLECTION_NAME)
+                .withShardsNum(2)
+                .withSchema(schema)
+                .withConsistencyLevel(ConsistencyLevelEnum.EVENTUALLY)
+                .build();
+
+        R<RpcStatus> result = milvusClient
+                .withTimeout(timeoutMilliseconds, TimeUnit.MILLISECONDS)
+                .createCollection(request);
+        CommonUtils.handleResponseStatus(result);
+    }
+
+    /**
+     * Asks the server whether the example collection exists.
+     *
+     * @return the boolean carried by the server response
+     */
+    private boolean hasCollection() {
+        R<Boolean> result = milvusClient.hasCollection(
+                HasCollectionParam.newBuilder().withCollectionName(COLLECTION_NAME).build());
+        CommonUtils.handleResponseStatus(result);
+        return result.getData();
+    }
+
+    /**
+     * Drops the example collection and hands the response to the shared status check.
+     */
+    private void dropCollection() {
+        R<RpcStatus> result = milvusClient.dropCollection(
+                DropCollectionParam.newBuilder().withCollectionName(COLLECTION_NAME).build());
+        CommonUtils.handleResponseStatus(result);
+    }
+
+    /**
+     * Loads the example collection and prints a confirmation once the response has
+     * passed the shared status check.
+     */
+    private void loadCollection() {
+        R<RpcStatus> result = milvusClient.loadCollection(
+                LoadCollectionParam.newBuilder().withCollectionName(COLLECTION_NAME).build());
+        CommonUtils.handleResponseStatus(result);
+        System.out.printf("Finish Loading Collection %s\n", COLLECTION_NAME);
+    }
+
+    /**
+     * Creates the index on the vector field using the configured index type, the L2
+     * metric and the extra parameters in {@code INDEX_PARAM}, with sync mode enabled.
+     */
+    private void createIndex() {
+        R<RpcStatus> result = milvusClient.createIndex(
+                CreateIndexParam.newBuilder()
+                        .withCollectionName(COLLECTION_NAME)
+                        .withFieldName(VECTOR_FIELD)
+                        .withIndexName(INDEX_NAME)
+                        .withIndexType(INDEX_TYPE)
+                        .withMetricType(MetricType.L2)
+                        .withExtraParam(INDEX_PARAM)
+                        .withSyncMode(Boolean.TRUE)
+                        .build());
+        CommonUtils.handleResponseStatus(result);
+        System.out.printf("Finish Creating index %s\n", INDEX_TYPE);
+    }
+
+    private void insertColumns() {
+        int batchCount = 5;
+        for (int batch = 0; batch < batchCount; ++batch) {
+            List<List<Float>> vectors = CommonUtils.generateFixFloatVectors(VECTOR_DIM, NUM_ENTITIES);
+
+            List<Long> ages = new ArrayList<>();
+            List<Long> ids = new ArrayList<>();
+            for (long i = 0L; i < NUM_ENTITIES; ++i) {
+                ages.add((long) batch * NUM_ENTITIES + i);
+                ids.add((long) batch * NUM_ENTITIES + i);
+            }
+
+            List<InsertParam.Field> fields = new ArrayList<>();
+            fields.add(new InsertParam.Field(ID_FIELD, ids));
+            fields.add(new InsertParam.Field(AGE_FIELD, ages));
+            fields.add(new InsertParam.Field(VECTOR_FIELD, vectors));
+
+            InsertParam insertParam = InsertParam.newBuilder()
+                    .withCollectionName(COLLECTION_NAME)
+                    .withFields(fields)
+                    .build();
+            R<MutationResult> response = milvusClient.insert(insertParam);
+            CommonUtils.handleResponseStatus(response);
+
+            R<FlushResponse> flush = milvusClient.flush(FlushParam.newBuilder().addCollectionName(COLLECTION_NAME).build());
+            CommonUtils.handleResponseStatus(flush);
+
+            GetCollectionStatisticsParam collectionStatisticsParam = GetCollectionStatisticsParam.newBuilder().withCollectionName(COLLECTION_NAME).build();
+            R<GetCollectionStatisticsResponse> collectionStatistics = milvusClient.getCollectionStatistics(collectionStatisticsParam);
+            CommonUtils.handleResponseStatus(collectionStatistics);
+            GetCollStatResponseWrapper wrapper = new GetCollStatResponseWrapper(collectionStatistics.getData());
+
+            System.out.printf("Finish insert batch%s, number of entities in Milvus: %s\n", batch, wrapper.getRowCount());
+        }
+
+    }
+
+    /**
+     * Makes sure the example collection exists before data is inserted.
+     * <p>
+     * When the collection already exists and {@code CLEAR_EXIST} is set, it is dropped
+     * and then created again; previously it was only dropped, which left the following
+     * insert without a target collection. When {@code CLEAR_EXIST} is not set, an
+     * existing collection is reused as is.
+     */
+    private void reCreateCollection() {
+        boolean exists = hasCollection();
+        if (exists && CLEAR_EXIST) {
+            dropCollection();
+            System.out.printf("Dropped existed collection %s%n", COLLECTION_NAME);
+            exists = false;
+        }
+        if (!exists) {
+            createCollection(2000);
+            System.out.printf("Create collection %s%n", COLLECTION_NAME);
+        }
+    }
+
+    /**
+     * Runs the data-preparation steps in order: insert the rows, create the vector
+     * index, then load the collection.
+     */
+    private void prepareData() {
+        insertColumns();
+        createIndex();
+        loadCollection();
+    }
+
+    /**
+     * Iterates the rows whose age lies in [10, 100] with offset 0, batch size 5 and no limit.
+     */
+    private void queryIterateCollectionNoOffset() {
+        QueryIterator iterator = getQueryIterator(
+                String.format("10 <= %s <= 100", AGE_FIELD), 0L, 5L, null);
+        iterateQueryResult(iterator);
+    }
+
+    /**
+     * Iterates the rows whose age lies in [10, 100] with offset 10, batch size 50 and no limit.
+     */
+    private void queryIterateCollectionWithOffset() {
+        QueryIterator iterator = getQueryIterator(
+                String.format("10 <= %s <= 100", AGE_FIELD), 10L, 50L, null);
+        iterateQueryResult(iterator);
+    }
+
+    /**
+     * Iterates the rows whose age lies in [10, 100] with no explicit offset, batch size 80
+     * and a limit of 530.
+     */
+    private void queryIterateCollectionWithLimit() {
+        QueryIterator iterator = getQueryIterator(
+                String.format("10 <= %s <= 100", AGE_FIELD), null, 80L, 530L);
+        iterateQueryResult(iterator);
+    }
+
+    /**
+     * Runs a search iteration for a single generated query vector with batch size 500
+     * and no explicit topK.
+     */
+    private void searchIteratorCollection() {
+        List<List<Float>> queryVectors = CommonUtils.generateFixFloatVectors(VECTOR_DIM, 1);
+        SearchIterator iterator = getSearchIterator(queryVectors, 500L, null, buildSearchParams());
+        iterateSearchResult(iterator);
+    }
+
+    /**
+     * Runs a search iteration for a single generated query vector with batch size 200
+     * and topK set to 755.
+     */
+    private void searchIteratorCollectionWithLimit() {
+        List<List<Float>> queryVectors = CommonUtils.generateFixFloatVectors(VECTOR_DIM, 1);
+        SearchIterator iterator = getSearchIterator(queryVectors, 200L, 755, buildSearchParams());
+        iterateSearchResult(iterator);
+    }
+
+    /**
+     * Prints every page produced by the iterator until an empty page is returned,
+     * then closes the iterator.
+     *
+     * @param queryIterator iterator to drain and close
+     */
+    private void iterateQueryResult(QueryIterator queryIterator) {
+        int pageIdx = 0;
+        List<QueryResultsWrapper.RowRecord> page = queryIterator.next();
+        while (!page.isEmpty()) {
+            for (QueryResultsWrapper.RowRecord row : page) {
+                System.out.println(row);
+            }
+            pageIdx++;
+            System.out.printf("page%s-------------------------%n", pageIdx);
+            page = queryIterator.next();
+        }
+        // An empty page marks the end of the iteration.
+        System.out.println("query iteration finished, close");
+        queryIterator.close();
+    }
+
+    /**
+     * Prints every page produced by the iterator until an empty page is returned,
+     * then closes the iterator.
+     *
+     * @param searchIterator iterator to drain and close
+     */
+    private void iterateSearchResult(SearchIterator searchIterator) {
+        int pageIdx = 0;
+        List<QueryResultsWrapper.RowRecord> page = searchIterator.next();
+        while (!page.isEmpty()) {
+            for (QueryResultsWrapper.RowRecord row : page) {
+                System.out.println(row);
+            }
+            pageIdx++;
+            System.out.printf("page%s-------------------------%n", pageIdx);
+            page = searchIterator.next();
+        }
+        // An empty page marks the end of the iteration.
+        System.out.println("search iteration finished, close");
+        searchIterator.close();
+    }
+
+    /**
+     * Builds a query-iterator request for the example collection and asks the client for
+     * the iterator. The ID and age fields are requested as output fields.
+     *
+     * @param expr      filter expression
+     * @param offset    offset to set on the request, or {@code null} to leave it unset
+     * @param batchSize batch size to set on the request
+     * @param limit     limit to set on the request, or {@code null} to leave it unset
+     * @return the iterator carried by the client response
+     */
+    private QueryIterator getQueryIterator(String expr, Long offset, Long batchSize, Long limit) {
+        QueryIteratorParam.Builder request = QueryIteratorParam.newBuilder()
+                .withCollectionName(COLLECTION_NAME)
+                .withExpr(expr)
+                .withOutFields(Lists.newArrayList(ID_FIELD, AGE_FIELD))
+                .withBatchSize(batchSize)
+                .withConsistencyLevel(ConsistencyLevelEnum.EVENTUALLY);
+
+        // Optional settings are only applied when the caller supplied them.
+        if (null != offset) {
+            request.withOffset(offset);
+        }
+        if (null != limit) {
+            request.withLimit(limit);
+        }
+
+        R<QueryIterator> result = milvusClient.queryIterator(request.build());
+        CommonUtils.handleResponseStatus(result);
+        return result.getData();
+    }
+
+    /**
+     * Builds a search-iterator request against the vector field of the example collection
+     * (L2 metric, ID field as output) and asks the client for the iterator.
+     *
+     * @param vectors   query vectors
+     * @param batchSize batch size to set on the request
+     * @param topK      topK to set on the request, or {@code null} to leave it unset
+     * @param params    search parameters passed through to the request
+     * @return the iterator carried by the client response
+     */
+    private SearchIterator getSearchIterator(List<List<Float>> vectors, Long batchSize, Integer topK, String params) {
+        SearchIteratorParam.Builder request = SearchIteratorParam.newBuilder()
+                .withCollectionName(COLLECTION_NAME)
+                .withOutFields(Lists.newArrayList(ID_FIELD))
+                .withBatchSize(batchSize)
+                .withVectorFieldName(VECTOR_FIELD)
+                .withVectors(vectors)
+                .withParams(params)
+                .withMetricType(MetricType.L2);
+
+        // topK is optional and only applied when the caller supplied it.
+        if (null != topK) {
+            request.withTopK(topK);
+        }
+
+        R<SearchIterator> result = milvusClient.searchIterator(request.build());
+        CommonUtils.handleResponseStatus(result);
+        return result.getData();
+    }
+
+    private String buildSearchParams() {
+        return "{}";
+    }
+
+    /**
+     * Entry point: ensures the collection exists, prepares the data unless that step is
+     * skipped, then runs the query-iterator and search-iterator examples.
+     *
+     * @param args unused
+     */
+    public static void main(String[] args) {
+        IteratorExample example = new IteratorExample();
+        example.reCreateCollection();
+
+        // Switch: when true, the insert/index/load step is skipped.
+        boolean skipDataPeriod = false;
+        if (!skipDataPeriod) {
+            example.prepareData();
+        }
+
+        // Query iterator examples.
+        example.queryIterateCollectionNoOffset();
+        example.queryIterateCollectionWithOffset();
+        example.queryIterateCollectionWithLimit();
+
+        // Search iterator examples.
+        example.searchIteratorCollection();
+        example.searchIteratorCollectionWithLimit();
+    }
+}

+ 34 - 6
src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java

@@ -20,26 +20,24 @@
 package io.milvus.client;
 
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.*;
 import io.grpc.StatusRuntimeException;
 import io.milvus.common.utils.JacksonUtils;
 import io.milvus.common.utils.VectorUtils;
 import io.milvus.exception.*;
 import io.milvus.grpc.*;
-import io.milvus.grpc.ObjectEntity;
+import io.milvus.orm.iterator.QueryIterator;
+import io.milvus.orm.iterator.SearchIterator;
 import io.milvus.param.*;
 import io.milvus.param.alias.*;
 import io.milvus.param.bulkinsert.*;
 import io.milvus.param.collection.*;
-import io.milvus.param.highlevel.collection.response.ListCollectionsResponse;
 import io.milvus.param.control.*;
 import io.milvus.param.credential.*;
 import io.milvus.param.dml.*;
 import io.milvus.param.highlevel.collection.CreateSimpleCollectionParam;
 import io.milvus.param.highlevel.collection.ListCollectionsParam;
+import io.milvus.param.highlevel.collection.response.ListCollectionsResponse;
 import io.milvus.param.highlevel.dml.*;
 import io.milvus.param.highlevel.dml.response.*;
 import io.milvus.param.index.*;
@@ -3208,6 +3206,36 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         }
     }
 
+    @Override
+    public R<QueryIterator> queryIterator(QueryIteratorParam requestParam) {
+        DescribeCollectionParam.Builder builder = DescribeCollectionParam.newBuilder()
+                .withDatabaseName(requestParam.getDatabaseName())
+                .withCollectionName(requestParam.getCollectionName());
+        R<DescribeCollectionResponse> descResp = describeCollection(builder.build());
+        if (descResp.getStatus() != R.Status.Success.getCode()) {
+            logError("Failed to describe collection: {}", requestParam.getCollectionName());
+            return R.failed(descResp.getException());
+        }
+        DescCollResponseWrapper descCollResponseWrapper = new DescCollResponseWrapper(descResp.getData());
+        QueryIterator queryIterator = new QueryIterator(requestParam, this.blockingStub(), descCollResponseWrapper.getPrimaryField());
+        return R.success(queryIterator);
+    }
+
+    @Override
+    public R<SearchIterator> searchIterator(SearchIteratorParam requestParam) {
+        DescribeCollectionParam.Builder builder = DescribeCollectionParam.newBuilder()
+                .withDatabaseName(requestParam.getDatabaseName())
+                .withCollectionName(requestParam.getCollectionName());
+        R<DescribeCollectionResponse> descResp = describeCollection(builder.build());
+        if (descResp.getStatus() != R.Status.Success.getCode()) {
+            logError("Failed to describe collection: {}", requestParam.getCollectionName());
+            return R.failed(descResp.getException());
+        }
+        DescCollResponseWrapper descCollResponseWrapper = new DescCollResponseWrapper(descResp.getData());
+        SearchIterator searchIterator = new SearchIterator(requestParam, this.blockingStub(), descCollResponseWrapper.getPrimaryField());
+        return R.success(searchIterator);
+    }
+
     ///////////////////// Log Functions//////////////////////
     protected void logDebug(String msg, Object... params) {
         if (logLevel.ordinal() <= LogLevel.Debug.ordinal()) {

+ 20 - 0
src/main/java/io/milvus/client/MilvusClient.java

@@ -37,6 +37,8 @@ import io.milvus.param.highlevel.collection.ListCollectionsParam;
 import io.milvus.param.highlevel.dml.*;
 import io.milvus.param.highlevel.dml.response.*;
 import io.milvus.param.index.*;
+import io.milvus.orm.iterator.QueryIterator;
+import io.milvus.orm.iterator.SearchIterator;
 import io.milvus.param.partition.*;
 import io.milvus.param.role.*;
 import io.milvus.param.resourcegroup.*;
@@ -828,4 +830,22 @@ public interface MilvusClient {
      * @return {status:result code, data: SearchResults{topK results}}
      */
     R<SearchResponse> search(SearchSimpleParam requestParam);
+
+
+    /**
+     * Creates a {@link QueryIterator} over the entities selected by a boolean expression
+     * on scalar field(s). The iterator returns the entities page by page; callers
+     * typically call {@code next()} until it returns an empty page and then {@code close()}.
+     * Note that the order of the returned entities cannot be guaranteed.
+     *
+     * @param requestParam {@link QueryIteratorParam}
+     * @return {status:result code, data: QueryIterator}
+     */
+    R<QueryIterator> queryIterator(QueryIteratorParam requestParam);
+
+    /**
+     * Creates a {@link SearchIterator} based on a vector field. Use an expression to do
+     * filtering before search. The iterator returns the results page by page; callers
+     * typically call {@code next()} until it returns an empty page and then {@code close()}.
+     *
+     * @param requestParam {@link SearchIteratorParam}
+     * @return {status:result code, data: SearchIterator}
+     */
+    R<SearchIterator> searchIterator(SearchIteratorParam requestParam);
 }

+ 12 - 0
src/main/java/io/milvus/client/MilvusMultiServiceClient.java

@@ -36,6 +36,8 @@ import io.milvus.param.highlevel.collection.ListCollectionsParam;
 import io.milvus.param.highlevel.dml.*;
 import io.milvus.param.highlevel.dml.response.*;
 import io.milvus.param.index.*;
+import io.milvus.orm.iterator.QueryIterator;
+import io.milvus.orm.iterator.SearchIterator;
 import io.milvus.param.partition.*;
 import io.milvus.param.resourcegroup.*;
 import io.milvus.param.role.*;
@@ -673,6 +675,16 @@ public class MilvusMultiServiceClient implements MilvusClient {
         return this.clusterFactory.getMaster().getClient().search(requestParam);
     }
 
+    // Forwards the request to the client obtained from clusterFactory.getMaster().
+    @Override
+    public R<QueryIterator> queryIterator(QueryIteratorParam requestParam) {
+        return this.clusterFactory.getMaster().getClient().queryIterator(requestParam);
+    }
+
+    // Forwards the request to the client obtained from clusterFactory.getMaster().
+    @Override
+    public R<SearchIterator> searchIterator(SearchIteratorParam requestParam) {
+        return this.clusterFactory.getMaster().getClient().searchIterator(requestParam);
+    }
+
     private <T> R<T> handleResponse(List<R<T>> response) {
         if (CollectionUtils.isNotEmpty(response)) {
             R<T> rSuccess = null;

+ 14 - 7
src/main/java/io/milvus/client/MilvusServiceClient.java

@@ -19,8 +19,8 @@
 
 package io.milvus.client;
 
-import io.grpc.*;
 import io.grpc.Status;
+import io.grpc.*;
 import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
 import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
@@ -28,8 +28,9 @@ import io.grpc.stub.MetadataUtils;
 import io.milvus.exception.MilvusException;
 import io.milvus.exception.ServerException;
 import io.milvus.grpc.*;
+import io.milvus.orm.iterator.QueryIterator;
+import io.milvus.orm.iterator.SearchIterator;
 import io.milvus.param.*;
-
 import io.milvus.param.alias.*;
 import io.milvus.param.bulkinsert.*;
 import io.milvus.param.collection.*;
@@ -50,12 +51,8 @@ import org.apache.commons.lang3.StringUtils;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.Callable;
-
-import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
-import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
-import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
+import java.util.concurrent.TimeUnit;
 
 public class MilvusServiceClient extends AbstractMilvusGrpcClient {
 
@@ -767,5 +764,15 @@ public class MilvusServiceClient extends AbstractMilvusGrpcClient {
     public R<SearchResponse> search(SearchSimpleParam requestParam) {
         return retry(()-> super.search(requestParam));
     }
+
+    // Runs the parent implementation through this client's retry(...) helper.
+    @Override
+    public R<QueryIterator> queryIterator(QueryIteratorParam requestParam) {
+        return retry(() -> super.queryIterator(requestParam));
+    }
+
+    // Runs the parent implementation through this client's retry(...) helper.
+    @Override
+    public R<SearchIterator> searchIterator(SearchIteratorParam requestParam) {
+        return retry(() -> super.searchIterator(requestParam));
+    }
 }
 

+ 31 - 0
src/main/java/io/milvus/orm/iterator/IteratorCache.java

@@ -0,0 +1,31 @@
+package io.milvus.orm.iterator;
+
+import io.milvus.response.QueryResultsWrapper;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static io.milvus.param.Constant.NO_CACHE_ID;
+
+public class IteratorCache {
+    private final AtomicInteger cacheId = new AtomicInteger(0);
+    private final Map<Integer, List<QueryResultsWrapper.RowRecord>> cacheMap = new ConcurrentHashMap<>();
+
+    public int cache(int cacheId, List<QueryResultsWrapper.RowRecord> result) {
+        if (cacheId == NO_CACHE_ID) {
+            cacheId = this.cacheId.incrementAndGet();
+        }
+        cacheMap.put(cacheId, result);
+        return cacheId;
+    }
+
+    public List<QueryResultsWrapper.RowRecord> fetchCache(int cacheId) {
+        return cacheMap.getOrDefault(cacheId, null);
+    }
+
+    public void releaseCache(int cacheId) {
+        cacheMap.remove(cacheId);
+    }
+}

+ 177 - 0
src/main/java/io/milvus/orm/iterator/QueryIterator.java

@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.orm.iterator;
+
+import io.milvus.grpc.DataType;
+import io.milvus.grpc.MilvusServiceGrpc;
+import io.milvus.grpc.QueryRequest;
+import io.milvus.grpc.QueryResults;
+import io.milvus.param.ParamUtils;
+import io.milvus.param.collection.FieldType;
+import io.milvus.param.dml.QueryIteratorParam;
+import io.milvus.param.dml.QueryParam;
+import io.milvus.response.QueryResultsWrapper;
+import io.milvus.v2.utils.RpcUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+
+import static io.milvus.param.Constant.NO_CACHE_ID;
+import static io.milvus.param.Constant.UNLIMITED;
+
+public class QueryIterator {
+    private final IteratorCache iteratorCache;
+    private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub;
+    private final FieldType primaryField;
+
+    private final QueryIteratorParam queryIteratorParam;
+    private final int batchSize;
+    private final long limit;
+    private final String expr;
+    private long offset;
+    private Object nextId;
+    private int cacheIdInUse;
+    private long returnedCount;
+    private final RpcUtils rpcUtils;
+
+    /**
+     * Creates an iterator for the given request and immediately positions it by calling
+     * {@code seek()}, which may issue a query when the request carries a non-zero offset.
+     *
+     * @param queryIteratorParam request settings (expression, batch size, limit, offset, ...)
+     * @param blockingStub       stub used to send the query requests
+     * @param primaryField       primary key field of the target collection, used as the cursor
+     */
+    public QueryIterator(QueryIteratorParam queryIteratorParam,
+                         MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
+                         FieldType primaryField) {
+        // Collaborators.
+        this.iteratorCache = new IteratorCache();
+        this.rpcUtils = new RpcUtils();
+        this.blockingStub = blockingStub;
+        this.primaryField = primaryField;
+
+        // Settings copied from the request.
+        this.queryIteratorParam = queryIteratorParam;
+        this.expr = queryIteratorParam.getExpr();
+        this.offset = queryIteratorParam.getOffset();
+        this.limit = queryIteratorParam.getLimit();
+        this.batchSize = (int) queryIteratorParam.getBatchSize();
+
+        seek();
+    }
+
+    /**
+     * Positions the cursor according to the requested offset.
+     * <p>
+     * With a zero offset the cursor is cleared. Otherwise the first {@code offset} rows
+     * are fetched and the cursor is set to the last of them, so later pages start after
+     * the skipped rows. If fewer than {@code offset} rows match, all of them are skipped
+     * instead of failing with an {@link IndexOutOfBoundsException}.
+     */
+    private void seek() {
+        this.cacheIdInUse = NO_CACHE_ID;
+        if (offset == 0) {
+            nextId = null;
+            return;
+        }
+
+        List<QueryResultsWrapper.RowRecord> res = getQueryResultsWrapper(expr, 0L, offset);
+        // The server may return fewer rows than requested; never slice past the end.
+        int skipped = (int) Math.min(res.size(), offset);
+        updateCursor(res.subList(0, skipped));
+        offset = 0;
+    }
+
+    /**
+     * Returns the next page of rows.
+     * <p>
+     * Rows are served from the cache when it holds at least one full batch; otherwise
+     * the cache entry is released and a new query is sent for the rows after the
+     * current cursor. The page is trimmed when it would exceed the overall limit, and
+     * the cursor and the returned-row counter are advanced by the rows handed out.
+     *
+     * @return the next page; empty when no more rows are available
+     */
+    public List<QueryResultsWrapper.RowRecord> next() {
+        List<QueryResultsWrapper.RowRecord> cachedRes = iteratorCache.fetchCache(cacheIdInUse);
+        List<QueryResultsWrapper.RowRecord> ret;
+        if (isResSufficient(cachedRes)) {
+            // Hand out one batch from the cache and keep the remainder under the same id.
+            ret = cachedRes.subList(0, batchSize);
+            List<QueryResultsWrapper.RowRecord> retToCache = cachedRes.subList(batchSize, cachedRes.size());
+            iteratorCache.cache(cacheIdInUse, retToCache);
+        } else {
+            // Cache is absent or holds less than a batch: drop it and query the server.
+            iteratorCache.releaseCache(cacheIdInUse);
+            String currentExpr = setupNextExpr();
+            List<QueryResultsWrapper.RowRecord> res = getQueryResultsWrapper(currentExpr, offset, batchSize);
+            maybeCache(res);
+            ret = res.subList(0, Math.min(batchSize, res.size()));
+        }
+        // Apply the overall limit before moving the cursor, so the cursor matches what is returned.
+        ret = checkReachedLimit(ret);
+        updateCursor(ret);
+        returnedCount += ret.size();
+        return ret;
+    }
+
+    /**
+     * Releases the cache entry currently used by this iterator.
+     */
+    public void close() {
+        this.iteratorCache.releaseCache(this.cacheIdInUse);
+    }
+
+    /**
+     * Moves the cursor to the primary key of the last row in {@code res}; an empty list
+     * leaves the cursor untouched.
+     *
+     * @param res rows that were just consumed
+     */
+    private void updateCursor(List<QueryResultsWrapper.RowRecord> res) {
+        if (!res.isEmpty()) {
+            QueryResultsWrapper.RowRecord lastRow = res.get(res.size() - 1);
+            nextId = lastRow.get(primaryField.getName());
+        }
+    }
+
+    /**
+     * Trims a page so the total number of returned rows does not exceed the limit.
+     *
+     * @param ret candidate page
+     * @return {@code ret} itself when no limit applies or it fits within what is left,
+     *         otherwise a view of its first remaining-count rows
+     */
+    private List<QueryResultsWrapper.RowRecord> checkReachedLimit(List<QueryResultsWrapper.RowRecord> ret) {
+        if (limit != UNLIMITED) {
+            long remaining = limit - returnedCount;
+            if (remaining < ret.size()) {
+                return ret.subList(0, (int) remaining);
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Caches the rows beyond the first batch, but only when the result holds at least
+     * two full batches; the new cache id becomes the one in use.
+     *
+     * @param ret rows returned by the latest query
+     */
+    private void maybeCache(List<QueryResultsWrapper.RowRecord> ret) {
+        boolean hasSpareBatch = ret.size() >= 2 * batchSize;
+        if (hasSpareBatch) {
+            cacheIdInUse = iteratorCache.cache(NO_CACHE_ID, ret.subList(batchSize, ret.size()));
+        }
+    }
+
+    /**
+     * Builds the filter for the next page: the caller's expression restricted to primary
+     * keys greater than the cursor.
+     * <p>
+     * A VarChar cursor is emitted as a double-quoted string literal (it was previously
+     * wrapped in backslashes, which is not a valid literal). The caller's expression is
+     * parenthesized so that an {@code or} inside it cannot absorb the cursor condition.
+     *
+     * @return the caller's expression unchanged while no cursor is set, otherwise the
+     *         expression combined with the cursor condition (or the cursor condition
+     *         alone when the expression is empty)
+     */
+    private String setupNextExpr() {
+        if (nextId == null) {
+            return expr;
+        }
+
+        String pkName = primaryField.getName();
+        String filteredPKStr;
+        if (primaryField.getDataType() == DataType.VarChar) {
+            // String literals have to be quoted inside a filter expression.
+            filteredPKStr = pkName + " > \"" + nextId + "\"";
+        } else {
+            filteredPKStr = pkName + " > " + nextId;
+        }
+
+        if (StringUtils.isEmpty(expr)) {
+            return filteredPKStr;
+        }
+        return "(" + expr + ") and " + filteredPKStr;
+    }
+
+    /**
+     * Tells whether the given rows can fill a whole batch.
+     *
+     * @param ret rows to check, may be {@code null}
+     * @return {@code true} when {@code ret} is non-null and holds at least {@code batchSize} rows
+     */
+    private boolean isResSufficient(List<QueryResultsWrapper.RowRecord> ret) {
+        if (ret == null) {
+            return false;
+        }
+        return ret.size() >= batchSize;
+    }
+
+    /**
+     * Sends one query built from the iterator's request settings plus the given
+     * expression, offset and limit, checks the response status and returns the rows.
+     *
+     * @param expr   filter expression for this query
+     * @param offset offset for this query
+     * @param limit  maximum number of rows for this query
+     * @return the row records extracted from the response
+     */
+    private List<QueryResultsWrapper.RowRecord> getQueryResultsWrapper(String expr, long offset, long limit) {
+        QueryParam pageParam = QueryParam.newBuilder()
+                .withDatabaseName(queryIteratorParam.getDatabaseName())
+                .withCollectionName(queryIteratorParam.getCollectionName())
+                .withConsistencyLevel(queryIteratorParam.getConsistencyLevel())
+                .withPartitionNames(queryIteratorParam.getPartitionNames())
+                .withOutFields(queryIteratorParam.getOutFields())
+                .withExpr(expr)
+                .withOffset(offset)
+                .withLimit(limit)
+                .withIgnoreGrowing(queryIteratorParam.isIgnoreGrowing())
+                .build();
+
+        QueryResults results = blockingStub.query(ParamUtils.convertQueryParam(pageParam));
+
+        // Let the shared RPC helper inspect the status before the rows are unwrapped.
+        rpcUtils.handleResponse(
+                String.format("QueryRequest collectionName:%s", queryIteratorParam.getCollectionName()),
+                results.getStatus());
+
+        return new QueryResultsWrapper(results).getRowRecords();
+    }
+}

+ 431 - 0
src/main/java/io/milvus/orm/iterator/SearchIterator.java

@@ -0,0 +1,431 @@
+package io.milvus.orm.iterator;
+
+import com.amazonaws.util.CollectionUtils;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Lists;
+import io.milvus.common.utils.ExceptionUtils;
+import io.milvus.common.utils.JacksonUtils;
+import io.milvus.exception.ParamException;
+import io.milvus.grpc.DataType;
+import io.milvus.grpc.MilvusServiceGrpc;
+import io.milvus.grpc.SearchRequest;
+import io.milvus.grpc.SearchResults;
+import io.milvus.param.MetricType;
+import io.milvus.param.ParamUtils;
+import io.milvus.param.collection.FieldType;
+import io.milvus.param.dml.SearchIteratorParam;
+import io.milvus.param.dml.SearchParam;
+import io.milvus.response.QueryResultsWrapper;
+import io.milvus.response.SearchResultsWrapper;
+import io.milvus.v2.utils.RpcUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.DecimalFormat;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static io.milvus.param.Constant.DEFAULT_SEARCH_EXTENSION_RATE;
+import static io.milvus.param.Constant.EF;
+import static io.milvus.param.Constant.MAX_BATCH_SIZE;
+import static io.milvus.param.Constant.MAX_FILTERED_IDS_COUNT_ITERATION;
+import static io.milvus.param.Constant.MAX_TRY_TIME;
+import static io.milvus.param.Constant.NO_CACHE_ID;
+import static io.milvus.param.Constant.RADIUS;
+import static io.milvus.param.Constant.RANGE_FILTER;
+import static io.milvus.param.Constant.UNLIMITED;
+
+public class SearchIterator {
+    private static final Logger logger = LoggerFactory.getLogger(SearchIterator.class);
+    // holds rows that were fetched from the server but not yet handed out by next()
+    private final IteratorCache iteratorCache;
+    // gRPC stub used to issue the search requests
+    private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub;
+    // primary key field; its name and data type are used to build the "not in [...]" filter
+    private final FieldType primaryField;
+
+    // user supplied settings, copied into the fields below by the constructor
+    private final SearchIteratorParam searchIteratorParam;
+    private final int batchSize;
+    // total number of rows to return, or UNLIMITED
+    private final int topK;
+    private final String expr;
+    private final String metricType;
+
+    // id under which this iterator's rows live in iteratorCache; NO_CACHE_ID when init found no rows
+    private int cacheId;
+    // false when the first search returned no rows; next() then returns empty lists
+    private boolean initSuccess;
+    // number of rows already returned by next()
+    private int returnedCount;
+    // step added to (or subtracted from) tailBand to get the radius of the next search
+    private double width;
+    // distance of the last row fetched so far; sent as range_filter of the next search
+    private float tailBand;
+
+    // primary keys of fetched rows whose score equals filteredDistance; excluded from later searches
+    private List<Object> filteredIds;
+    // score of the last hit of the most recent search that returned hits
+    private Float filteredDistance = null;
+    // search params parsed from the JSON string in searchIteratorParam
+    private Map<String, Object> params;
+    private final RpcUtils rpcUtils;
+
+    /**
+     * Creates the iterator, validates the search params and runs the first search.
+     *
+     * @param searchIteratorParam user settings for the iteration
+     * @param blockingStub        gRPC stub used for the search calls
+     * @param primaryField        primary key field of the collection
+     */
+    public SearchIterator(SearchIteratorParam searchIteratorParam,
+                          MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
+                          FieldType primaryField) {
+        // collaborators
+        this.blockingStub = blockingStub;
+        this.primaryField = primaryField;
+        this.rpcUtils = new RpcUtils();
+        this.iteratorCache = new IteratorCache();
+
+        // settings copied from the request
+        this.searchIteratorParam = searchIteratorParam;
+        this.expr = searchIteratorParam.getExpr();
+        this.metricType = searchIteratorParam.getMetricType();
+        this.topK = searchIteratorParam.getTopK();
+        this.batchSize = (int) searchIteratorParam.getBatchSize();
+
+        // parse and validate the params, then fetch the first page
+        initParams();
+        checkForSpecialIndexParam();
+        checkRmRangeSearchParameters();
+        initSearchIterator();
+    }
+
+    /**
+     * Returns the next page of search results.
+     *
+     * @return up to {@code batchSize} rows (fewer when topK is nearly reached or the search
+     *         runs dry); an empty list when initialization failed or topK has been reached
+     */
+    public List<QueryResultsWrapper.RowRecord> next() {
+        // nothing to return if the first search found no rows
+        if (!initSuccess) {
+            return Lists.newArrayList();
+        }
+        if (checkReachedLimit()) {
+            return Lists.newArrayList();
+        }
+
+        // never hand out more rows than topK still allows
+        int wanted = (topK == UNLIMITED) ? batchSize : Math.min(topK - returnedCount, batchSize);
+
+        // fast path: the cache alone can serve the page
+        if (isCacheEnough(wanted)) {
+            List<QueryResultsWrapper.RowRecord> cachedOnly = extractPageFromCache(wanted);
+            returnedCount += cachedOnly.size();
+            return cachedOnly;
+        }
+
+        // slow path: probe the server with a constant width until the page is filled or the
+        // retry limit is exceeded, then serve whatever the cache holds
+        int cachedTotal = pushNewPageToCache(trySearchFill());
+        wanted = Math.min(cachedTotal, wanted);
+        List<QueryResultsWrapper.RowRecord> page = extractPageFromCache(wanted);
+        if (page.size() == batchSize) {
+            updateWidth(page);
+        }
+
+        // everything has been returned, so the duplicate filter is no longer needed
+        if (page.isEmpty()) {
+            filteredIds.clear();
+        }
+
+        returnedCount += wanted;
+        return page;
+    }
+
+    /**
+     * Releases the rows this iterator still holds in the iterator cache.
+     */
+    public void close() {
+        iteratorCache.releaseCache(cacheId);
+    }
+
+    /**
+     * Parses the JSON search params of the request into {@code params}.
+     *
+     * A missing or empty params string yields an empty map, so the validation methods that
+     * run afterwards can always query {@code params} safely.
+     */
+    private void initParams() {
+        String rawParams = searchIteratorParam.getParams();
+        if (rawParams == null || rawParams.isEmpty()) {
+            params = new HashMap<>();
+            return;
+        }
+
+        Map<String, Object> parsed = JacksonUtils.fromJson(rawParams, new TypeReference<Map<String, Object>>() {
+        });
+        // a JSON "null" literal parses to null; fall back to an empty map in that case
+        params = (parsed != null) ? parsed : new HashMap<>();
+    }
+
+    /**
+     * Rejects an {@code ef} search param that is smaller than the batch size.
+     *
+     * The value is read through {@link Number} so that any numeric JSON value is accepted,
+     * not only one that was deserialized as an {@code Integer}.
+     */
+    private void checkForSpecialIndexParam() {
+        Object efValue = params.get(EF);
+        if (efValue == null) {
+            return;
+        }
+        if (((Number) efValue).intValue() < batchSize) {
+            ExceptionUtils.throwUnExpectedException("When using hnsw index, provided ef must be larger than or equal to batch size");
+        }
+    }
+
+    /**
+     * Validates the user supplied {@code radius} / {@code range_filter} pair against the metric.
+     *
+     * For L2, JACCARD and HAMMING the radius must be larger than the range filter; for IP and
+     * COSINE it must be smaller. Nothing is checked unless both params are present.
+     */
+    private void checkRmRangeSearchParameters() {
+        Object radiusValue = params.get(RADIUS);
+        Object rangeFilterValue = params.get(RANGE_FILTER);
+        if (radiusValue == null || rangeFilterValue == null) {
+            return;
+        }
+
+        // JSON numbers arrive as Integer/Long/Double, never as Float, so convert via Number
+        float radius = ((Number) radiusValue).floatValue();
+        float rangeFilter = ((Number) rangeFilterValue).floatValue();
+        boolean positiveRelated = metricsPositiveRelated(metricType);
+        if (positiveRelated && radius <= rangeFilter) {
+            String msg = String.format("for metrics:%s, radius must be larger than range_filter, please adjust your parameter", metricType);
+            ExceptionUtils.throwUnExpectedException(msg);
+        }
+        if (!positiveRelated && radius >= rangeFilter) {
+            String msg = String.format("for metrics:%s, radius must be smaller than range_filter, please adjust your parameter", metricType);
+            ExceptionUtils.throwUnExpectedException(msg);
+        }
+    }
+
+    /**
+     * Runs the first search and seeds the cache, the range parameters and the duplicate filter.
+     *
+     * When the first page is empty the iterator is marked as not initialized and an error is
+     * logged; no exception is thrown.
+     */
+    private void initSearchIterator() {
+        SearchResultsWrapper firstResponse = executeNextSearch(params, expr, false);
+        List<QueryResultsWrapper.RowRecord> firstPage = firstResponse.getRowRecords(0);
+
+        if (!CollectionUtils.isNullOrEmpty(firstPage)) {
+            cacheId = iteratorCache.cache(NO_CACHE_ID, firstPage);
+            setUpRangeParameters(firstPage);
+            updateFilteredIds(firstResponse);
+            initSuccess = true;
+            return;
+        }
+
+        // no rows matched: leave the iterator in the "not initialized" state
+        logger.error("Cannot init search iterator because init page contains no matched rows, " +
+                "please check the radius and range_filter set up by searchParams");
+        cacheId = NO_CACHE_ID;
+        initSuccess = false;
+    }
+
+    /**
+     * Derives the initial {@code width} and {@code tailBand} from the first page.
+     *
+     * @param page first page of results; must contain at least one row
+     */
+    private void setUpRangeParameters(List<QueryResultsWrapper.RowRecord> page) {
+        updateWidth(page);
+        QueryResultsWrapper.RowRecord lastHit = page.get(page.size() - 1);
+        tailBand = getDistance(lastHit);
+        // report through the logger only; library code must not write to stdout
+        logger.debug("set up init parameter for searchIterator width:{} tail_band:{}", width, tailBand);
+    }
+
+    /**
+     * Refreshes the list of primary keys that must be excluded from the next search.
+     *
+     * Only hits whose score equals the score of the last hit are recorded. The list is
+     * reset whenever that last score differs from the previously recorded one.
+     * Throws through {@code ExceptionUtils.throwUnExpectedException} when the list grows
+     * beyond {@code MAX_FILTERED_IDS_COUNT_ITERATION}.
+     *
+     * @param searchResultsWrapper result of the search that was just executed
+     */
+    private void updateFilteredIds(SearchResultsWrapper searchResultsWrapper) {
+        List<SearchResultsWrapper.IDScore> idScores = searchResultsWrapper.getIDScore(0);
+        if (CollectionUtils.isNullOrEmpty(idScores)) {
+            return;
+        }
+
+        SearchResultsWrapper.IDScore lastHit = idScores.get(idScores.size() - 1);
+        if (lastHit == null) {
+            return;
+        }
+
+        // NOTE(review): this comparison assumes getScore() returns a primitive float - confirm
+        if (filteredDistance == null || lastHit.getScore() != filteredDistance) {
+            // distance has changed, clear filter_ids array
+            filteredIds = Lists.newArrayList();
+            // renew the distance for filtering
+            filteredDistance = lastHit.getScore();
+        }
+
+        // update filter ids to avoid returning result repeatedly
+        for (SearchResultsWrapper.IDScore hit : idScores) {
+            if (hit.getScore() == lastHit.getScore()) {
+                // store the key in the representation that matches the primary key type
+                if (primaryField.getDataType() == DataType.VarChar) {
+                    filteredIds.add(hit.getStrID());
+                } else {
+                    filteredIds.add(hit.getLongID());
+                }
+            }
+        }
+
+        // guard against an ever growing exclusion list
+        if (filteredIds.size() > MAX_FILTERED_IDS_COUNT_ITERATION) {
+            String msg = String.format("filtered ids length has accumulated to more than %s, " +
+                    "there is a danger of overly memory consumption", MAX_FILTERED_IDS_COUNT_ITERATION);
+            ExceptionUtils.throwUnExpectedException(msg);
+        }
+    }
+
+    /**
+     * Sends one search request built from the iterator settings and the given params.
+     *
+     * @param params        search params for this call; may be modified by extendBatchSize
+     * @param nextExpr      filter expression for this call
+     * @param toExtendBatch whether the requested topK is enlarged beyond the batch size
+     * @return wrapper around the results of the search response
+     */
+    private SearchResultsWrapper executeNextSearch(Map<String, Object> params, String nextExpr, boolean toExtendBatch) {
+        // withTopK must stay ahead of withParams: extendBatchSize can lower "ef" inside
+        // params, and that change has to be part of the serialized params
+        SearchParam searchParam = SearchParam.newBuilder()
+                .withDatabaseName(searchIteratorParam.getDatabaseName())
+                .withCollectionName(searchIteratorParam.getCollectionName())
+                .withPartitionNames(searchIteratorParam.getPartitionNames())
+                .withConsistencyLevel(searchIteratorParam.getConsistencyLevel())
+                .withVectorFieldName(searchIteratorParam.getVectorFieldName())
+                .withTopK(extendBatchSize(batchSize, toExtendBatch, params))
+                .withExpr(nextExpr)
+                .withOutFields(searchIteratorParam.getOutFields())
+                .withVectors(searchIteratorParam.getVectors())
+                .withRoundDecimal(searchIteratorParam.getRoundDecimal())
+                .withParams(JacksonUtils.toJsonString(params))
+                .withIgnoreGrowing(searchIteratorParam.isIgnoreGrowing())
+                .build();
+
+        SearchRequest searchRequest = ParamUtils.convertSearchParam(searchParam);
+        SearchResults response = blockingStub.search(searchRequest);
+
+        // let the rpc helper inspect the returned status before the results are wrapped
+        String title = String.format("SearchRequest collectionName:%s", searchIteratorParam.getCollectionName());
+        rpcUtils.handleResponse(title, response.getStatus());
+
+        return new SearchResultsWrapper(response.getResults());
+    }
+
+    /**
+     * Computes the topK to request for one search.
+     *
+     * The batch size is multiplied by {@code DEFAULT_SEARCH_EXTENSION_RATE} when extension is
+     * requested and capped at {@code MAX_BATCH_SIZE}. When the params contain {@code ef}, the
+     * result is also capped at {@code ef}, and {@code ef} is lowered in {@code nextParams} to
+     * the returned value if it was larger.
+     *
+     * @param batchSize         base batch size
+     * @param toExtendBatchSize whether to apply the extension rate
+     * @param nextParams        search params of the upcoming call; its {@code ef} entry may be updated
+     * @return the topK to use for the search
+     */
+    private int extendBatchSize(int batchSize, boolean toExtendBatchSize, Map<String, Object> nextParams) {
+        int extendRate = toExtendBatchSize ? DEFAULT_SEARCH_EXTENSION_RATE : 1;
+        int cappedBatch = Math.min(MAX_BATCH_SIZE, batchSize * extendRate);
+
+        Object efValue = nextParams.get(EF);
+        if (efValue == null) {
+            return cappedBatch;
+        }
+
+        // accept any numeric JSON value for ef, not only an exact Integer
+        int ef = ((Number) efValue).intValue();
+        int realBatch = Math.min(cappedBatch, ef);
+        if (ef > realBatch) {
+            nextParams.put(EF, realBatch);
+        }
+        return realBatch;
+    }
+
+    /**
+     * Recomputes {@code width} as the distance span between the first and last row of a page.
+     *
+     * @param page page of results; must contain at least one row
+     */
+    private void updateWidth(List<QueryResultsWrapper.RowRecord> page) {
+        QueryResultsWrapper.RowRecord head = page.get(0);
+        QueryResultsWrapper.RowRecord tail = page.get(page.size() - 1);
+
+        // the subtraction direction follows the metric so the span is computed as last-first
+        // for L2/JACCARD/HAMMING and first-last for IP/COSINE
+        boolean positiveRelated = metricsPositiveRelated(metricType);
+        width = positiveRelated
+                ? getDistance(tail) - getDistance(head)
+                : getDistance(head) - getDistance(tail);
+
+        // enable a minimum value for width to avoid radius and range_filter equal error
+        if (width == 0.0) {
+            width = 0.05;
+        }
+    }
+
+    /**
+     * Classifies the metric type used by the iteration.
+     *
+     * @param metricType metric type name
+     * @return true for L2, JACCARD and HAMMING; false for IP and COSINE. Any other value is
+     *         reported through {@code ExceptionUtils.throwUnExpectedException}
+     */
+    private boolean metricsPositiveRelated(String metricType) {
+        boolean inPositiveGroup = MetricType.L2.name().equals(metricType)
+                || MetricType.JACCARD.name().equals(metricType)
+                || MetricType.HAMMING.name().equals(metricType);
+        if (inPositiveGroup) {
+            return true;
+        }
+
+        boolean inNegativeGroup = MetricType.IP.name().equals(metricType)
+                || MetricType.COSINE.name().equals(metricType);
+        if (!inNegativeGroup) {
+            ExceptionUtils.throwUnExpectedException(
+                    String.format("unsupported metrics type for search iteration: %s", metricType));
+        }
+        return false;
+    }
+
+    /**
+     * Tells whether the number of returned rows has reached topK.
+     *
+     * @return true when topK is limited and {@code returnedCount} is at least topK
+     */
+    private boolean checkReachedLimit() {
+        boolean limitReached = topK != UNLIMITED && returnedCount >= topK;
+        if (limitReached) {
+            logger.debug(String.format("reached search limit:%s, returned_count:%s, directly return", topK, returnedCount));
+        }
+        return limitReached;
+    }
+
+    /**
+     * Tells whether the cache already holds enough rows to serve a page.
+     *
+     * @param count number of rows wanted
+     * @return true when a cached page exists and has at least {@code count} rows
+     */
+    private boolean isCacheEnough(int count) {
+        List<QueryResultsWrapper.RowRecord> cached = iteratorCache.fetchCache(cacheId);
+        if (cached == null) {
+            return false;
+        }
+        return cached.size() >= count;
+    }
+
+    /**
+     * Removes the first {@code count} rows from the cache and returns them.
+     *
+     * Both the returned page and the remainder put back into the cache are independent
+     * copies. Returning {@code subList} views instead would tie every page to one shared
+     * backing list: a later append to the cached remainder would invalidate pages the
+     * caller still holds, and rows already consumed could never be released.
+     *
+     * @param count number of rows to take
+     * @return a new list holding the first {@code count} cached rows
+     * @throws ParamException when the cache holds fewer than {@code count} rows
+     */
+    private List<QueryResultsWrapper.RowRecord> extractPageFromCache(int count) {
+        List<QueryResultsWrapper.RowRecord> cachedPage = iteratorCache.fetchCache(cacheId);
+        if (cachedPage == null || cachedPage.size() < count) {
+            String msg = String.format("Wrong, try to extract %s result from cache, more than %s there must be sth wrong with code",
+                    count, cachedPage == null ? 0 : cachedPage.size());
+            throw new ParamException(msg);
+        }
+
+        // copy out of the views so neither list depends on the old backing list
+        List<QueryResultsWrapper.RowRecord> retPageRes = Lists.newArrayList(cachedPage.subList(0, count));
+        List<QueryResultsWrapper.RowRecord> leftCachePage = Lists.newArrayList(cachedPage.subList(count, cachedPage.size()));
+
+        iteratorCache.cache(cacheId, leftCachePage);
+        return retPageRes;
+    }
+
+    /**
+     * Searches repeatedly, widening the radius each round, until at least one batch of new
+     * rows has been collected or the retry limit is exceeded.
+     *
+     * @return the rows collected over all rounds; may hold fewer than {@code batchSize} rows
+     */
+    private List<QueryResultsWrapper.RowRecord> trySearchFill() {
+        List<QueryResultsWrapper.RowRecord> collected = Lists.newArrayList();
+
+        // the round number doubles as the width coefficient: if a ring contains no matched
+        // vectors, the next ring is extended further to avoid the empty ring problem
+        for (int attempt = 1; ; attempt++) {
+            Map<String, Object> roundParams = nextParams(attempt);
+            String roundExpr = filteredDuplicatedResultExpr(expr);
+            SearchResultsWrapper roundResult = executeNextSearch(roundParams, roundExpr, true);
+
+            updateFilteredIds(roundResult);
+            List<QueryResultsWrapper.RowRecord> fetched = roundResult.getRowRecords(0);
+            if (!fetched.isEmpty()) {
+                collected.addAll(fetched);
+                tailBand = getDistance(fetched.get(fetched.size() - 1));
+            }
+
+            if (collected.size() >= batchSize) {
+                return collected;
+            }
+
+            if (attempt > MAX_TRY_TIME) {
+                logger.warn(String.format("Search exceed max try times:%s directly break", MAX_TRY_TIME));
+                return collected;
+            }
+        }
+    }
+
+    /**
+     * Builds the search params for the next probing round.
+     *
+     * The radius is moved away from {@code tailBand} by {@code width * coefficient} (added for
+     * L2/JACCARD/HAMMING, subtracted for IP/COSINE) but never past a radius supplied by the
+     * user. {@code range_filter} is set to {@code tailBand}.
+     *
+     * @param coefficient multiplier for the width; values below 1 are treated as 1
+     * @return a fresh params map; the iterator's own {@code params} is not modified
+     */
+    private Map<String, Object> nextParams(int coefficient) {
+        coefficient = Math.max(1, coefficient);
+        // copy the params through a JSON round trip so the original map stays untouched
+        Map<String, Object> nextParams = JacksonUtils.fromJson(JacksonUtils.toJsonString(params), new TypeReference<Map<String, Object>>() {
+        });
+
+        boolean positiveRelated = metricsPositiveRelated(metricType);
+        double nextRadius = positiveRelated
+                ? tailBand + width * coefficient
+                : tailBand - width * coefficient;
+
+        Object userRadius = params.get(RADIUS);
+        boolean beyondUserRadius = false;
+        if (userRadius != null) {
+            // read through Number: an integer-valued radius is not deserialized as a Double
+            double radiusLimit = ((Number) userRadius).doubleValue();
+            beyondUserRadius = positiveRelated ? nextRadius > radiusLimit : nextRadius < radiusLimit;
+        }
+        if (beyondUserRadius) {
+            nextParams.put(RADIUS, userRadius);
+        } else {
+            nextParams.put(RADIUS, nextRadius);
+        }
+        nextParams.put(RANGE_FILTER, tailBand);
+
+        String msg = String.format("next round search iteration radius:%s,range_filter:%s,coefficient:%s",
+                convertToStr(nextParams.get(RADIUS)), convertToStr(nextParams.get(RANGE_FILTER)), coefficient);
+        logger.debug(msg);
+        return nextParams;
+    }
+
+    /**
+     * Extends a filter expression so that rows listed in {@code filteredIds} are excluded.
+     *
+     * @param expr user filter expression, may be null or empty
+     * @return {@code expr} unchanged when there is nothing to exclude; otherwise a
+     *         {@code <pk> not in [...]} clause, AND-ed with the parenthesized {@code expr}
+     *         when one is given
+     */
+    private String filteredDuplicatedResultExpr(String expr) {
+        if (CollectionUtils.isNullOrEmpty(filteredIds)) {
+            return expr;
+        }
+
+        boolean varcharPk = primaryField.getDataType() == DataType.VarChar;
+        StringBuilder filteredIdsStr = new StringBuilder();
+        for (Object filteredId : filteredIds) {
+            if (filteredIdsStr.length() > 0) {
+                filteredIdsStr.append(",");
+            }
+            if (varcharPk) {
+                filteredIdsStr.append("\"").append(filteredId.toString()).append("\"");
+            } else {
+                filteredIdsStr.append((long) filteredId);
+            }
+        }
+
+        String notInExpr = String.format("%s not in [%s]", primaryField.getName(), filteredIdsStr);
+        if (expr == null || expr.isEmpty()) {
+            return notInExpr;
+        }
+        // parenthesize the user filter so an "or" inside it cannot bypass the exclusion list
+        return "(" + expr + ") and " + notInExpr;
+    }
+
+    /**
+     * Appends freshly fetched rows to the cached rows of this iterator.
+     *
+     * @param page rows to add; must not be null
+     * @return number of rows held for this iterator after the append
+     * @throws ParamException when {@code page} is null
+     */
+    private int pushNewPageToCache(List<QueryResultsWrapper.RowRecord> page) {
+        if (page == null) {
+            throw new ParamException("Cannot push None page into cache");
+        }
+
+        List<QueryResultsWrapper.RowRecord> existing = iteratorCache.fetchCache(cacheId);
+        if (existing != null) {
+            // rows are already cached: grow that list in place
+            existing.addAll(page);
+            return existing.size();
+        }
+
+        // nothing cached yet: the new page becomes the cached list
+        iteratorCache.cache(cacheId, page);
+        return page.size();
+    }
+
+    /**
+     * Reads the value stored under the {@code "distance"} key of a row.
+     *
+     * @param record row returned by a search
+     * @return the row's distance
+     */
+    private float getDistance(QueryResultsWrapper.RowRecord record) {
+        Object rawDistance = record.get("distance");
+        return (float) rawDistance;
+    }
+
+    /**
+     * Formats a numeric value in plain decimal notation for log messages.
+     *
+     * Exponent notation is avoided and up to eight fraction digits are kept, so small
+     * radius values are not rounded away to {@code 0.0} in the log.
+     *
+     * @param value a {@link Number} to format
+     * @return the value in plain decimal notation
+     */
+    private String convertToStr(Object value) {
+        DecimalFormat df = new DecimalFormat("0.0#######");
+        return df.format(value);
+    }
+}

+ 11 - 1
src/main/java/io/milvus/param/Constant.java

@@ -47,7 +47,6 @@ public class Constant {
     public final static String OFFSET = "offset";
     public final static String LIMIT = "limit";
     public final static String DYNAMIC_FIELD_NAME = "$meta";
-
     // constant values for general
     public static final String TTL_SECONDS = "collection.ttl.seconds";
     public static final String MMAP_ENABLED = "mmap.enabled";
@@ -84,4 +83,15 @@ public class Constant {
     public static final Long OFFSET_DEFAULT = 0L;
     public static final String ALL_OUTPUT_FIELDS = "*";
 
+    // upper bound for an iterator batch size and for the topK of a single iterator search
+    public static final int MAX_BATCH_SIZE = 16384;
+    // cache id used while an iterator has no cached rows yet
+    public static final int NO_CACHE_ID = -1;
+    // sentinel for "no limit" on iterator limit / topK
+    public static final int UNLIMITED = -1;
+    // factor applied to the batch size when a search iterator requests an extended batch
+    public static final int DEFAULT_SEARCH_EXTENSION_RATE = 10;
+    // a search iterator fails once its list of ids to exclude grows beyond this size
+    public static final int MAX_FILTERED_IDS_COUNT_ITERATION = 100000;
+    // retry threshold for a search iterator that is trying to fill one page
+    public static final int MAX_TRY_TIME = 20;
+
+    // keys looked up in the search params map by the search iterator
+    public static final String RADIUS = "radius";
+    public static final String EF = "ef";
+    public static final String RANGE_FILTER = "range_filter";
+
 }

+ 278 - 0
src/main/java/io/milvus/param/dml/QueryIteratorParam.java

@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.param.dml;
+
+import com.google.common.collect.Lists;
+import io.milvus.common.clientenum.ConsistencyLevelEnum;
+import io.milvus.exception.ParamException;
+import io.milvus.param.Constant;
+import io.milvus.param.ParamUtils;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.ToString;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static io.milvus.param.Constant.MAX_BATCH_SIZE;
+import static io.milvus.param.Constant.UNLIMITED;
+
+/**
+ * Parameters for <code>queryIterator</code> interface.
+ *
+ * Instances are immutable and are created through {@link Builder}.
+ */
+@Getter
+@ToString
+public class QueryIteratorParam {
+    private final String databaseName;
+    private final String collectionName;
+    private final List<String> partitionNames;
+    private final List<String> outFields;
+    private final String expr;
+    private final long travelTimestamp;
+    private final long guaranteeTimestamp;
+    private final long gracefulTime;
+    private final ConsistencyLevelEnum consistencyLevel;
+    private final long offset;
+    private final long limit;
+    private final boolean ignoreGrowing;
+
+    private final long batchSize;
+
+    private QueryIteratorParam(@NonNull Builder builder) {
+        this.databaseName = builder.databaseName;
+        this.collectionName = builder.collectionName;
+        // copy the lists so that reusing the builder later cannot change this instance
+        this.partitionNames = new ArrayList<>(builder.partitionNames);
+        this.outFields = new ArrayList<>(builder.outFields);
+        this.expr = builder.expr;
+        this.travelTimestamp = builder.travelTimestamp;
+        this.guaranteeTimestamp = builder.guaranteeTimestamp;
+        this.consistencyLevel = builder.consistencyLevel;
+        this.gracefulTime = builder.gracefulTime;
+        this.offset = builder.offset;
+        this.limit = builder.limit;
+        this.ignoreGrowing = builder.ignoreGrowing;
+
+        this.batchSize = builder.batchSize;
+    }
+
+    /**
+     * Creates a new builder with default values.
+     *
+     * @return <code>Builder</code>
+     */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link QueryIteratorParam} class.
+     */
+    public static class Builder {
+        private String databaseName;
+        private String collectionName;
+        private final List<String> partitionNames = Lists.newArrayList();
+        private final List<String> outFields = new ArrayList<>();
+        private String expr = "";
+        private Long travelTimestamp = 0L;
+        private Long gracefulTime = 5000L;
+        private Long guaranteeTimestamp = Constant.GUARANTEE_EVENTUALLY_TS;
+        private ConsistencyLevelEnum consistencyLevel = null;
+        private Long offset = 0L;
+        private Long limit = (long) UNLIMITED;
+        private Boolean ignoreGrowing = Boolean.FALSE;
+        private Long batchSize = 1000L;
+
+        private Builder() {
+        }
+
+        /**
+         * Sets the database name. database name can be nil.
+         *
+         * @param databaseName database name
+         * @return <code>Builder</code>
+         */
+        public Builder withDatabaseName(String databaseName) {
+            this.databaseName = databaseName;
+            return this;
+        }
+
+        /**
+         * Sets the collection name. Collection name cannot be empty or null.
+         *
+         * @param collectionName collection name
+         * @return <code>Builder</code>
+         */
+        public Builder withCollectionName(@NonNull String collectionName) {
+            this.collectionName = collectionName;
+            return this;
+        }
+
+        /**
+         * Sets the consistency level of the query (Optional).
+         *
+         * @param consistencyLevel consistency level
+         * @return <code>Builder</code>
+         */
+        public Builder withConsistencyLevel(ConsistencyLevelEnum consistencyLevel) {
+            this.consistencyLevel = consistencyLevel;
+            return this;
+        }
+
+        /**
+         * Sets partition names list to specify query scope (Optional).
+         *
+         * @param partitionNames partition names list
+         * @return <code>Builder</code>
+         */
+        public Builder withPartitionNames(@NonNull List<String> partitionNames) {
+            partitionNames.forEach(this::addPartitionName);
+            return this;
+        }
+
+        /**
+         * Adds a partition to specify query scope (Optional). Duplicate names are ignored.
+         *
+         * @param partitionName partition name
+         * @return <code>Builder</code>
+         */
+        public Builder addPartitionName(@NonNull String partitionName) {
+            if (!this.partitionNames.contains(partitionName)) {
+                this.partitionNames.add(partitionName);
+            }
+            return this;
+        }
+
+        /**
+         * Specifies output fields (Optional).
+         *
+         * @param outFields output fields
+         * @return <code>Builder</code>
+         */
+        public Builder withOutFields(@NonNull List<String> outFields) {
+            outFields.forEach(this::addOutField);
+            return this;
+        }
+
+        /**
+         * Specifies an output field (Optional). Duplicate names are ignored.
+         *
+         * @param fieldName field name
+         * @return <code>Builder</code>
+         */
+        public Builder addOutField(@NonNull String fieldName) {
+            if (!this.outFields.contains(fieldName)) {
+                this.outFields.add(fieldName);
+            }
+            return this;
+        }
+
+        /**
+         * Sets the expression to query entities.
+         * @see <a href="https://milvus.io/docs/v2.0.0/boolean.md">Boolean Expression Rules</a>
+         *
+         * @param expr filtering expression
+         * @return <code>Builder</code>
+         */
+        public Builder withExpr(@NonNull String expr) {
+            this.expr = expr;
+            return this;
+        }
+
+        /**
+         * Specify a position to return results. Only take effect when the 'limit' value is specified.
+         * Default value is 0, start from begin.
+         *
+         * @param offset a value to define the position
+         * @return <code>Builder</code>
+         */
+        public Builder withOffset(@NonNull Long offset) {
+            this.offset = offset;
+            return this;
+        }
+
+        /**
+         * Specify a value to control the returned number of entities. Must be a positive value.
+         * Default value is -1, will return without limit.
+         *
+         * @param limit a value to define the limit of returned entities
+         * @return <code>Builder</code>
+         */
+        public Builder withLimit(@NonNull Long limit) {
+            this.limit = limit;
+            return this;
+        }
+
+        /**
+         * Specify the number of entities returned per batch. Cannot be negative and cannot
+         * exceed {@link Constant#MAX_BATCH_SIZE}. Default value is 1000.
+         *
+         * @param batchSize a value to define the number of entities returned per batch
+         * @return <code>Builder</code>
+         */
+        public Builder withBatchSize(@NonNull Long batchSize) {
+            // lombok's @NonNull rejects null here, like the other setters, instead of
+            // letting it fail later inside build()
+            this.batchSize = batchSize;
+            return this;
+        }
+
+        /**
+         * Ignore the growing segments to get best query performance. Default is False.
+         * For the user case that don't require data visibility.
+         *
+         * @param ignoreGrowing <code>Boolean.TRUE</code> ignore, Boolean.FALSE is not
+         * @return <code>Builder</code>
+         */
+        public Builder withIgnoreGrowing(@NonNull Boolean ignoreGrowing) {
+            this.ignoreGrowing = ignoreGrowing;
+            return this;
+        }
+
+        /**
+         * Verifies parameters and creates a new {@link QueryIteratorParam} instance.
+         *
+         * @return {@link QueryIteratorParam}
+         * @throws ParamException when the collection name is empty, a timestamp, the offset,
+         *                        the limit or the batch size is negative (a limit of -1 means
+         *                        unlimited), or the batch size exceeds the maximum
+         */
+        public QueryIteratorParam build() throws ParamException {
+            ParamUtils.CheckNullEmptyString(collectionName, "Collection name");
+            ParamUtils.CheckNullString(expr, "Expression");
+
+            if (travelTimestamp < 0) {
+                throw new ParamException("The travel timestamp must be greater than 0");
+            }
+
+            if (guaranteeTimestamp < 0) {
+                throw new ParamException("The guarantee timestamp must be greater than 0");
+            }
+
+            if (offset < 0) {
+                throw new ParamException("The offset value cannot be less than 0");
+            }
+
+            if (limit != UNLIMITED && limit < 0) {
+                throw new ParamException("The limit value cannot be less than 0");
+            }
+
+            if (batchSize < 0) {
+                throw new ParamException("batch size cannot be less than zero");
+            }
+
+            if (batchSize > MAX_BATCH_SIZE) {
+                throw new ParamException(String.format("batch size cannot be larger than %s", MAX_BATCH_SIZE));
+            }
+            return new QueryIteratorParam(this);
+        }
+    }
+
+}

+ 374 - 0
src/main/java/io/milvus/param/dml/SearchIteratorParam.java

@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.param.dml;
+
+import com.google.common.collect.Lists;
+import io.milvus.common.clientenum.ConsistencyLevelEnum;
+import io.milvus.exception.ParamException;
+import io.milvus.param.Constant;
+import io.milvus.param.MetricType;
+import io.milvus.param.ParamUtils;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.ToString;
+import org.jetbrains.annotations.NotNull;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import static io.milvus.param.Constant.UNLIMITED;
+
+/**
+ * Parameters for <code>searchIterator</code> interface.
+ *
+ * Instances are created through {@link Builder}; call {@link #newBuilder()} to obtain one.
+ * The search iterator currently accepts exactly one target vector per request.
+ */
+@Getter
+@ToString
+public class SearchIteratorParam {
+    private final String databaseName;
+    private final String collectionName;
+    private final List<String> partitionNames;
+    private final String metricType;
+    private final String vectorFieldName;
+    private final int topK;
+    private final String expr;
+    private final List<String> outFields;
+    private final List<?> vectors;
+    private final Long NQ;
+    private final int roundDecimal;
+    private final String params;
+    private final long travelTimestamp;
+    private final long guaranteeTimestamp;
+    private final Long gracefulTime;
+    private final ConsistencyLevelEnum consistencyLevel;
+    private final boolean ignoreGrowing;
+
+    private final long batchSize;
+
+    /**
+     * Copies the validated state of the builder into an immutable parameter object.
+     *
+     * @param builder builder whose {@link Builder#build()} validation has already passed
+     */
+    private SearchIteratorParam(@NonNull Builder builder) {
+        this.databaseName = builder.databaseName;
+        this.collectionName = builder.collectionName;
+        this.partitionNames = builder.partitionNames;
+        // The metric type is stored by its enum constant name.
+        this.metricType = builder.metricType.name();
+        this.vectorFieldName = builder.vectorFieldName;
+        this.topK = builder.topK;
+        this.expr = builder.expr;
+        this.outFields = builder.outFields;
+        this.vectors = builder.vectors;
+        this.NQ = builder.NQ;
+        this.roundDecimal = builder.roundDecimal;
+        this.params = builder.params;
+        this.travelTimestamp = builder.travelTimestamp;
+        this.guaranteeTimestamp = builder.guaranteeTimestamp;
+        this.gracefulTime = builder.gracefulTime;
+        this.consistencyLevel = builder.consistencyLevel;
+        this.ignoreGrowing = builder.ignoreGrowing;
+        this.batchSize = builder.batchSize;
+    }
+
+    /**
+     * Creates an empty builder populated with default values.
+     *
+     * @return a new {@link Builder}
+     */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link SearchIteratorParam} class.
+     */
+    public static class Builder {
+        private String databaseName;
+        private String collectionName;
+        private final List<String> partitionNames = Lists.newArrayList();
+        private MetricType metricType = MetricType.None;
+        private String vectorFieldName;
+        private Integer topK = UNLIMITED;
+        private String expr = "";
+        private final List<String> outFields = Lists.newArrayList();
+        private List<?> vectors;
+        private Long NQ;
+        private Integer roundDecimal = -1;
+        private String params = "{}";
+        private Long travelTimestamp = 0L;
+        private Long guaranteeTimestamp = Constant.GUARANTEE_EVENTUALLY_TS;
+        private Long gracefulTime = 5000L;
+        private ConsistencyLevelEnum consistencyLevel = null;
+        private Boolean ignoreGrowing = Boolean.FALSE;
+
+        private Long batchSize = 1000L;
+
+        Builder() {
+        }
+
+        /**
+         * Sets the database name. database name can be nil.
+         *
+         * @param databaseName database name
+         * @return <code>Builder</code>
+         */
+        public Builder withDatabaseName(String databaseName) {
+            this.databaseName = databaseName;
+            return this;
+        }
+
+        /**
+         * Sets the collection name. Collection name cannot be empty or null.
+         *
+         * @param collectionName collection name
+         * @return <code>Builder</code>
+         */
+        public Builder withCollectionName(@NonNull String collectionName) {
+            this.collectionName = collectionName;
+            return this;
+        }
+
+        /**
+         * Sets partition names list to specify search scope (Optional).
+         * Names already present in the builder are not added twice.
+         *
+         * @param partitionNames partition names list
+         * @return <code>Builder</code>
+         */
+        public Builder withPartitionNames(@NonNull List<String> partitionNames) {
+            partitionNames.forEach(this::addPartitionName);
+            return this;
+        }
+
+        /**
+         * Sets the consistency level used by the search (Optional).
+         *
+         * @param consistencyLevel consistency level
+         * @return <code>Builder</code>
+         */
+        public Builder withConsistencyLevel(ConsistencyLevelEnum consistencyLevel) {
+            this.consistencyLevel = consistencyLevel;
+            return this;
+        }
+
+        /**
+         * Adds a partition to specify search scope (Optional).
+         * A name that was already added is ignored.
+         *
+         * @param partitionName partition name
+         * @return <code>Builder</code>
+         */
+        public Builder addPartitionName(@NonNull String partitionName) {
+            if (!this.partitionNames.contains(partitionName)) {
+                this.partitionNames.add(partitionName);
+            }
+            return this;
+        }
+
+        /**
+         * Sets metric type of ANN searching. A metric type other than
+         * {@link MetricType#None} is required by {@link #build()}.
+         *
+         * @param metricType metric type
+         * @return <code>Builder</code>
+         */
+        public Builder withMetricType(@NonNull MetricType metricType) {
+            this.metricType = metricType;
+            return this;
+        }
+
+        /**
+         * Sets target vector field by name. Field name cannot be empty or null.
+         *
+         * @param vectorFieldName vector field name
+         * @return <code>Builder</code>
+         */
+        public Builder withVectorFieldName(@NonNull String vectorFieldName) {
+            this.vectorFieldName = vectorFieldName;
+            return this;
+        }
+
+        /**
+         * Sets topK value of ANN search.
+         *
+         * @param topK topK value
+         * @return <code>Builder</code>
+         */
+        public Builder withTopK(@NonNull Integer topK) {
+            this.topK = topK;
+            return this;
+        }
+
+        /**
+         * Sets expression to filter out entities before searching (Optional).
+         *
+         * @param expr filtering expression
+         * @return <code>Builder</code>
+         * @see <a href="https://milvus.io/docs/v2.0.0/boolean.md">Boolean Expression Rules</a>
+         */
+        public Builder withExpr(@NonNull String expr) {
+            this.expr = expr;
+            return this;
+        }
+
+        /**
+         * Specifies output fields (Optional).
+         * Field names already present in the builder are not added twice.
+         *
+         * @param outFields output fields
+         * @return <code>Builder</code>
+         */
+        public Builder withOutFields(@NonNull List<String> outFields) {
+            outFields.forEach(this::addOutField);
+            return this;
+        }
+
+        /**
+         * Specifies an output field (Optional).
+         * A name that was already added is ignored.
+         *
+         * @param fieldName field name
+         * @return <code>Builder</code>
+         */
+        public Builder addOutField(@NonNull String fieldName) {
+            if (!this.outFields.contains(fieldName)) {
+                this.outFields.add(fieldName);
+            }
+            return this;
+        }
+
+        /**
+         * Sets the target vectors and records their count as NQ.
+         * {@link #build()} rejects a list holding more than one vector.
+         *
+         * @param vectors list of target vectors:
+         *                if vector type is FloatVector, vectors is List of List Float;
+         *                if vector type is BinaryVector, vectors is List of ByteBuffer;
+         * @return <code>Builder</code>
+         */
+        public Builder withVectors(@NonNull List<?> vectors) {
+            this.vectors = vectors;
+            this.NQ = (long) vectors.size();
+            return this;
+        }
+
+        /**
+         * Specifies the decimal place of the returned results.
+         *
+         * @param decimal how many digits after the decimal point
+         * @return <code>Builder</code>
+         */
+        public Builder withRoundDecimal(@NonNull Integer decimal) {
+            this.roundDecimal = decimal;
+            return this;
+        }
+
+        /**
+         * Sets the search parameters specific to the index type.
+         *
+         * For example: IVF index, the search parameters can be "{\"nprobe\":10}"
+         * For more information: @see <a href="https://milvus.io/docs/v2.0.0/index_selection.md">Index Selection</a>
+         *
+         * @param params extra parameters in json format
+         * @return <code>Builder</code>
+         */
+        public Builder withParams(@NonNull String params) {
+            this.params = params;
+            return this;
+        }
+
+        /**
+         * Ignore the growing segments to get best search performance. Default is False.
+         * For use cases that do not require data visibility.
+         *
+         * @param ignoreGrowing <code>Boolean.TRUE</code> ignore, Boolean.FALSE is not
+         * @return <code>Builder</code>
+         */
+        public Builder withIgnoreGrowing(@NonNull Boolean ignoreGrowing) {
+            this.ignoreGrowing = ignoreGrowing;
+            return this;
+        }
+
+        /**
+         * Specify a value to control the number of entities returned per batch. Must be a positive value.
+         * Default value is 1000, which is used when no batch size is specified.
+         *
+         * @param batchSize a value to define the number of entities returned per batch
+         * @return <code>Builder</code>
+         */
+        public Builder withBatchSize(@NonNull Long batchSize) {
+            // Lombok's @NonNull generates a null check; a null value would otherwise only
+            // fail later, when the Long is unboxed into the primitive field.
+            this.batchSize = batchSize;
+            return this;
+        }
+
+        /**
+         * Verifies parameters and creates a new {@link SearchIteratorParam} instance.
+         *
+         * @return {@link SearchIteratorParam}
+         * @throws ParamException if a name is rejected by {@link ParamUtils}, topK or a timestamp
+         *                        is out of range, the metric type is not specified, the target
+         *                        vectors are missing, of an unsupported type or more than one,
+         *                        or the batch size is not positive
+         */
+        public SearchIteratorParam build() throws ParamException {
+            ParamUtils.CheckNullEmptyString(collectionName, "Collection name");
+            ParamUtils.CheckNullEmptyString(vectorFieldName, "Target field name");
+
+            // UNLIMITED is the sentinel for "no topK limit" and is exempt from the range check.
+            if (topK != UNLIMITED && topK <= 0) {
+                throw new ParamException("TopK value is illegal");
+            }
+
+            // Zero is a legal timestamp; only negative values are rejected.
+            if (travelTimestamp < 0) {
+                throw new ParamException("The travel timestamp cannot be less than 0");
+            }
+
+            if (guaranteeTimestamp < 0) {
+                throw new ParamException("The guarantee timestamp cannot be less than 0");
+            }
+
+            if (vectors == null || vectors.isEmpty()) {
+                throw new ParamException("Target vectors can not be empty");
+            }
+
+            if (metricType == MetricType.None) {
+                throw new ParamException("must specify metricType for search iterator");
+            }
+
+            Object first = vectors.get(0);
+            if (!(first instanceof List) && !(first instanceof ByteBuffer)) {
+                throw new ParamException("Target vector type must be List<Float> or ByteBuffer");
+            }
+
+            // Exactly one target vector is supported, so there is no second vector
+            // whose dimension would need to be compared with the first.
+            if (vectors.size() > 1) {
+                throw new ParamException("Not support search iteration over multiple vectors at present");
+            }
+
+            if (first instanceof List) {
+                // float vector: guard against an empty list before inspecting its first element
+                List<?> floatVector = (List<?>) first;
+                if (floatVector.isEmpty() || !(floatVector.get(0) instanceof Float)) {
+                    throw new ParamException("Float vector field's value must be List<Float>");
+                }
+            }
+
+            if (batchSize <= 0) {
+                throw new ParamException("The batch size must be greater than 0");
+            }
+
+            return new SearchIteratorParam(this);
+        }
+    }
+
+}