IteratorExample.java 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements. See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership. The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License. You may obtain a copy of the License at
   *
   * http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing,
   * software distributed under the License is distributed on an
   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   * KIND, either express or implied. See the License for the
   * specific language governing permissions and limitations
   * under the License.
   */
  19. package io.milvus.v1;
  20. import com.google.common.collect.Lists;
  21. import io.milvus.client.MilvusClient;
  22. import io.milvus.client.MilvusServiceClient;
  23. import io.milvus.common.clientenum.ConsistencyLevelEnum;
  24. import io.milvus.grpc.DataType;
  25. import io.milvus.grpc.FlushResponse;
  26. import io.milvus.grpc.GetCollectionStatisticsResponse;
  27. import io.milvus.grpc.MutationResult;
  28. import io.milvus.param.ConnectParam;
  29. import io.milvus.param.IndexType;
  30. import io.milvus.param.MetricType;
  31. import io.milvus.param.R;
  32. import io.milvus.param.RetryParam;
  33. import io.milvus.param.RpcStatus;
  34. import io.milvus.param.collection.*;
  35. import io.milvus.param.dml.InsertParam;
  36. import io.milvus.param.dml.QueryIteratorParam;
  37. import io.milvus.param.dml.SearchIteratorParam;
  38. import io.milvus.param.index.CreateIndexParam;
  39. import io.milvus.orm.iterator.QueryIterator;
  40. import io.milvus.orm.iterator.SearchIterator;
  41. import io.milvus.response.GetCollStatResponseWrapper;
  42. import io.milvus.response.QueryResultsWrapper;
  43. import java.util.ArrayList;
  44. import java.util.List;
  45. import java.util.concurrent.TimeUnit;
  46. public class IteratorExample {
  47. private static final MilvusClient milvusClient;
  48. static {
  49. ConnectParam connectParam = ConnectParam.newBuilder()
  50. .withHost("localhost")
  51. .withPort(19530)
  52. .build();
  53. RetryParam retryParam = RetryParam.newBuilder()
  54. .withMaxRetryTimes(3)
  55. .build();
  56. milvusClient = new MilvusServiceClient(connectParam).withRetry(retryParam);
  57. }
  58. private static final String COLLECTION_NAME = "java_sdk_example_iterator_v1";
  59. private static final String ID_FIELD = "userID";
  60. private static final String VECTOR_FIELD = "userFace";
  61. private static final Integer VECTOR_DIM = 8;
  62. private static final String AGE_FIELD = "userAge";
  63. private static final String INDEX_NAME = "userFaceIndex";
  64. private static final IndexType INDEX_TYPE = IndexType.IVF_FLAT;
  65. private static final String INDEX_PARAM = "{\"nlist\":128}";
  66. private static final boolean CLEAR_EXIST = false;
  67. private static final Integer NUM_ENTITIES = 1000;
  68. private void createCollection(long timeoutMilliseconds) {
  69. FieldType fieldType1 = FieldType.newBuilder()
  70. .withName(ID_FIELD)
  71. .withDataType(DataType.Int64)
  72. .withPrimaryKey(true)
  73. .withAutoID(false)
  74. .build();
  75. FieldType fieldType2 = FieldType.newBuilder()
  76. .withName(VECTOR_FIELD)
  77. .withDataType(DataType.FloatVector)
  78. .withDimension(VECTOR_DIM)
  79. .build();
  80. FieldType fieldType3 = FieldType.newBuilder()
  81. .withName(AGE_FIELD)
  82. .withDataType(DataType.Int64)
  83. .build();
  84. CollectionSchemaParam collectionSchemaParam = CollectionSchemaParam.newBuilder()
  85. .withEnableDynamicField(false)
  86. .addFieldType(fieldType1)
  87. .addFieldType(fieldType2)
  88. .addFieldType(fieldType3)
  89. .build();
  90. CreateCollectionParam createCollectionReq = CreateCollectionParam.newBuilder()
  91. .withCollectionName(COLLECTION_NAME)
  92. .withShardsNum(2)
  93. .withSchema(collectionSchemaParam)
  94. .withConsistencyLevel(ConsistencyLevelEnum.EVENTUALLY)
  95. .build();
  96. R<RpcStatus> response = milvusClient.withTimeout(timeoutMilliseconds, TimeUnit.MILLISECONDS)
  97. .createCollection(createCollectionReq);
  98. CommonUtils.handleResponseStatus(response);
  99. }
  100. private boolean hasCollection() {
  101. R<Boolean> response = milvusClient.hasCollection(HasCollectionParam.newBuilder()
  102. .withCollectionName(COLLECTION_NAME)
  103. .build());
  104. CommonUtils.handleResponseStatus(response);
  105. return response.getData();
  106. }
  107. private void dropCollection() {
  108. R<RpcStatus> response = milvusClient.dropCollection(DropCollectionParam.newBuilder()
  109. .withCollectionName(COLLECTION_NAME)
  110. .build());
  111. CommonUtils.handleResponseStatus(response);
  112. }
  113. private void loadCollection() {
  114. R<RpcStatus> response = milvusClient.loadCollection(LoadCollectionParam.newBuilder()
  115. .withCollectionName(COLLECTION_NAME)
  116. .build());
  117. CommonUtils.handleResponseStatus(response);
  118. System.out.printf("Finish Loading Collection %s\n", COLLECTION_NAME);
  119. }
  120. private void createIndex() {
  121. // create index for vector field
  122. R<RpcStatus> response = milvusClient.createIndex(CreateIndexParam.newBuilder()
  123. .withCollectionName(COLLECTION_NAME)
  124. .withFieldName(VECTOR_FIELD)
  125. .withIndexName(INDEX_NAME)
  126. .withIndexType(INDEX_TYPE)
  127. .withMetricType(MetricType.L2)
  128. .withExtraParam(INDEX_PARAM)
  129. .withSyncMode(Boolean.TRUE)
  130. .build());
  131. CommonUtils.handleResponseStatus(response);
  132. System.out.printf("Finish Creating index %s\n", INDEX_TYPE);
  133. }
  134. private void insertColumns() {
  135. int batchCount = 5;
  136. for (int batch = 0; batch < batchCount; ++batch) {
  137. List<List<Float>> vectors = CommonUtils.generateFixFloatVectors(VECTOR_DIM, NUM_ENTITIES);
  138. List<Long> ages = new ArrayList<>();
  139. List<Long> ids = new ArrayList<>();
  140. for (long i = 0L; i < NUM_ENTITIES; ++i) {
  141. ages.add((long) batch * NUM_ENTITIES + i);
  142. ids.add((long) batch * NUM_ENTITIES + i);
  143. }
  144. List<InsertParam.Field> fields = new ArrayList<>();
  145. fields.add(new InsertParam.Field(ID_FIELD, ids));
  146. fields.add(new InsertParam.Field(AGE_FIELD, ages));
  147. fields.add(new InsertParam.Field(VECTOR_FIELD, vectors));
  148. InsertParam insertParam = InsertParam.newBuilder()
  149. .withCollectionName(COLLECTION_NAME)
  150. .withFields(fields)
  151. .build();
  152. R<MutationResult> response = milvusClient.insert(insertParam);
  153. CommonUtils.handleResponseStatus(response);
  154. System.out.printf("Finish insert batch No.%d\n", batch);
  155. }
  156. GetCollectionStatisticsParam collectionStatisticsParam = GetCollectionStatisticsParam.newBuilder()
  157. .withCollectionName(COLLECTION_NAME)
  158. .withFlush(true)
  159. .build();
  160. R<GetCollectionStatisticsResponse> collectionStatistics = milvusClient.getCollectionStatistics(collectionStatisticsParam);
  161. CommonUtils.handleResponseStatus(collectionStatistics);
  162. GetCollStatResponseWrapper wrapper = new GetCollStatResponseWrapper(collectionStatistics.getData());
  163. System.out.printf("Number of entities in collection: %d\n", wrapper.getRowCount());
  164. }
  165. private void reCreateCollection() {
  166. if (hasCollection()) {
  167. if (CLEAR_EXIST) {
  168. dropCollection();
  169. System.out.printf("Dropped existed collection %s%n", COLLECTION_NAME);
  170. }
  171. } else {
  172. createCollection(2000);
  173. System.out.printf("Create collection %s%n", COLLECTION_NAME);
  174. }
  175. }
  176. private void prepareData() {
  177. insertColumns();
  178. createIndex();
  179. loadCollection();
  180. }
  181. private void queryIterateCollectionNoOffset() {
  182. String expr = String.format("10 <= %s <= 100", AGE_FIELD);
  183. QueryIterator queryIterator = getQueryIterator(expr, 0L, 5L, null);
  184. iterateQueryResult(queryIterator);
  185. }
  186. private void queryIterateCollectionWithOffset() {
  187. String expr = String.format("10 <= %s <= 100", AGE_FIELD);
  188. QueryIterator queryIterator = getQueryIterator(expr, 10L, 50L, null);
  189. iterateQueryResult(queryIterator);
  190. }
  191. private void queryIterateCollectionWithLimit() {
  192. String expr = String.format("10 <= %s <= 100", AGE_FIELD);
  193. QueryIterator queryIterator = getQueryIterator(expr, null, 80L, 530L);
  194. iterateQueryResult(queryIterator);
  195. }
  196. private void searchIteratorCollection() {
  197. List<List<Float>> floatVector = CommonUtils.generateFixFloatVectors(VECTOR_DIM, 1);
  198. String params = buildSearchParams();
  199. SearchIterator searchIterator = getSearchIterator(floatVector, 500L, null, params);
  200. iterateSearchResult(searchIterator);
  201. }
  202. private void searchIteratorCollectionWithLimit() {
  203. List<List<Float>> floatVector = CommonUtils.generateFixFloatVectors(VECTOR_DIM, 1);
  204. String params = buildSearchParams();
  205. SearchIterator searchIterator = getSearchIterator(floatVector, 200L, 755, params);
  206. iterateSearchResult(searchIterator);
  207. }
  208. private void iterateQueryResult(QueryIterator queryIterator) {
  209. int pageIdx = 0;
  210. while (true) {
  211. List<QueryResultsWrapper.RowRecord> res = queryIterator.next();
  212. if (res.isEmpty()) {
  213. System.out.println("query iteration finished, close");
  214. queryIterator.close();
  215. break;
  216. }
  217. for (QueryResultsWrapper.RowRecord re : res) {
  218. System.out.println(re);
  219. }
  220. pageIdx++;
  221. System.out.printf("page%s-------------------------%n", pageIdx);
  222. }
  223. }
  224. private void iterateSearchResult(SearchIterator searchIterator) {
  225. int pageIdx = 0;
  226. while (true) {
  227. List<QueryResultsWrapper.RowRecord> res = searchIterator.next();
  228. if (res.isEmpty()) {
  229. System.out.println("search iteration finished, close");
  230. searchIterator.close();
  231. break;
  232. }
  233. for (QueryResultsWrapper.RowRecord re : res) {
  234. System.out.println(re);
  235. }
  236. pageIdx++;
  237. System.out.printf("page%s-------------------------%n", pageIdx);
  238. }
  239. }
  240. private QueryIterator getQueryIterator(String expr, Long offset, Long batchSize, Long limit) {
  241. QueryIteratorParam.Builder queryIteratorParamBuilder = QueryIteratorParam.newBuilder()
  242. .withCollectionName(COLLECTION_NAME)
  243. .withExpr(expr).withOutFields(Lists.newArrayList(ID_FIELD, AGE_FIELD))
  244. .withBatchSize(batchSize).withConsistencyLevel(ConsistencyLevelEnum.EVENTUALLY);
  245. if (offset != null) {
  246. queryIteratorParamBuilder.withOffset(offset);
  247. }
  248. if (limit != null) {
  249. queryIteratorParamBuilder.withLimit(limit);
  250. }
  251. R<QueryIterator> response = milvusClient.queryIterator(queryIteratorParamBuilder.build());
  252. CommonUtils.handleResponseStatus(response);
  253. return response.getData();
  254. }
  255. private SearchIterator getSearchIterator(List<List<Float>> vectors, Long batchSize, Integer topK, String params) {
  256. SearchIteratorParam.Builder searchIteratorParamBuilder = SearchIteratorParam.newBuilder()
  257. .withCollectionName(COLLECTION_NAME)
  258. .withOutFields(Lists.newArrayList(ID_FIELD))
  259. .withBatchSize(batchSize)
  260. .withVectorFieldName(VECTOR_FIELD)
  261. .withVectors(vectors)
  262. .withParams(params)
  263. .withMetricType(MetricType.L2);
  264. if (topK != null) {
  265. searchIteratorParamBuilder.withTopK(topK);
  266. }
  267. R<SearchIterator> response = milvusClient.searchIterator(searchIteratorParamBuilder.build());
  268. CommonUtils.handleResponseStatus(response);
  269. return response.getData();
  270. }
  271. private String buildSearchParams() {
  272. return "{}";
  273. }
  274. public static void main(String[] args) {
  275. boolean skipDataPeriod = false;
  276. IteratorExample example = new IteratorExample();
  277. example.reCreateCollection();
  278. if (!skipDataPeriod) {
  279. example.prepareData();
  280. }
  281. example.queryIterateCollectionNoOffset();
  282. example.queryIterateCollectionWithOffset();
  283. example.queryIterateCollectionWithLimit();
  284. example.searchIteratorCollection();
  285. example.searchIteratorCollectionWithLimit();
  286. }
  287. }