MilvusGrpcClient.java 54 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing,
  13. * software distributed under the License is distributed on an
  14. * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. * KIND, either express or implied. See the License for the
  16. * specific language governing permissions and limitations
  17. * under the License.
  18. */
  19. package io.milvus.client;
  20. import com.google.common.util.concurrent.FutureCallback;
  21. import com.google.common.util.concurrent.Futures;
  22. import com.google.common.util.concurrent.ListenableFuture;
  23. import com.google.common.util.concurrent.MoreExecutors;
  24. import com.google.protobuf.ByteString;
  25. import io.grpc.ConnectivityState;
  26. import io.grpc.ManagedChannel;
  27. import io.grpc.ManagedChannelBuilder;
  28. import io.grpc.StatusRuntimeException;
  29. import io.milvus.grpc.*;
  30. import javax.annotation.Nonnull;
  31. import java.nio.Buffer;
  32. import java.nio.ByteBuffer;
  33. import java.util.ArrayList;
  34. import java.util.List;
  35. import java.util.concurrent.TimeUnit;
  36. import java.util.function.Function;
  37. import org.slf4j.Logger;
  38. import org.slf4j.LoggerFactory;
  39. /** Actual implementation of interface <code>MilvusClient</code> */
  40. public class MilvusGrpcClient implements MilvusClient {
  private static final Logger logger = LoggerFactory.getLogger(MilvusGrpcClient.class);

  // ANSI escape sequences (reset / yellow / purple / bright purple); presumably consumed by the
  // log helpers defined further down in this class.
  private static final String ANSI_RESET = "\u001B[0m";
  private static final String ANSI_YELLOW = "\u001B[33m";
  private static final String ANSI_PURPLE = "\u001B[35m";
  private static final String ANSI_BRIGHT_PURPLE = "\u001B[95m";

  /** Key under which JSON-encoded extra parameters are attached to index and search requests. */
  private final String extraParamKey = "params";

  // Connection state: all three stay null until connect() assigns them.
  private ManagedChannel channel;
  private MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub;
  private MilvusServiceGrpc.MilvusServiceFutureStub futureStub;

  ////////////////////// Constructor //////////////////////

  /** Creates an unconnected client. */
  public MilvusGrpcClient() {}
  52. /////////////////////// Client Calls///////////////////////
  53. @Override
  54. public Response connect(ConnectParam connectParam) throws ConnectFailedException {
  55. if (channel != null && !(channel.isShutdown() || channel.isTerminated())) {
  56. logWarning("Channel is not shutdown or terminated");
  57. throw new ConnectFailedException("Channel is not shutdown or terminated");
  58. }
  59. try {
  60. channel =
  61. ManagedChannelBuilder.forAddress(connectParam.getHost(), connectParam.getPort())
  62. .usePlaintext()
  63. .maxInboundMessageSize(Integer.MAX_VALUE)
  64. .keepAliveTime(
  65. connectParam.getKeepAliveTime(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS)
  66. .keepAliveTimeout(
  67. connectParam.getKeepAliveTimeout(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS)
  68. .keepAliveWithoutCalls(connectParam.isKeepAliveWithoutCalls())
  69. .idleTimeout(connectParam.getIdleTimeout(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS)
  70. .build();
  71. channel.getState(true);
  72. long timeout = connectParam.getConnectTimeout(TimeUnit.MILLISECONDS);
  73. logInfo("Trying to connect...Timeout in {} ms", timeout);
  74. final long checkFrequency = 100; // ms
  75. while (channel.getState(false) != ConnectivityState.READY) {
  76. if (timeout <= 0) {
  77. logError("Connect timeout!");
  78. throw new ConnectFailedException("Connect timeout");
  79. }
  80. TimeUnit.MILLISECONDS.sleep(checkFrequency);
  81. timeout -= checkFrequency;
  82. }
  83. blockingStub = MilvusServiceGrpc.newBlockingStub(channel);
  84. futureStub = MilvusServiceGrpc.newFutureStub(channel);
  85. } catch (Exception e) {
  86. if (!(e instanceof ConnectFailedException)) {
  87. logError("Connect failed! {}", e.toString());
  88. }
  89. throw new ConnectFailedException("Exception occurred: " + e.toString());
  90. }
  91. logInfo(
  92. "Connection established successfully to host={}, port={}",
  93. connectParam.getHost(), String.valueOf(connectParam.getPort()));
  94. return new Response(Response.Status.SUCCESS);
  95. }
  96. @Override
  97. public boolean isConnected() {
  98. if (channel == null) {
  99. return false;
  100. }
  101. ConnectivityState connectivityState = channel.getState(false);
  102. return connectivityState == ConnectivityState.READY;
  103. }
  104. @Override
  105. public Response disconnect() throws InterruptedException {
  106. if (!channelIsReadyOrIdle()) {
  107. logWarning("You are not connected to Milvus server");
  108. return new Response(Response.Status.CLIENT_NOT_CONNECTED);
  109. } else {
  110. try {
  111. if (channel.shutdown().awaitTermination(60, TimeUnit.SECONDS)) {
  112. logInfo("Channel terminated");
  113. } else {
  114. logError("Encountered error when terminating channel");
  115. return new Response(Response.Status.RPC_ERROR);
  116. }
  117. } catch (InterruptedException e) {
  118. logError("Exception thrown when terminating channel: {}", e.toString());
  119. throw e;
  120. }
  121. }
  122. return new Response(Response.Status.SUCCESS);
  123. }
  124. @Override
  125. public Response createCollection(@Nonnull CollectionMapping collectionMapping) {
  126. if (!channelIsReadyOrIdle()) {
  127. logWarning("You are not connected to Milvus server");
  128. return new Response(Response.Status.CLIENT_NOT_CONNECTED);
  129. }
  130. CollectionSchema request =
  131. CollectionSchema.newBuilder()
  132. .setCollectionName(collectionMapping.getCollectionName())
  133. .setDimension(collectionMapping.getDimension())
  134. .setIndexFileSize(collectionMapping.getIndexFileSize())
  135. .setMetricType(collectionMapping.getMetricType().getVal())
  136. .build();
  137. Status response;
  138. try {
  139. response = blockingStub.createCollection(request);
  140. if (response.getErrorCode() == ErrorCode.SUCCESS) {
  141. logInfo("Created collection successfully!\n{}", collectionMapping.toString());
  142. return new Response(Response.Status.SUCCESS);
  143. } else if (response.getReason().contentEquals("Collection already exists")) {
  144. logWarning("Collection `{}` already exists", collectionMapping.getCollectionName());
  145. return new Response(
  146. Response.Status.valueOf(response.getErrorCodeValue()), response.getReason());
  147. } else {
  148. logError(
  149. "Create collection failed\n{}\n{}",
  150. collectionMapping.toString(), response.toString());
  151. return new Response(
  152. Response.Status.valueOf(response.getErrorCodeValue()), response.getReason());
  153. }
  154. } catch (StatusRuntimeException e) {
  155. logError("createCollection RPC failed:\n{}", e.getStatus().toString());
  156. return new Response(Response.Status.RPC_ERROR, e.toString());
  157. }
  158. }
  159. @Override
  160. public HasCollectionResponse hasCollection(@Nonnull String collectionName) {
  161. if (!channelIsReadyOrIdle()) {
  162. logWarning("You are not connected to Milvus server");
  163. return new HasCollectionResponse(new Response(Response.Status.CLIENT_NOT_CONNECTED), false);
  164. }
  165. CollectionName request = CollectionName.newBuilder().setCollectionName(collectionName).build();
  166. BoolReply response;
  167. try {
  168. response = blockingStub.hasCollection(request);
  169. if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
  170. logInfo("hasCollection `{}` = {}", collectionName, response.getBoolReply());
  171. return new HasCollectionResponse(
  172. new Response(Response.Status.SUCCESS), response.getBoolReply());
  173. } else {
  174. logError("hasCollection `{}` failed:\n{}", collectionName, response.toString());
  175. return new HasCollectionResponse(
  176. new Response(
  177. Response.Status.valueOf(response.getStatus().getErrorCodeValue()),
  178. response.getStatus().getReason()),
  179. false);
  180. }
  181. } catch (StatusRuntimeException e) {
  182. logError("hasCollection RPC failed:\n{}", e.getStatus().toString());
  183. return new HasCollectionResponse(
  184. new Response(Response.Status.RPC_ERROR, e.toString()), false);
  185. }
  186. }
  187. @Override
  188. public Response dropCollection(@Nonnull String collectionName) {
  189. if (!channelIsReadyOrIdle()) {
  190. logWarning("You are not connected to Milvus server");
  191. return new Response(Response.Status.CLIENT_NOT_CONNECTED);
  192. }
  193. CollectionName request = CollectionName.newBuilder().setCollectionName(collectionName).build();
  194. Status response;
  195. try {
  196. response = blockingStub.dropCollection(request);
  197. if (response.getErrorCode() == ErrorCode.SUCCESS) {
  198. logInfo("Dropped collection `{}` successfully!", collectionName);
  199. return new Response(Response.Status.SUCCESS);
  200. } else {
  201. logError("Drop collection `{}` failed:\n{}", collectionName, response.toString());
  202. return new Response(
  203. Response.Status.valueOf(response.getErrorCodeValue()), response.getReason());
  204. }
  205. } catch (StatusRuntimeException e) {
  206. logError("dropCollection RPC failed:\n{}", e.getStatus().toString());
  207. return new Response(Response.Status.RPC_ERROR, e.toString());
  208. }
  209. }
  210. @Override
  211. public Response createIndex(@Nonnull Index index) {
  212. if (!channelIsReadyOrIdle()) {
  213. logWarning("You are not connected to Milvus server");
  214. return new Response(Response.Status.CLIENT_NOT_CONNECTED);
  215. }
  216. KeyValuePair extraParam =
  217. KeyValuePair.newBuilder().setKey(extraParamKey).setValue(index.getParamsInJson()).build();
  218. IndexParam request =
  219. IndexParam.newBuilder()
  220. .setCollectionName(index.getCollectionName())
  221. .setIndexType(index.getIndexType().getVal())
  222. .addExtraParams(extraParam)
  223. .build();
  224. Status response;
  225. try {
  226. response = blockingStub.createIndex(request);
  227. if (response.getErrorCode() == ErrorCode.SUCCESS) {
  228. logInfo("Created index successfully!\n{}", index.toString());
  229. return new Response(Response.Status.SUCCESS);
  230. } else {
  231. logError("Create index failed:\n{}\n{}", index.toString(), response.toString());
  232. return new Response(
  233. Response.Status.valueOf(response.getErrorCodeValue()), response.getReason());
  234. }
  235. } catch (StatusRuntimeException e) {
  236. logError("createIndex RPC failed:\n{}", e.getStatus().toString());
  237. return new Response(Response.Status.RPC_ERROR, e.toString());
  238. }
  239. }
  240. @Override
  241. public ListenableFuture<Response> createIndexAsync(@Nonnull Index index) {
  242. if (!channelIsReadyOrIdle()) {
  243. logWarning("You are not connected to Milvus server");
  244. return Futures.immediateFuture(new Response(Response.Status.CLIENT_NOT_CONNECTED));
  245. }
  246. KeyValuePair extraParam =
  247. KeyValuePair.newBuilder().setKey(extraParamKey).setValue(index.getParamsInJson()).build();
  248. IndexParam request =
  249. IndexParam.newBuilder()
  250. .setCollectionName(index.getCollectionName())
  251. .setIndexType(index.getIndexType().getVal())
  252. .addExtraParams(extraParam)
  253. .build();
  254. ListenableFuture<Status> response;
  255. response = futureStub.createIndex(request);
  256. Futures.addCallback(
  257. response,
  258. new FutureCallback<Status>() {
  259. @Override
  260. public void onSuccess(Status result) {
  261. if (result.getErrorCode() == ErrorCode.SUCCESS) {
  262. logInfo("Created index successfully!\n{}", index.toString());
  263. } else {
  264. logError("CreateIndexAsync failed:\n{}\n{}", index.toString(), result.toString());
  265. }
  266. }
  267. @Override
  268. public void onFailure(Throwable t) {
  269. logError("CreateIndexAsync failed:\n{}", t.getMessage());
  270. }
  271. },
  272. MoreExecutors.directExecutor());
  273. return Futures.transform(
  274. response, transformStatusToResponseFunc::apply, MoreExecutors.directExecutor());
  275. }
  276. @Override
  277. public Response createPartition(String collectionName, String tag) {
  278. if (!channelIsReadyOrIdle()) {
  279. logWarning("You are not connected to Milvus server");
  280. return new Response(Response.Status.CLIENT_NOT_CONNECTED);
  281. }
  282. PartitionParam request =
  283. PartitionParam.newBuilder().setCollectionName(collectionName).setTag(tag).build();
  284. Status response;
  285. try {
  286. response = blockingStub.createPartition(request);
  287. if (response.getErrorCode() == ErrorCode.SUCCESS) {
  288. logInfo("Created partition `{}` in collection `{}` successfully!", tag, collectionName);
  289. return new Response(Response.Status.SUCCESS);
  290. } else {
  291. logError(
  292. "Create partition `{}` in collection `{}` failed: {}",
  293. tag, collectionName, response.toString());
  294. return new Response(
  295. Response.Status.valueOf(response.getErrorCodeValue()), response.getReason());
  296. }
  297. } catch (StatusRuntimeException e) {
  298. logError("createPartition RPC failed:\n{}", e.getStatus().toString());
  299. return new Response(Response.Status.RPC_ERROR, e.toString());
  300. }
  301. }
  302. @Override
  303. public HasPartitionResponse hasPartition(String collectionName, String tag) {
  304. if (!channelIsReadyOrIdle()) {
  305. logWarning("You are not connected to Milvus server");
  306. return new HasPartitionResponse(new Response(Response.Status.CLIENT_NOT_CONNECTED), false);
  307. }
  308. PartitionParam request =
  309. PartitionParam.newBuilder().setCollectionName(collectionName).setTag(tag).build();
  310. BoolReply response;
  311. try {
  312. response = blockingStub.hasPartition(request);
  313. if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
  314. logInfo("hasPartition with tag `{}` in `{}` = {}", tag, collectionName, response.getBoolReply());
  315. return new HasPartitionResponse(
  316. new Response(Response.Status.SUCCESS), response.getBoolReply());
  317. } else {
  318. logError("hasPartition with tag `{}` in `{}` failed:\n{}", tag, collectionName, response.toString());
  319. return new HasPartitionResponse(
  320. new Response(
  321. Response.Status.valueOf(response.getStatus().getErrorCodeValue()),
  322. response.getStatus().getReason()),
  323. false);
  324. }
  325. } catch (StatusRuntimeException e) {
  326. logError("hasPartition RPC failed:\n{}", e.getStatus().toString());
  327. return new HasPartitionResponse(
  328. new Response(Response.Status.RPC_ERROR, e.toString()), false);
  329. }
  330. }
  331. @Override
  332. public ShowPartitionsResponse showPartitions(String collectionName) {
  333. if (!channelIsReadyOrIdle()) {
  334. logWarning("You are not connected to Milvus server");
  335. return new ShowPartitionsResponse(
  336. new Response(Response.Status.CLIENT_NOT_CONNECTED), new ArrayList<>());
  337. }
  338. CollectionName request = CollectionName.newBuilder().setCollectionName(collectionName).build();
  339. PartitionList response;
  340. try {
  341. response = blockingStub.showPartitions(request);
  342. if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
  343. logInfo(
  344. "Current partitions of collection {}: {}",
  345. collectionName, response.getPartitionTagArrayList());
  346. return new ShowPartitionsResponse(
  347. new Response(Response.Status.SUCCESS), response.getPartitionTagArrayList());
  348. } else {
  349. logError("Show partitions failed:\n{}", response.toString());
  350. return new ShowPartitionsResponse(
  351. new Response(
  352. Response.Status.valueOf(response.getStatus().getErrorCodeValue()),
  353. response.getStatus().getReason()),
  354. new ArrayList<>());
  355. }
  356. } catch (StatusRuntimeException e) {
  357. logError("showPartitions RPC failed:\n{}", e.getStatus().toString());
  358. return new ShowPartitionsResponse(
  359. new Response(Response.Status.RPC_ERROR, e.toString()), new ArrayList<>());
  360. }
  361. }
  362. @Override
  363. public Response dropPartition(String collectionName, String tag) {
  364. if (!channelIsReadyOrIdle()) {
  365. logWarning("You are not connected to Milvus server");
  366. return new Response(Response.Status.CLIENT_NOT_CONNECTED);
  367. }
  368. PartitionParam request =
  369. PartitionParam.newBuilder().setCollectionName(collectionName).setTag(tag).build();
  370. Status response;
  371. try {
  372. response = blockingStub.dropPartition(request);
  373. if (response.getErrorCode() == ErrorCode.SUCCESS) {
  374. logInfo("Dropped partition `{}` in collection `{}` successfully!", tag, collectionName);
  375. return new Response(Response.Status.SUCCESS);
  376. } else {
  377. logError(
  378. "Drop partition `{}` in collection `{}` failed:\n{}",
  379. tag, collectionName, response.toString());
  380. return new Response(
  381. Response.Status.valueOf(response.getErrorCodeValue()), response.getReason());
  382. }
  383. } catch (StatusRuntimeException e) {
  384. logError("dropPartition RPC failed:\n{}", e.getStatus().toString());
  385. return new Response(Response.Status.RPC_ERROR, e.toString());
  386. }
  387. }
  388. @Override
  389. public InsertResponse insert(@Nonnull InsertParam insertParam) {
  390. if (!channelIsReadyOrIdle()) {
  391. logWarning("You are not connected to Milvus server");
  392. return new InsertResponse(
  393. new Response(Response.Status.CLIENT_NOT_CONNECTED), new ArrayList<>());
  394. }
  395. List<RowRecord> rowRecordList =
  396. buildRowRecordList(insertParam.getFloatVectors(), insertParam.getBinaryVectors());
  397. io.milvus.grpc.InsertParam request =
  398. io.milvus.grpc.InsertParam.newBuilder()
  399. .setCollectionName(insertParam.getCollectionName())
  400. .addAllRowRecordArray(rowRecordList)
  401. .addAllRowIdArray(insertParam.getVectorIds())
  402. .setPartitionTag(insertParam.getPartitionTag())
  403. .build();
  404. VectorIds response;
  405. try {
  406. response = blockingStub.insert(request);
  407. if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
  408. logInfo(
  409. "Inserted {} vectors to collection `{}` successfully!",
  410. response.getVectorIdArrayCount(), insertParam.getCollectionName());
  411. return new InsertResponse(
  412. new Response(Response.Status.SUCCESS), response.getVectorIdArrayList());
  413. } else {
  414. logError("Insert vectors failed:\n{}", response.getStatus().toString());
  415. return new InsertResponse(
  416. new Response(
  417. Response.Status.valueOf(response.getStatus().getErrorCodeValue()),
  418. response.getStatus().getReason()),
  419. new ArrayList<>());
  420. }
  421. } catch (StatusRuntimeException e) {
  422. logError("insert RPC failed:\n{}", e.getStatus().toString());
  423. return new InsertResponse(
  424. new Response(Response.Status.RPC_ERROR, e.toString()), new ArrayList<>());
  425. }
  426. }
  427. @Override
  428. public ListenableFuture<InsertResponse> insertAsync(@Nonnull InsertParam insertParam) {
  429. if (!channelIsReadyOrIdle()) {
  430. logWarning("You are not connected to Milvus server");
  431. return Futures.immediateFuture(
  432. new InsertResponse(
  433. new Response(Response.Status.CLIENT_NOT_CONNECTED), new ArrayList<>()));
  434. }
  435. List<RowRecord> rowRecordList =
  436. buildRowRecordList(insertParam.getFloatVectors(), insertParam.getBinaryVectors());
  437. io.milvus.grpc.InsertParam request =
  438. io.milvus.grpc.InsertParam.newBuilder()
  439. .setCollectionName(insertParam.getCollectionName())
  440. .addAllRowRecordArray(rowRecordList)
  441. .addAllRowIdArray(insertParam.getVectorIds())
  442. .setPartitionTag(insertParam.getPartitionTag())
  443. .build();
  444. ListenableFuture<VectorIds> response;
  445. response = futureStub.insert(request);
  446. Futures.addCallback(
  447. response,
  448. new FutureCallback<VectorIds>() {
  449. @Override
  450. public void onSuccess(VectorIds result) {
  451. if (result.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
  452. logInfo(
  453. "Inserted {} vectors to collection `{}` successfully!",
  454. result.getVectorIdArrayCount(), insertParam.getCollectionName());
  455. } else {
  456. logError("InsertAsync failed:\n{}", result.getStatus().toString());
  457. }
  458. }
  459. @Override
  460. public void onFailure(Throwable t) {
  461. logError("InsertAsync failed:\n{}", t.getMessage());
  462. }
  463. },
  464. MoreExecutors.directExecutor());
  465. Function<VectorIds, InsertResponse> transformFunc =
  466. vectorIds -> {
  467. if (vectorIds.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
  468. return new InsertResponse(
  469. new Response(Response.Status.SUCCESS), vectorIds.getVectorIdArrayList());
  470. } else {
  471. return new InsertResponse(
  472. new Response(
  473. Response.Status.valueOf(vectorIds.getStatus().getErrorCodeValue()),
  474. vectorIds.getStatus().getReason()),
  475. new ArrayList<>());
  476. }
  477. };
  478. return Futures.transform(response, transformFunc::apply, MoreExecutors.directExecutor());
  479. }
  480. @Override
  481. public SearchResponse search(@Nonnull SearchParam searchParam) {
  482. if (!channelIsReadyOrIdle()) {
  483. logWarning("You are not connected to Milvus server");
  484. SearchResponse searchResponse = new SearchResponse();
  485. searchResponse.setResponse(new Response(Response.Status.CLIENT_NOT_CONNECTED));
  486. return searchResponse;
  487. }
  488. List<RowRecord> rowRecordList =
  489. buildRowRecordList(searchParam.getFloatVectors(), searchParam.getBinaryVectors());
  490. KeyValuePair extraParam =
  491. KeyValuePair.newBuilder()
  492. .setKey(extraParamKey)
  493. .setValue(searchParam.getParamsInJson())
  494. .build();
  495. io.milvus.grpc.SearchParam request =
  496. io.milvus.grpc.SearchParam.newBuilder()
  497. .setCollectionName(searchParam.getCollectionName())
  498. .addAllQueryRecordArray(rowRecordList)
  499. .addAllPartitionTagArray(searchParam.getPartitionTags())
  500. .setTopk(searchParam.getTopK())
  501. .addExtraParams(extraParam)
  502. .build();
  503. TopKQueryResult response;
  504. try {
  505. response = blockingStub.search(request);
  506. if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
  507. SearchResponse searchResponse = buildSearchResponse(response);
  508. searchResponse.setResponse(new Response(Response.Status.SUCCESS));
  509. logInfo(
  510. "Search completed successfully! Returned results for {} queries",
  511. searchResponse.getNumQueries());
  512. return searchResponse;
  513. } else {
  514. logError("Search failed:\n{}", response.getStatus().toString());
  515. SearchResponse searchResponse = new SearchResponse();
  516. searchResponse.setResponse(
  517. new Response(
  518. Response.Status.valueOf(response.getStatus().getErrorCodeValue()),
  519. response.getStatus().getReason()));
  520. return searchResponse;
  521. }
  522. } catch (StatusRuntimeException e) {
  523. logError("search RPC failed:\n{}", e.getStatus().toString());
  524. SearchResponse searchResponse = new SearchResponse();
  525. searchResponse.setResponse(new Response(Response.Status.RPC_ERROR, e.toString()));
  526. return searchResponse;
  527. }
  528. }
  529. @Override
  530. public SearchResponse searchByIds(@Nonnull SearchByIdsParam searchByIdsParam) {
  531. if (!channelIsReadyOrIdle()) {
  532. logWarning("You are not connected to Milvus server");
  533. SearchResponse searchResponse = new SearchResponse();
  534. searchResponse.setResponse(new Response(Response.Status.CLIENT_NOT_CONNECTED));
  535. return searchResponse;
  536. }
  537. List<Long> idList = searchByIdsParam.getIds();
  538. KeyValuePair extraParam =
  539. KeyValuePair.newBuilder()
  540. .setKey(extraParamKey)
  541. .setValue(searchByIdsParam.getParamsInJson())
  542. .build();
  543. io.milvus.grpc.SearchByIDParam request =
  544. io.milvus.grpc.SearchByIDParam.newBuilder()
  545. .setCollectionName(searchByIdsParam.getCollectionName())
  546. .addAllIdArray(idList)
  547. .addAllPartitionTagArray(searchByIdsParam.getPartitionTags())
  548. .setTopk(searchByIdsParam.getTopK())
  549. .addExtraParams(extraParam)
  550. .build();
  551. TopKQueryResult response;
  552. try {
  553. response = blockingStub.searchByID(request);
  554. if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
  555. SearchResponse searchResponse = buildSearchResponse(response);
  556. searchResponse.setResponse(new Response(Response.Status.SUCCESS));
  557. logInfo(
  558. "Search by ids completed successfully! Returned results for {} queries",
  559. searchResponse.getNumQueries());
  560. return searchResponse;
  561. } else {
  562. logError("Search by ids failed:\n{}", response.getStatus().toString());
  563. SearchResponse searchResponse = new SearchResponse();
  564. searchResponse.setResponse(
  565. new Response(
  566. Response.Status.valueOf(response.getStatus().getErrorCodeValue()),
  567. response.getStatus().getReason()));
  568. return searchResponse;
  569. }
  570. } catch (StatusRuntimeException e) {
  571. logError("search by ids RPC failed:\n{}", e.getStatus().toString());
  572. SearchResponse searchResponse = new SearchResponse();
  573. searchResponse.setResponse(new Response(Response.Status.RPC_ERROR, e.toString()));
  574. return searchResponse;
  575. }
  576. }
  577. @Override
  578. public ListenableFuture<SearchResponse> searchAsync(@Nonnull SearchParam searchParam) {
  579. if (!channelIsReadyOrIdle()) {
  580. logWarning("You are not connected to Milvus server");
  581. SearchResponse searchResponse = new SearchResponse();
  582. searchResponse.setResponse(new Response(Response.Status.CLIENT_NOT_CONNECTED));
  583. return Futures.immediateFuture(searchResponse);
  584. }
  585. List<RowRecord> rowRecordList =
  586. buildRowRecordList(searchParam.getFloatVectors(), searchParam.getBinaryVectors());
  587. KeyValuePair extraParam =
  588. KeyValuePair.newBuilder()
  589. .setKey(extraParamKey)
  590. .setValue(searchParam.getParamsInJson())
  591. .build();
  592. io.milvus.grpc.SearchParam request =
  593. io.milvus.grpc.SearchParam.newBuilder()
  594. .setCollectionName(searchParam.getCollectionName())
  595. .addAllQueryRecordArray(rowRecordList)
  596. .addAllPartitionTagArray(searchParam.getPartitionTags())
  597. .setTopk(searchParam.getTopK())
  598. .addExtraParams(extraParam)
  599. .build();
  600. ListenableFuture<TopKQueryResult> response;
  601. response = futureStub.search(request);
  602. Futures.addCallback(
  603. response,
  604. new FutureCallback<TopKQueryResult>() {
  605. @Override
  606. public void onSuccess(TopKQueryResult result) {
  607. if (result.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
  608. logInfo(
  609. "SearchAsync completed successfully! Returned results for {} queries",
  610. result.getRowNum());
  611. } else {
  612. logError("SearchAsync failed:\n{}", result.getStatus().toString());
  613. }
  614. }
  615. @Override
  616. public void onFailure(Throwable t) {
  617. logError("SearchAsync failed:\n{}", t.getMessage());
  618. }
  619. },
  620. MoreExecutors.directExecutor());
  621. Function<TopKQueryResult, SearchResponse> transformFunc =
  622. topKQueryResult -> {
  623. if (topKQueryResult.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
  624. SearchResponse searchResponse = buildSearchResponse(topKQueryResult);
  625. searchResponse.setResponse(new Response(Response.Status.SUCCESS));
  626. return searchResponse;
  627. } else {
  628. SearchResponse searchResponse = new SearchResponse();
  629. searchResponse.setResponse(
  630. new Response(
  631. Response.Status.valueOf(topKQueryResult.getStatus().getErrorCodeValue()),
  632. topKQueryResult.getStatus().getReason()));
  633. return searchResponse;
  634. }
  635. };
  636. return Futures.transform(response, transformFunc::apply, MoreExecutors.directExecutor());
  637. }
  638. @Override
  639. public SearchResponse searchInFiles(
  640. @Nonnull List<String> fileIds, @Nonnull SearchParam searchParam) {
  641. if (!channelIsReadyOrIdle()) {
  642. logWarning("You are not connected to Milvus server");
  643. SearchResponse searchResponse = new SearchResponse();
  644. searchResponse.setResponse(new Response(Response.Status.CLIENT_NOT_CONNECTED));
  645. return searchResponse;
  646. }
  647. List<RowRecord> rowRecordList =
  648. buildRowRecordList(searchParam.getFloatVectors(), searchParam.getBinaryVectors());
  649. KeyValuePair extraParam =
  650. KeyValuePair.newBuilder()
  651. .setKey(extraParamKey)
  652. .setValue(searchParam.getParamsInJson())
  653. .build();
  654. io.milvus.grpc.SearchParam constructSearchParam =
  655. io.milvus.grpc.SearchParam.newBuilder()
  656. .setCollectionName(searchParam.getCollectionName())
  657. .addAllQueryRecordArray(rowRecordList)
  658. .addAllPartitionTagArray(searchParam.getPartitionTags())
  659. .setTopk(searchParam.getTopK())
  660. .addExtraParams(extraParam)
  661. .build();
  662. SearchInFilesParam request =
  663. SearchInFilesParam.newBuilder()
  664. .addAllFileIdArray(fileIds)
  665. .setSearchParam(constructSearchParam)
  666. .build();
  667. TopKQueryResult response;
  668. try {
  669. response = blockingStub.searchInFiles(request);
  670. if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
  671. SearchResponse searchResponse = buildSearchResponse(response);
  672. searchResponse.setResponse(new Response(Response.Status.SUCCESS));
  673. logInfo(
  674. "Search in files completed successfully! Returned results for {} queries",
  675. searchResponse.getNumQueries());
  676. return searchResponse;
  677. } else {
  678. logError("Search in files failed: {}", response.getStatus().toString());
  679. SearchResponse searchResponse = new SearchResponse();
  680. searchResponse.setResponse(
  681. new Response(
  682. Response.Status.valueOf(response.getStatus().getErrorCodeValue()),
  683. response.getStatus().getReason()));
  684. return searchResponse;
  685. }
  686. } catch (StatusRuntimeException e) {
  687. logError("searchInFiles RPC failed:\n{}", e.getStatus().toString());
  688. SearchResponse searchResponse = new SearchResponse();
  689. searchResponse.setResponse(new Response(Response.Status.RPC_ERROR, e.toString()));
  690. return searchResponse;
  691. }
  692. }
  693. @Override
  694. public DescribeCollectionResponse describeCollection(@Nonnull String collectionName) {
  695. if (!channelIsReadyOrIdle()) {
  696. logWarning("You are not connected to Milvus server");
  697. return new DescribeCollectionResponse(
  698. new Response(Response.Status.CLIENT_NOT_CONNECTED), null);
  699. }
  700. CollectionName request = CollectionName.newBuilder().setCollectionName(collectionName).build();
  701. CollectionSchema response;
  702. try {
  703. response = blockingStub.describeCollection(request);
  704. if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
  705. CollectionMapping collectionMapping =
  706. new CollectionMapping.Builder(response.getCollectionName(), response.getDimension())
  707. .withIndexFileSize(response.getIndexFileSize())
  708. .withMetricType(MetricType.valueOf(response.getMetricType()))
  709. .build();
  710. logInfo("Describe Collection `{}` returned:\n{}", collectionName, collectionMapping);
  711. return new DescribeCollectionResponse(
  712. new Response(Response.Status.SUCCESS), collectionMapping);
  713. } else {
  714. logError(
  715. "Describe Collection `{}` failed:\n{}",
  716. collectionName, response.getStatus().toString());
  717. return new DescribeCollectionResponse(
  718. new Response(
  719. Response.Status.valueOf(response.getStatus().getErrorCodeValue()),
  720. response.getStatus().getReason()),
  721. null);
  722. }
  723. } catch (StatusRuntimeException e) {
  724. logError("describeCollection RPC failed:\n{}", e.getStatus().toString());
  725. return new DescribeCollectionResponse(
  726. new Response(Response.Status.RPC_ERROR, e.toString()), null);
  727. }
  728. }
  729. @Override
  730. public ShowCollectionsResponse showCollections() {
  731. if (!channelIsReadyOrIdle()) {
  732. logWarning("You are not connected to Milvus server");
  733. return new ShowCollectionsResponse(
  734. new Response(Response.Status.CLIENT_NOT_CONNECTED), new ArrayList<>());
  735. }
  736. Command request = Command.newBuilder().setCmd("").build();
  737. CollectionNameList response;
  738. try {
  739. response = blockingStub.showCollections(request);
  740. if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
  741. List<String> collectionNames = response.getCollectionNamesList();
  742. logInfo("Current collections: {}", collectionNames.toString());
  743. return new ShowCollectionsResponse(new Response(Response.Status.SUCCESS), collectionNames);
  744. } else {
  745. logError("Show collections failed:\n{}", response.getStatus().toString());
  746. return new ShowCollectionsResponse(
  747. new Response(
  748. Response.Status.valueOf(response.getStatus().getErrorCodeValue()),
  749. response.getStatus().getReason()),
  750. new ArrayList<>());
  751. }
  752. } catch (StatusRuntimeException e) {
  753. logError("showCollections RPC failed:\n{}", e.getStatus().toString());
  754. return new ShowCollectionsResponse(
  755. new Response(Response.Status.RPC_ERROR, e.toString()), new ArrayList<>());
  756. }
  757. }
  758. @Override
  759. public GetCollectionRowCountResponse getCollectionRowCount(@Nonnull String collectionName) {
  760. if (!channelIsReadyOrIdle()) {
  761. logWarning("You are not connected to Milvus server");
  762. return new GetCollectionRowCountResponse(
  763. new Response(Response.Status.CLIENT_NOT_CONNECTED), 0);
  764. }
  765. CollectionName request = CollectionName.newBuilder().setCollectionName(collectionName).build();
  766. CollectionRowCount response;
  767. try {
  768. response = blockingStub.countCollection(request);
  769. if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
  770. long collectionRowCount = response.getCollectionRowCount();
  771. logInfo("Collection `{}` has {} rows", collectionName, collectionRowCount);
  772. return new GetCollectionRowCountResponse(
  773. new Response(Response.Status.SUCCESS), collectionRowCount);
  774. } else {
  775. logError(
  776. "Get collection `{}` row count failed:\n{}",
  777. collectionName, response.getStatus().toString());
  778. return new GetCollectionRowCountResponse(
  779. new Response(
  780. Response.Status.valueOf(response.getStatus().getErrorCodeValue()),
  781. response.getStatus().getReason()),
  782. 0);
  783. }
  784. } catch (StatusRuntimeException e) {
  785. logError("countCollection RPC failed:\n{}", e.getStatus().toString());
  786. return new GetCollectionRowCountResponse(
  787. new Response(Response.Status.RPC_ERROR, e.toString()), 0);
  788. }
  789. }
  790. @Override
  791. public Response getServerStatus() {
  792. return command("status");
  793. }
  794. @Override
  795. public Response getServerVersion() {
  796. return command("version");
  797. }
  798. public Response command(@Nonnull String command) {
  799. if (!channelIsReadyOrIdle()) {
  800. logWarning("You are not connected to Milvus server");
  801. return new Response(Response.Status.CLIENT_NOT_CONNECTED);
  802. }
  803. Command request = Command.newBuilder().setCmd(command).build();
  804. StringReply response;
  805. try {
  806. response = blockingStub.cmd(request);
  807. if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
  808. logInfo("Command `{}`: {}", command, response.getStringReply());
  809. return new Response(Response.Status.SUCCESS, response.getStringReply());
  810. } else {
  811. logError("Command `{}` failed:\n{}", command, response.toString());
  812. return new Response(
  813. Response.Status.valueOf(response.getStatus().getErrorCodeValue()),
  814. response.getStatus().getReason());
  815. }
  816. } catch (StatusRuntimeException e) {
  817. logError("Command RPC failed:\n{}", e.getStatus().toString());
  818. return new Response(Response.Status.RPC_ERROR, e.toString());
  819. }
  820. }
  821. @Override
  822. public Response preloadCollection(@Nonnull String collectionName) {
  823. if (!channelIsReadyOrIdle()) {
  824. logWarning("You are not connected to Milvus server");
  825. return new Response(Response.Status.CLIENT_NOT_CONNECTED);
  826. }
  827. CollectionName request = CollectionName.newBuilder().setCollectionName(collectionName).build();
  828. Status response;
  829. try {
  830. response = blockingStub.preloadCollection(request);
  831. if (response.getErrorCode() == ErrorCode.SUCCESS) {
  832. logInfo("Preloaded collection `{}` successfully!", collectionName);
  833. return new Response(Response.Status.SUCCESS);
  834. } else {
  835. logError("Preload collection `{}` failed:\n{}", collectionName, response.toString());
  836. return new Response(
  837. Response.Status.valueOf(response.getErrorCodeValue()), response.getReason());
  838. }
  839. } catch (StatusRuntimeException e) {
  840. logError("preloadCollection RPC failed:\n{}", e.getStatus().toString());
  841. return new Response(Response.Status.RPC_ERROR, e.toString());
  842. }
  843. }
  844. @Override
  845. public DescribeIndexResponse describeIndex(@Nonnull String collectionName) {
  846. if (!channelIsReadyOrIdle()) {
  847. logWarning("You are not connected to Milvus server");
  848. return new DescribeIndexResponse(new Response(Response.Status.CLIENT_NOT_CONNECTED), null);
  849. }
  850. CollectionName request = CollectionName.newBuilder().setCollectionName(collectionName).build();
  851. IndexParam response;
  852. try {
  853. response = blockingStub.describeIndex(request);
  854. if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
  855. String extraParam = "";
  856. for (KeyValuePair kv : response.getExtraParamsList()) {
  857. if (kv.getKey().contentEquals(extraParamKey)) {
  858. extraParam = kv.getValue();
  859. }
  860. }
  861. Index index =
  862. new Index.Builder(response.getCollectionName(), IndexType.valueOf(response.getIndexType()))
  863. .withParamsInJson(extraParam)
  864. .build();
  865. logInfo(
  866. "Describe index for collection `{}` returned:\n{}", collectionName, index.toString());
  867. return new DescribeIndexResponse(new Response(Response.Status.SUCCESS), index);
  868. } else {
  869. logError(
  870. "Describe index for collection `{}` failed:\n{}",
  871. collectionName, response.getStatus().toString());
  872. return new DescribeIndexResponse(
  873. new Response(
  874. Response.Status.valueOf(response.getStatus().getErrorCodeValue()),
  875. response.getStatus().getReason()),
  876. null);
  877. }
  878. } catch (StatusRuntimeException e) {
  879. logError("describeIndex RPC failed:\n{}", e.getStatus().toString());
  880. return new DescribeIndexResponse(new Response(Response.Status.RPC_ERROR, e.toString()), null);
  881. }
  882. }
  883. @Override
  884. public Response dropIndex(@Nonnull String collectionName) {
  885. if (!channelIsReadyOrIdle()) {
  886. logWarning("You are not connected to Milvus server");
  887. return new Response(Response.Status.CLIENT_NOT_CONNECTED);
  888. }
  889. CollectionName request = CollectionName.newBuilder().setCollectionName(collectionName).build();
  890. Status response;
  891. try {
  892. response = blockingStub.dropIndex(request);
  893. if (response.getErrorCode() == ErrorCode.SUCCESS) {
  894. logInfo("Dropped index for collection `{}` successfully!", collectionName);
  895. return new Response(Response.Status.SUCCESS);
  896. } else {
  897. logError(
  898. "Drop index for collection `{}` failed:\n{}", collectionName, response.toString());
  899. return new Response(
  900. Response.Status.valueOf(response.getErrorCodeValue()), response.getReason());
  901. }
  902. } catch (StatusRuntimeException e) {
  903. logError("dropIndex RPC failed:\n{}", e.getStatus().toString());
  904. return new Response(Response.Status.RPC_ERROR, e.toString());
  905. }
  906. }
  907. @Override
  908. public Response showCollectionInfo(String collectionName) {
  909. if (!channelIsReadyOrIdle()) {
  910. logWarning("You are not connected to Milvus server");
  911. return new Response(Response.Status.CLIENT_NOT_CONNECTED);
  912. }
  913. CollectionName request = CollectionName.newBuilder().setCollectionName(collectionName).build();
  914. io.milvus.grpc.CollectionInfo response;
  915. try {
  916. response = blockingStub.showCollectionInfo(request);
  917. if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
  918. logInfo("ShowCollectionInfo for `{}` returned successfully!", collectionName);
  919. return new Response(Response.Status.SUCCESS, response.getJsonInfo());
  920. } else {
  921. logError(
  922. "ShowCollectionInfo for `{}` failed:\n{}",
  923. collectionName, response.getStatus().toString());
  924. return new Response(
  925. Response.Status.valueOf(response.getStatus().getErrorCodeValue()),
  926. response.getStatus().getReason());
  927. }
  928. } catch (StatusRuntimeException e) {
  929. logError("showCollectionInfo RPC failed:\n{}", e.getStatus().toString());
  930. return new Response(Response.Status.RPC_ERROR, e.toString());
  931. }
  932. }
  933. @Override
  934. public GetVectorsByIdsResponse getVectorsByIds(String collectionName, List<Long> ids) {
  935. if (!channelIsReadyOrIdle()) {
  936. logWarning("You are not connected to Milvus server");
  937. return new GetVectorsByIdsResponse(
  938. new Response(Response.Status.CLIENT_NOT_CONNECTED), new ArrayList<>(), null);
  939. }
  940. VectorsIdentity request =
  941. VectorsIdentity.newBuilder().setCollectionName(collectionName).addAllIdArray(ids).build();
  942. VectorsData response;
  943. try {
  944. response = blockingStub.getVectorsByID(request);
  945. if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
  946. logInfo(
  947. "getVectorsByIds in collection `{}` returned successfully!", collectionName);
  948. List<List<Float>> floatVectors = new ArrayList<>();
  949. List<ByteBuffer> binaryVectors = new ArrayList<>();
  950. for (int i = 0; i < ids.size(); i++) {
  951. floatVectors.add(response.getVectorsData(i).getFloatDataList());
  952. binaryVectors.add(response.getVectorsData(i).getBinaryData().asReadOnlyByteBuffer());
  953. }
  954. return new GetVectorsByIdsResponse(
  955. new Response(Response.Status.SUCCESS), floatVectors, binaryVectors);
  956. } else {
  957. logError(
  958. "getVectorsByIds in collection `{}` failed:\n{}",
  959. collectionName, response.getStatus().toString());
  960. return new GetVectorsByIdsResponse(
  961. new Response(
  962. Response.Status.valueOf(response.getStatus().getErrorCodeValue()),
  963. response.getStatus().getReason()),
  964. new ArrayList<>(),
  965. null);
  966. }
  967. } catch (StatusRuntimeException e) {
  968. logError("getVectorsByIds RPC failed:\n{}", e.getStatus().toString());
  969. return new GetVectorsByIdsResponse(
  970. new Response(Response.Status.RPC_ERROR, e.toString()), new ArrayList<>(), null);
  971. }
  972. }
  973. @Override
  974. public GetVectorIdsResponse getVectorIds(String collectionName, String segmentName) {
  975. if (!channelIsReadyOrIdle()) {
  976. logWarning("You are not connected to Milvus server");
  977. return new GetVectorIdsResponse(
  978. new Response(Response.Status.CLIENT_NOT_CONNECTED), new ArrayList<>());
  979. }
  980. GetVectorIDsParam request =
  981. GetVectorIDsParam.newBuilder()
  982. .setCollectionName(collectionName)
  983. .setSegmentName(segmentName)
  984. .build();
  985. VectorIds response;
  986. try {
  987. response = blockingStub.getVectorIDs(request);
  988. if (response.getStatus().getErrorCode() == ErrorCode.SUCCESS) {
  989. logInfo(
  990. "getVectorIds in collection `{}`, segment `{}` returned successfully!",
  991. collectionName, segmentName);
  992. return new GetVectorIdsResponse(
  993. new Response(Response.Status.SUCCESS), response.getVectorIdArrayList());
  994. } else {
  995. logError(
  996. "getVectorIds in collection `{}`, segment `{}` failed:\n{}",
  997. collectionName, segmentName, response.getStatus().toString());
  998. return new GetVectorIdsResponse(
  999. new Response(
  1000. Response.Status.valueOf(response.getStatus().getErrorCodeValue()),
  1001. response.getStatus().getReason()),
  1002. new ArrayList<>());
  1003. }
  1004. } catch (StatusRuntimeException e) {
  1005. logError("getVectorIds RPC failed:\n{}", e.getStatus().toString());
  1006. return new GetVectorIdsResponse(
  1007. new Response(Response.Status.RPC_ERROR, e.toString()), new ArrayList<>());
  1008. }
  1009. }
  1010. @Override
  1011. public Response deleteByIds(String collectionName, List<Long> ids) {
  1012. if (!channelIsReadyOrIdle()) {
  1013. logWarning("You are not connected to Milvus server");
  1014. return new Response(Response.Status.CLIENT_NOT_CONNECTED);
  1015. }
  1016. DeleteByIDParam request =
  1017. DeleteByIDParam.newBuilder().setCollectionName(collectionName).addAllIdArray(ids).build();
  1018. Status response;
  1019. try {
  1020. response = blockingStub.deleteByID(request);
  1021. if (response.getErrorCode() == ErrorCode.SUCCESS) {
  1022. logInfo("deleteByIds in collection `{}` completed successfully!", collectionName);
  1023. return new Response(Response.Status.SUCCESS);
  1024. } else {
  1025. logError(
  1026. "deleteByIds in collection `{}` failed:\n{}", collectionName, response.toString());
  1027. return new Response(
  1028. Response.Status.valueOf(response.getErrorCodeValue()), response.getReason());
  1029. }
  1030. } catch (StatusRuntimeException e) {
  1031. logError("deleteByIds RPC failed:\n{}", e.getStatus().toString());
  1032. return new Response(Response.Status.RPC_ERROR, e.toString());
  1033. }
  1034. }
  1035. @Override
  1036. public Response deleteById(String collectionName, Long id) {
  1037. List<Long> list =
  1038. new ArrayList<Long>() {
  1039. {
  1040. add(id);
  1041. }
  1042. };
  1043. return deleteByIds(collectionName, list);
  1044. }
  1045. @Override
  1046. public Response flush(List<String> collectionNames) {
  1047. if (!channelIsReadyOrIdle()) {
  1048. logWarning("You are not connected to Milvus server");
  1049. return new Response(Response.Status.CLIENT_NOT_CONNECTED);
  1050. }
  1051. FlushParam request = FlushParam.newBuilder().addAllCollectionNameArray(collectionNames).build();
  1052. Status response;
  1053. try {
  1054. response = blockingStub.flush(request);
  1055. if (response.getErrorCode() == ErrorCode.SUCCESS) {
  1056. logInfo("Flushed collection {} successfully!", collectionNames);
  1057. return new Response(Response.Status.SUCCESS);
  1058. } else {
  1059. logError("Flush collection {} failed:\n{}", collectionNames, response.toString());
  1060. return new Response(
  1061. Response.Status.valueOf(response.getErrorCodeValue()), response.getReason());
  1062. }
  1063. } catch (StatusRuntimeException e) {
  1064. logError("flush RPC failed:\n{}", e.getStatus().toString());
  1065. return new Response(Response.Status.RPC_ERROR, e.toString());
  1066. }
  1067. }
  1068. @Override
  1069. public ListenableFuture<Response> flushAsync(@Nonnull List<String> collectionNames) {
  1070. if (!channelIsReadyOrIdle()) {
  1071. logWarning("You are not connected to Milvus server");
  1072. return Futures.immediateFuture(new Response(Response.Status.CLIENT_NOT_CONNECTED));
  1073. }
  1074. FlushParam request = FlushParam.newBuilder().addAllCollectionNameArray(collectionNames).build();
  1075. ListenableFuture<Status> response;
  1076. response = futureStub.flush(request);
  1077. Futures.addCallback(
  1078. response,
  1079. new FutureCallback<Status>() {
  1080. @Override
  1081. public void onSuccess(Status result) {
  1082. if (result.getErrorCode() == ErrorCode.SUCCESS) {
  1083. logInfo("Flushed collection {} successfully!", collectionNames);
  1084. } else {
  1085. logError("Flush collection {} failed:\n{}", collectionNames, result.toString());
  1086. }
  1087. }
  1088. @Override
  1089. public void onFailure(Throwable t) {
  1090. logError("FlushAsync failed:\n{}", t.getMessage());
  1091. }
  1092. },
  1093. MoreExecutors.directExecutor());
  1094. return Futures.transform(
  1095. response, transformStatusToResponseFunc::apply, MoreExecutors.directExecutor());
  1096. }
  1097. @Override
  1098. public Response flush(String collectionName) {
  1099. List<String> list =
  1100. new ArrayList<String>() {
  1101. {
  1102. add(collectionName);
  1103. }
  1104. };
  1105. return flush(list);
  1106. }
  1107. @Override
  1108. public ListenableFuture<Response> flushAsync(String collectionName) {
  1109. List<String> list =
  1110. new ArrayList<String>() {
  1111. {
  1112. add(collectionName);
  1113. }
  1114. };
  1115. return flushAsync(list);
  1116. }
  1117. @Override
  1118. public Response compact(String collectionName) {
  1119. if (!channelIsReadyOrIdle()) {
  1120. logWarning("You are not connected to Milvus server");
  1121. return new Response(Response.Status.CLIENT_NOT_CONNECTED);
  1122. }
  1123. CollectionName request = CollectionName.newBuilder().setCollectionName(collectionName).build();
  1124. Status response;
  1125. try {
  1126. response = blockingStub.compact(request);
  1127. if (response.getErrorCode() == ErrorCode.SUCCESS) {
  1128. logInfo("Compacted collection `{}` successfully!", collectionName);
  1129. return new Response(Response.Status.SUCCESS);
  1130. } else {
  1131. logError("Compact collection `{}` failed:\n{}", collectionName, response.toString());
  1132. return new Response(
  1133. Response.Status.valueOf(response.getErrorCodeValue()), response.getReason());
  1134. }
  1135. } catch (StatusRuntimeException e) {
  1136. logError("compact RPC failed:\n{}", e.getStatus().toString());
  1137. return new Response(Response.Status.RPC_ERROR, e.toString());
  1138. }
  1139. }
  1140. @Override
  1141. public ListenableFuture<Response> compactAsync(@Nonnull String collectionName) {
  1142. if (!channelIsReadyOrIdle()) {
  1143. logWarning("You are not connected to Milvus server");
  1144. return Futures.immediateFuture(new Response(Response.Status.CLIENT_NOT_CONNECTED));
  1145. }
  1146. CollectionName request = CollectionName.newBuilder().setCollectionName(collectionName).build();
  1147. ListenableFuture<Status> response;
  1148. response = futureStub.compact(request);
  1149. Futures.addCallback(
  1150. response,
  1151. new FutureCallback<Status>() {
  1152. @Override
  1153. public void onSuccess(Status result) {
  1154. if (result.getErrorCode() == ErrorCode.SUCCESS) {
  1155. logInfo("Compacted collection `{}` successfully!", collectionName);
  1156. } else {
  1157. logError("Compact collection `{}` failed:\n{}", collectionName, result.toString());
  1158. }
  1159. }
  1160. @Override
  1161. public void onFailure(Throwable t) {
  1162. logError("CompactAsync failed:\n{}", t.getMessage());
  1163. }
  1164. },
  1165. MoreExecutors.directExecutor());
  1166. return Futures.transform(
  1167. response, transformStatusToResponseFunc::apply, MoreExecutors.directExecutor());
  1168. }
  1169. ///////////////////// Util Functions/////////////////////
  1170. Function<Status, Response> transformStatusToResponseFunc =
  1171. status -> {
  1172. if (status.getErrorCode() == ErrorCode.SUCCESS) {
  1173. return new Response(Response.Status.SUCCESS);
  1174. } else {
  1175. return new Response(
  1176. Response.Status.valueOf(status.getErrorCodeValue()), status.getReason());
  1177. }
  1178. };
  1179. private List<RowRecord> buildRowRecordList(
  1180. @Nonnull List<List<Float>> floatVectors, @Nonnull List<ByteBuffer> binaryVectors) {
  1181. List<RowRecord> rowRecordList = new ArrayList<>();
  1182. int largerSize = Math.max(floatVectors.size(), binaryVectors.size());
  1183. for (int i = 0; i < largerSize; ++i) {
  1184. RowRecord.Builder rowRecordBuilder = RowRecord.newBuilder();
  1185. if (i < floatVectors.size()) {
  1186. rowRecordBuilder.addAllFloatData(floatVectors.get(i));
  1187. }
  1188. if (i < binaryVectors.size()) {
  1189. ((Buffer) binaryVectors.get(i)).rewind();
  1190. rowRecordBuilder.setBinaryData(ByteString.copyFrom(binaryVectors.get(i)));
  1191. }
  1192. rowRecordList.add(rowRecordBuilder.build());
  1193. }
  1194. return rowRecordList;
  1195. }
  1196. private SearchResponse buildSearchResponse(TopKQueryResult topKQueryResult) {
  1197. final int numQueries = (int) topKQueryResult.getRowNum();
  1198. final int topK =
  1199. numQueries == 0
  1200. ? 0
  1201. : topKQueryResult.getIdsCount()
  1202. / numQueries; // Guaranteed to be divisible from server side
  1203. List<List<Long>> resultIdsList = new ArrayList<>();
  1204. List<List<Float>> resultDistancesList = new ArrayList<>();
  1205. if (topK > 0) {
  1206. for (int i = 0; i < numQueries; i++) {
  1207. // Process result of query i
  1208. int pos = i * topK;
  1209. while (pos < i * topK + topK && topKQueryResult.getIdsList().get(pos) != -1) {
  1210. pos++;
  1211. }
  1212. resultIdsList.add(topKQueryResult.getIdsList().subList(i * topK, pos));
  1213. resultDistancesList.add(topKQueryResult.getDistancesList().subList(i * topK, pos));
  1214. }
  1215. }
  1216. SearchResponse searchResponse = new SearchResponse();
  1217. searchResponse.setNumQueries(numQueries);
  1218. searchResponse.setTopK(topK);
  1219. searchResponse.setResultIdsList(resultIdsList);
  1220. searchResponse.setResultDistancesList(resultDistancesList);
  1221. return searchResponse;
  1222. }
  1223. private boolean channelIsReadyOrIdle() {
  1224. if (channel == null) {
  1225. return false;
  1226. }
  1227. ConnectivityState connectivityState = channel.getState(false);
  1228. return connectivityState == ConnectivityState.READY
  1229. || connectivityState
  1230. == ConnectivityState.IDLE; // Since a new RPC would take the channel out of idle mode
  1231. }
  1232. ///////////////////// Log Functions//////////////////////
  1233. private void logInfo(String msg, Object... params) {
  1234. logger.info(msg, params);
  1235. }
  1236. private void logWarning(String msg, Object... params) {
  1237. logger.warn(msg, params);
  1238. }
  1239. private void logError(String msg, Object... params) {
  1240. logger.error(msg, params);
  1241. }
  1242. }