AbstractMilvusGrpcClient.java 85 KB


  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing,
  13. * software distributed under the License is distributed on an
  14. * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. * KIND, either express or implied. See the License for the
  16. * specific language governing permissions and limitations
  17. * under the License.
  18. */
  19. package io.milvus.client;
  20. import com.google.protobuf.ByteString;
  21. import io.grpc.StatusRuntimeException;
  22. import io.milvus.exception.ClientNotConnectedException;
  23. import io.milvus.exception.IllegalResponseException;
  24. import io.milvus.exception.ParamException;
  25. import io.milvus.grpc.*;
  26. import io.milvus.param.Constant;
  27. import io.milvus.param.R;
  28. import io.milvus.param.RpcStatus;
  29. import io.milvus.param.alias.AlterAliasParam;
  30. import io.milvus.param.alias.CreateAliasParam;
  31. import io.milvus.param.alias.DropAliasParam;
  32. import io.milvus.param.collection.*;
  33. import io.milvus.param.control.*;
  34. import io.milvus.param.dml.*;
  35. import io.milvus.param.index.*;
  36. import io.milvus.param.partition.*;
  37. import lombok.NonNull;
  38. import org.apache.commons.collections4.CollectionUtils;
  39. import org.apache.commons.collections4.MapUtils;
  40. import org.slf4j.Logger;
  41. import org.slf4j.LoggerFactory;
  42. import java.nio.ByteBuffer;
  43. import java.nio.ByteOrder;
  44. import java.util.*;
  45. import java.util.concurrent.TimeUnit;
  46. import java.util.stream.Collectors;
  47. public abstract class AbstractMilvusGrpcClient implements MilvusClient {
  /** SLF4J logger bound to {@code AbstractMilvusGrpcClient}. */
  48. private static final Logger logger = LoggerFactory.getLogger(AbstractMilvusGrpcClient.class);
  /** Supplies the blocking gRPC stub; the synchronous RPC calls made by this class go through it. */
  49. protected abstract MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub();
  /** Supplies the future-based gRPC stub. NOTE(review): no use of it is visible in this part of the file. */
  50. protected abstract MilvusServiceGrpc.MilvusServiceFutureStub futureStub();
  /** Readiness probe; the API methods return a ClientNotConnectedException failure when this is false. */
  51. protected abstract boolean clientIsReady();
  52. ///////////////////// Internal Functions//////////////////////
  53. private List<KeyValuePair> assembleKvPair(Map<String, String> sourceMap) {
  54. List<KeyValuePair> result = new ArrayList<>();
  55. if (MapUtils.isNotEmpty(sourceMap)) {
  56. sourceMap.forEach((key, value) -> {
  57. KeyValuePair kv = KeyValuePair.newBuilder()
  58. .setKey(key)
  59. .setValue(value).build();
  60. result.add(kv);
  61. });
  62. }
  63. return result;
  64. }
  65. @SuppressWarnings("unchecked")
  66. private FieldData genFieldData(String fieldName, DataType dataType, List<?> objects) {
  67. if (objects == null) {
  68. throw new ParamException("Cannot generate FieldData from null object");
  69. }
  70. FieldData.Builder builder = FieldData.newBuilder();
  71. if (vectorDataType.contains(dataType)) {
  72. if (dataType == DataType.FloatVector) {
  73. List<Float> floats = new ArrayList<>();
  74. // each object is List<Float>
  75. for (Object object : objects) {
  76. if (object instanceof List) {
  77. List<Float> list = (List<Float>) object;
  78. floats.addAll(list);
  79. } else {
  80. throw new ParamException("The type of FloatVector must be List<Float>");
  81. }
  82. }
  83. int dim = floats.size() / objects.size();
  84. FloatArray floatArray = FloatArray.newBuilder().addAllData(floats).build();
  85. VectorField vectorField = VectorField.newBuilder().setDim(dim).setFloatVector(floatArray).build();
  86. return builder.setFieldName(fieldName).setType(DataType.FloatVector).setVectors(vectorField).build();
  87. } else if (dataType == DataType.BinaryVector) {
  88. ByteBuffer totalBuf = null;
  89. int dim = 0;
  90. // each object is ByteBuffer
  91. for (Object object : objects) {
  92. ByteBuffer buf = (ByteBuffer) object;
  93. if (totalBuf == null){
  94. totalBuf = ByteBuffer.allocate(buf.position() * objects.size());
  95. totalBuf.put(buf.array());
  96. dim = buf.position() * 8;
  97. } else {
  98. totalBuf.put(buf.array());
  99. }
  100. }
  101. assert totalBuf != null;
  102. ByteString byteString = ByteString.copyFrom(totalBuf.array());
  103. VectorField vectorField = VectorField.newBuilder().setDim(dim).setBinaryVector(byteString).build();
  104. return builder.setFieldName(fieldName).setType(DataType.BinaryVector).setVectors(vectorField).build();
  105. }
  106. } else {
  107. switch (dataType) {
  108. case None:
  109. case UNRECOGNIZED:
  110. throw new ParamException("Cannot support this dataType:" + dataType);
  111. case Int64:
  112. List<Long> longs = objects.stream().map(p -> (Long) p).collect(Collectors.toList());
  113. LongArray longArray = LongArray.newBuilder().addAllData(longs).build();
  114. ScalarField scalarField1 = ScalarField.newBuilder().setLongData(longArray).build();
  115. return builder.setFieldName(fieldName).setType(dataType).setScalars(scalarField1).build();
  116. case Int32:
  117. case Int16:
  118. case Int8:
  119. List<Integer> integers = objects.stream().map(p -> p instanceof Short ? ((Short)p).intValue() :(Integer) p).collect(Collectors.toList());
  120. IntArray intArray = IntArray.newBuilder().addAllData(integers).build();
  121. ScalarField scalarField2 = ScalarField.newBuilder().setIntData(intArray).build();
  122. return builder.setFieldName(fieldName).setType(dataType).setScalars(scalarField2).build();
  123. case Bool:
  124. List<Boolean> booleans = objects.stream().map(p -> (Boolean) p).collect(Collectors.toList());
  125. BoolArray boolArray = BoolArray.newBuilder().addAllData(booleans).build();
  126. ScalarField scalarField3 = ScalarField.newBuilder().setBoolData(boolArray).build();
  127. return builder.setFieldName(fieldName).setType(dataType).setScalars(scalarField3).build();
  128. case Float:
  129. List<Float> floats = objects.stream().map(p -> (Float) p).collect(Collectors.toList());
  130. FloatArray floatArray = FloatArray.newBuilder().addAllData(floats).build();
  131. ScalarField scalarField4 = ScalarField.newBuilder().setFloatData(floatArray).build();
  132. return builder.setFieldName(fieldName).setType(dataType).setScalars(scalarField4).build();
  133. case Double:
  134. List<Double> doubles = objects.stream().map(p -> (Double) p).collect(Collectors.toList());
  135. DoubleArray doubleArray = DoubleArray.newBuilder().addAllData(doubles).build();
  136. ScalarField scalarField5 = ScalarField.newBuilder().setDoubleData(doubleArray).build();
  137. return builder.setFieldName(fieldName).setType(dataType).setScalars(scalarField5).build();
  138. case String:
  139. List<String> strings = objects.stream().map(p -> (String) p).collect(Collectors.toList());
  140. StringArray stringArray = StringArray.newBuilder().addAllData(strings).build();
  141. ScalarField scalarField6 = ScalarField.newBuilder().setStringData(stringArray).build();
  142. return builder.setFieldName(fieldName).setType(dataType).setScalars(scalarField6).build();
  143. }
  144. }
  145. return null;
  146. }
  147. private static final Set<DataType> vectorDataType = new HashSet<DataType>() {{
  148. add(DataType.FloatVector);
  149. add(DataType.BinaryVector);
  150. }};
  151. private void waitForLoadingCollection(String collectionName, List<String> partitionNames,
  152. long waitingInterval, long timeout) throws IllegalResponseException {
  153. long tsBegin = System.currentTimeMillis();
  154. if (partitionNames == null || partitionNames.isEmpty()) {
  155. ShowCollectionsRequest showCollectionRequest = ShowCollectionsRequest.newBuilder()
  156. .addCollectionNames(collectionName)
  157. .setType(ShowType.InMemory)
  158. .build();
  159. // Use showCollection() to check loading percentages of the collection.
  160. // If the inMemory percentage is 100, that means the collection has finished loading.
  161. // Otherwise, this thread will sleep a small interval and check again.
  162. // If waiting time exceed timeout, exist the circle
  163. while (true) {
  164. long tsNow = System.currentTimeMillis();
  165. if ((tsNow - tsBegin) >= timeout*1000) {
  166. logWarning("Waiting load thread is timeout, loading process may not be finished");
  167. break;
  168. }
  169. ShowCollectionsResponse response = blockingStub().showCollections(showCollectionRequest);
  170. int namesCount = response.getCollectionNamesCount();
  171. int percentagesCount = response.getInMemoryPercentagesCount();
  172. if (namesCount != 1) {
  173. throw new IllegalResponseException("ShowCollectionsResponse is illegal. Collection count: "
  174. + namesCount);
  175. }
  176. if (namesCount != percentagesCount) {
  177. String msg = "ShowCollectionsResponse is illegal. Collection count: " + namesCount
  178. + " memory percentages count: " + percentagesCount;
  179. throw new IllegalResponseException(msg);
  180. }
  181. long percentage = response.getInMemoryPercentages(0);
  182. String responseCollection = response.getCollectionNames(0);
  183. if (responseCollection.compareTo(collectionName) == 0 && percentage >= 100) {
  184. break;
  185. }
  186. try {
  187. logInfo("Waiting load, interval: {} ms, percentage: {}%", waitingInterval, percentage);
  188. TimeUnit.MILLISECONDS.sleep(waitingInterval);
  189. } catch (InterruptedException e) {
  190. logWarning("Waiting load thread is interrupted, loading process may not be finished");
  191. break;
  192. }
  193. }
  194. } else {
  195. ShowPartitionsRequest showPartitionsRequest = ShowPartitionsRequest.newBuilder()
  196. .setCollectionName(collectionName)
  197. .addAllPartitionNames(partitionNames)
  198. .setType(ShowType.InMemory).build();
  199. // Use showPartitions() to check loading percentages of all the partitions.
  200. // If each partition's inMemory percentage is 100, that means all the partitions have finished loading.
  201. // Otherwise, this thread will sleep a small interval and check again.
  202. // If waiting time exceed timeout, exist the circle
  203. while(true) {
  204. long tsNow = System.currentTimeMillis();
  205. if ((tsNow - tsBegin) >= timeout*1000) {
  206. logWarning("Waiting load thread is timeout, loading process may not be finished");
  207. break;
  208. }
  209. ShowPartitionsResponse response = blockingStub().showPartitions(showPartitionsRequest);
  210. int namesCount = response.getPartitionNamesCount();
  211. int percentagesCount = response.getInMemoryPercentagesCount();
  212. if (namesCount != percentagesCount) {
  213. String msg = "ShowPartitionsResponse is illegal. Partition count: " + namesCount
  214. + " memory percentages count: " + percentagesCount;
  215. throw new IllegalResponseException(msg);
  216. }
  217. // construct a hash map to check each partition's inMemory percentage by name
  218. Map<String, Long> percentages = new HashMap<>();
  219. for (int i = 0; i < response.getInMemoryPercentagesCount(); ++i) {
  220. percentages.put(response.getPartitionNames(i), response.getInMemoryPercentages(i));
  221. }
  222. String partitionNoMemState = "";
  223. String partitionNotFullyLoad = "";
  224. boolean allLoaded = true;
  225. for (String name : partitionNames) {
  226. if (!percentages.containsKey(name)) {
  227. allLoaded = false;
  228. partitionNoMemState = name;
  229. break;
  230. }
  231. if (percentages.get(name) < 100L) {
  232. allLoaded = false;
  233. partitionNotFullyLoad = name;
  234. break;
  235. }
  236. }
  237. if (allLoaded) {
  238. break;
  239. }
  240. try {
  241. String msg = "Waiting load, interval: " + waitingInterval + "ms.";
  242. if (!partitionNoMemState.isEmpty()) {
  243. msg += ("Partition " + partitionNoMemState + " has no memory state.");
  244. }
  245. if (!partitionNotFullyLoad.isEmpty()) {
  246. msg += ("Partition " + partitionNotFullyLoad + " has not fully loaded.");
  247. }
  248. logInfo(msg);
  249. TimeUnit.MILLISECONDS.sleep(waitingInterval);
  250. } catch (InterruptedException e) {
  251. logWarning("Waiting load thread is interrupted, load process may not be finished");
  252. break;
  253. }
  254. }
  255. }
  256. }
  257. private void waitForFlush(FlushResponse flushResponse, long waitingInterval, long timeout) {
  258. // The rpc api flush() return FlushResponse, but the returned segment ids maybe not yet persisted.
  259. // This method use getPersistentSegmentInfo() to check segment state.
  260. // If all segments state become Flushed, then we say the sync flush action is finished.
  261. // If waiting time exceed timeout, exist the circle
  262. long tsBegin = System.currentTimeMillis();
  263. Map<String, LongArray> collectionSegIDs = flushResponse.getCollSegIDsMap();
  264. collectionSegIDs.forEach((collectionName, segmentIDs) -> {
  265. while (segmentIDs.getDataCount() > 0) {
  266. long tsNow = System.currentTimeMillis();
  267. if ((tsNow - tsBegin) >= timeout*1000) {
  268. logWarning("Waiting flush thread is timeout, flush process may not be finished");
  269. break;
  270. }
  271. GetPersistentSegmentInfoRequest getSegInfoRequest = GetPersistentSegmentInfoRequest.newBuilder()
  272. .setCollectionName(collectionName)
  273. .build();
  274. GetPersistentSegmentInfoResponse response = blockingStub().getPersistentSegmentInfo(getSegInfoRequest);
  275. List<PersistentSegmentInfo> segmentInfoArray = response.getInfosList();
  276. int flushedCount = 0;
  277. for (int i = 0; i < segmentIDs.getDataCount(); ++i) {
  278. for (PersistentSegmentInfo info : segmentInfoArray) {
  279. if (info.getSegmentID() == segmentIDs.getData(i) && info.getState() == SegmentState.Flushed) {
  280. flushedCount++;
  281. break;
  282. }
  283. }
  284. }
  285. // if all segment of this collection has been flushed, break this circle and check next collection
  286. if (flushedCount == segmentIDs.getDataCount()) {
  287. break;
  288. }
  289. try {
  290. String msg = "Waiting flush, interval: " + waitingInterval + "ms. " + flushedCount +
  291. " of " + segmentIDs.getDataCount() + " segments flushed.";
  292. logInfo(msg);
  293. TimeUnit.MILLISECONDS.sleep(waitingInterval);
  294. } catch (InterruptedException e) {
  295. logWarning("Waiting flush thread is interrupted, flush process may not be finished");
  296. break;
  297. }
  298. }
  299. });
  300. }
  301. private R<Boolean> waitForIndex(String collectionName, String fieldName, long waitingInterval, long timeout) {
  302. // This method use getIndexState() to check index state.
  303. // If all index state become Finished, then we say the sync index action is finished.
  304. // If waiting time exceed timeout, exist the circle
  305. long tsBegin = System.currentTimeMillis();
  306. while (true) {
  307. long tsNow = System.currentTimeMillis();
  308. if ((tsNow - tsBegin) >= timeout*1000) {
  309. String msg = "Waiting index thread is timeout, index process may not be finished";
  310. logWarning(msg);
  311. return R.failed(R.Status.Success, msg);
  312. }
  313. GetIndexStateRequest request = GetIndexStateRequest.newBuilder()
  314. .setCollectionName(collectionName)
  315. .setFieldName(fieldName)
  316. .build();
  317. GetIndexStateResponse response = blockingStub().getIndexState(request);
  318. if (response.getState() == IndexState.Finished) {
  319. break;
  320. } else if (response.getState() == IndexState.Failed) {
  321. String msg = "Index failed: " + response.getFailReason();
  322. logError(msg);
  323. return R.failed(R.Status.UnexpectedError, msg);
  324. }
  325. try {
  326. String msg = "Waiting index, interval: " + waitingInterval + "ms. ";
  327. logInfo(msg);
  328. TimeUnit.MILLISECONDS.sleep(waitingInterval);
  329. } catch (InterruptedException e) {
  330. String msg = "Waiting index thread is interrupted, index process may not be finished";
  331. logWarning(msg);
  332. return R.failed(R.Status.Success, msg);
  333. }
  334. }
  335. return R.failed(R.Status.Success, "Waiting index thread exist");
  336. }
  337. ///////////////////// API implementation //////////////////////
  338. @Override
  339. public R<Boolean> hasCollection(@NonNull HasCollectionParam requestParam) {
  340. if (!clientIsReady()) {
  341. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  342. }
  343. logInfo(requestParam.toString());
  344. try {
  345. HasCollectionRequest hasCollectionRequest = HasCollectionRequest.newBuilder()
  346. .setCollectionName(requestParam.getCollectionName())
  347. .build();
  348. BoolResponse response = blockingStub().hasCollection(hasCollectionRequest);
  349. if (response.getStatus().getErrorCode() == ErrorCode.Success) {
  350. logInfo("HasCollectionRequest successfully!");
  351. Boolean value = Optional.of(response)
  352. .map(BoolResponse::getValue)
  353. .orElse(false);
  354. return R.success(value);
  355. } else {
  356. logError("HasCollectionRequest failed!\n{}", response.getStatus().getReason());
  357. return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
  358. response.getStatus().getReason());
  359. }
  360. } catch (StatusRuntimeException e) {
  361. logError("HasCollectionRequest RPC failed:\n{}", e.getStatus().toString());
  362. return R.failed(e);
  363. } catch (Exception e) {
  364. logError("HasCollectionRequest failed:\n{}", e.getMessage());
  365. return R.failed(e);
  366. }
  367. }
  /**
   * Creates a collection from the schema described by the request parameters.
   *
   * <p>Field ids are assigned sequentially from 0 in the order the field types are listed, and
   * the assembled schema is sent to the server in serialized form.
   *
   * @param requestParam collection name, description, shard number and field definitions
   * @return a successful {@code R} with the success message, or a failed {@code R} when the client
   *     is not ready, the server reports an error code, or the call throws
   */
  @Override
  public R<RpcStatus> createCollection(@NonNull CreateCollectionParam requestParam) {
    if (!clientIsReady()) {
      return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
    }

    logInfo(requestParam.toString());

    try {
      // Assemble the collection schema, one FieldSchema per declared field.
      CollectionSchema.Builder schemaBuilder = CollectionSchema.newBuilder()
          .setName(requestParam.getCollectionName())
          .setDescription(requestParam.getDescription());

      long nextFieldId = 0;
      for (FieldType field : requestParam.getFieldTypes()) {
        FieldSchema.Builder fieldBuilder = FieldSchema.newBuilder()
            .setFieldID(nextFieldId)
            .setName(field.getName())
            .setIsPrimaryKey(field.isPrimaryKey())
            .setDescription(field.getDescription())
            .setDataType(field.getDataType())
            .setAutoID(field.isAutoID());

        // copy the field's type parameters into the schema
        for (KeyValuePair typeParam : assembleKvPair(field.getTypeParams())) {
          fieldBuilder.addTypeParams(typeParam);
        }

        schemaBuilder.addFields(fieldBuilder.build());
        nextFieldId++;
      }

      // Wrap the serialized schema in the RPC request.
      CreateCollectionRequest.Builder requestBuilder = CreateCollectionRequest.newBuilder()
          .setCollectionName(requestParam.getCollectionName())
          .setShardsNum(requestParam.getShardsNum())
          .setSchema(schemaBuilder.build().toByteString());

      Status reply = blockingStub().createCollection(requestBuilder.build());
      if (reply.getErrorCode() != ErrorCode.Success) {
        logError("CreateCollectionRequest failed!\n{}", reply.getReason());
        return R.failed(R.Status.valueOf(reply.getErrorCode().getNumber()), reply.getReason());
      }

      logInfo("CreateCollectionRequest successfully! Collection name:{}",
          requestParam.getCollectionName());
      return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
    } catch (StatusRuntimeException rpcError) {
      logError("CreateCollectionRequest RPC failed! Collection name:{}\n{}",
          requestParam.getCollectionName(), rpcError.getStatus().toString());
      return R.failed(rpcError);
    } catch (Exception error) {
      logError("CreateCollectionRequest failed! Collection name:{}\n{}",
          requestParam.getCollectionName(), error.getMessage());
      return R.failed(error);
    }
  }
  421. @Override
  422. public R<RpcStatus> dropCollection(@NonNull DropCollectionParam requestParam) {
  423. if (!clientIsReady()) {
  424. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  425. }
  426. logInfo(requestParam.toString());
  427. try {
  428. DropCollectionRequest dropCollectionRequest = DropCollectionRequest.newBuilder()
  429. .setCollectionName(requestParam.getCollectionName())
  430. .build();
  431. Status response = blockingStub().dropCollection(dropCollectionRequest);
  432. if (response.getErrorCode() == ErrorCode.Success) {
  433. logInfo("DropCollectionRequest successfully! Collection name:{}",
  434. requestParam.getCollectionName());
  435. return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
  436. } else {
  437. logError("DropCollectionRequest failed! Collection name:{}\n{}",
  438. requestParam.getCollectionName(), response.getReason());
  439. return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
  440. }
  441. } catch (StatusRuntimeException e) {
  442. logError("DropCollectionRequest RPC failed! Collection name:{}\n{}",
  443. requestParam.getCollectionName(), e.getStatus().toString());
  444. return R.failed(e);
  445. } catch (Exception e) {
  446. logError("DropCollectionRequest failed! Collection name:{}\n{}",
  447. requestParam.getCollectionName(), e.getMessage());
  448. return R.failed(e);
  449. }
  450. }
  451. @Override
  452. public R<RpcStatus> loadCollection(@NonNull LoadCollectionParam requestParam) {
  453. if (!clientIsReady()) {
  454. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  455. }
  456. logInfo(requestParam.toString());
  457. try {
  458. LoadCollectionRequest loadCollectionRequest = LoadCollectionRequest.newBuilder()
  459. .setCollectionName(requestParam.getCollectionName())
  460. .build();
  461. Status response = blockingStub().loadCollection(loadCollectionRequest);
  462. if (response.getErrorCode() != ErrorCode.Success) {
  463. return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
  464. }
  465. // sync load, wait until collection finish loading
  466. if (requestParam.isSyncLoad()) {
  467. waitForLoadingCollection(requestParam.getCollectionName(), null,
  468. requestParam.getSyncLoadWaitingInterval(), requestParam.getSyncLoadWaitingTimeout());
  469. }
  470. logInfo("LoadCollectionRequest successfully! Collection name:{}",
  471. requestParam.getCollectionName());
  472. return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
  473. } catch (StatusRuntimeException e) { // gRPC could throw this exception
  474. logError("LoadCollectionRequest RPC failed! Collection name:{}\n{}",
  475. requestParam.getCollectionName(), e.getStatus().toString());
  476. return R.failed(e);
  477. } catch (IllegalResponseException e) { // milvus exception for illegal response
  478. logError("LoadCollectionRequest failed! Collection name:{}\n{}",
  479. requestParam.getCollectionName(), e.getStatus().toString());
  480. return R.failed(e);
  481. } catch (Exception e) {
  482. logError("LoadCollectionRequest failed! Collection name:{}\n{}",
  483. requestParam.getCollectionName(), e.getMessage());
  484. return R.failed(e);
  485. }
  486. }
  487. @Override
  488. public R<RpcStatus> releaseCollection(@NonNull ReleaseCollectionParam requestParam) {
  489. if (!clientIsReady()) {
  490. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  491. }
  492. logInfo(requestParam.toString());
  493. try {
  494. ReleaseCollectionRequest releaseCollectionRequest = ReleaseCollectionRequest.newBuilder()
  495. .setCollectionName(requestParam.getCollectionName())
  496. .build();
  497. Status response = blockingStub().releaseCollection(releaseCollectionRequest);
  498. if (response.getErrorCode() == ErrorCode.Success) {
  499. logInfo("ReleaseCollectionRequest successfully! Collection name:{}",
  500. requestParam.getCollectionName());
  501. return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
  502. } else {
  503. logError("ReleaseCollectionRequest failed! Collection name:{}\n{}",
  504. requestParam.getCollectionName(), response.getReason());
  505. return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
  506. }
  507. } catch (StatusRuntimeException e) {
  508. logError("ReleaseCollectionRequest RPC failed! Collection name:{}\n{}",
  509. requestParam.getCollectionName(), e.getStatus().toString());
  510. return R.failed(e);
  511. } catch (Exception e) {
  512. logError("ReleaseCollectionRequest failed! Collection name:{}\n{}",
  513. requestParam.getCollectionName(), e.getMessage());
  514. return R.failed(e);
  515. }
  516. }
  517. @Override
  518. public R<DescribeCollectionResponse> describeCollection(@NonNull DescribeCollectionParam requestParam) {
  519. if (!clientIsReady()) {
  520. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  521. }
  522. logInfo(requestParam.toString());
  523. try {
  524. DescribeCollectionRequest describeCollectionRequest = DescribeCollectionRequest.newBuilder()
  525. .setCollectionName(requestParam.getCollectionName())
  526. .build();
  527. DescribeCollectionResponse response = blockingStub().describeCollection(describeCollectionRequest);
  528. if (response.getStatus().getErrorCode() == ErrorCode.Success) {
  529. logInfo("DescribeCollectionRequest successfully!");
  530. return R.success(response);
  531. } else {
  532. logError("DescribeCollectionRequest failed!\n{}", response.getStatus().getReason());
  533. return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
  534. response.getStatus().getReason());
  535. }
  536. } catch (StatusRuntimeException e) {
  537. logError("DescribeCollectionRequest RPC failed:\n{}", e.getStatus().toString());
  538. return R.failed(e);
  539. } catch (Exception e) {
  540. logError("DescribeCollectionRequest failed:\n{}", e.getMessage());
  541. return R.failed(e);
  542. }
  543. }
  544. @Override
  545. public R<GetCollectionStatisticsResponse> getCollectionStatistics(@NonNull GetCollectionStatisticsParam requestParam) {
  546. if (!clientIsReady()) {
  547. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  548. }
  549. logInfo(requestParam.toString());
  550. try {
  551. // flush collection if client command to do it(some times user may want to know the newest row count)
  552. if (requestParam.isFlushCollection()) {
  553. R<FlushResponse> response = flush(FlushParam.newBuilder()
  554. .addCollectionName(requestParam.getCollectionName())
  555. .withSyncFlush(Boolean.TRUE)
  556. .build());
  557. if (response.getStatus() != R.Status.Success.getCode()) {
  558. return R.failed(R.Status.valueOf(response.getStatus()), response.getMessage());
  559. }
  560. }
  561. GetCollectionStatisticsRequest getCollectionStatisticsRequest = GetCollectionStatisticsRequest.newBuilder()
  562. .setCollectionName(requestParam.getCollectionName())
  563. .build();
  564. GetCollectionStatisticsResponse response = blockingStub().getCollectionStatistics(getCollectionStatisticsRequest);
  565. if (response.getStatus().getErrorCode() == ErrorCode.Success) {
  566. logInfo("GetCollectionStatisticsRequest successfully!");
  567. return R.success(response);
  568. } else {
  569. logError("GetCollectionStatisticsRequest failed!\n{}", response.getStatus().getReason());
  570. return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
  571. response.getStatus().getReason());
  572. }
  573. } catch (StatusRuntimeException e) {
  574. logError("GetCollectionStatisticsRequest RPC failed:\n{}", e.getStatus().toString());
  575. return R.failed(e);
  576. } catch (Exception e) {
  577. logError("GetCollectionStatisticsRequest failed:\n{}", e.getMessage());
  578. return R.failed(e);
  579. }
  580. }
  581. @Override
  582. public R<ShowCollectionsResponse> showCollections(@NonNull ShowCollectionsParam requestParam) {
  583. if (!clientIsReady()) {
  584. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  585. }
  586. logInfo(requestParam.toString());
  587. try {
  588. ShowCollectionsRequest showCollectionsRequest = ShowCollectionsRequest.newBuilder()
  589. .addAllCollectionNames(requestParam.getCollectionNames())
  590. .setType(requestParam.getShowType()).build();
  591. ShowCollectionsResponse response = blockingStub().showCollections(showCollectionsRequest);
  592. if (response.getStatus().getErrorCode() == ErrorCode.Success) {
  593. logInfo("ShowCollectionsRequest successfully!");
  594. return R.success(response);
  595. } else {
  596. logError("ShowCollectionsRequest failed!\n{}", response.getStatus().getReason());
  597. return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
  598. response.getStatus().getReason());
  599. }
  600. } catch (StatusRuntimeException e) {
  601. logError("ShowCollectionsRequest RPC failed:\n{}", e.getStatus().toString());
  602. return R.failed(e);
  603. } catch (Exception e) {
  604. logError("ShowCollectionsRequest failed:\n{}", e.getMessage());
  605. return R.failed(e);
  606. }
  607. }
  608. /**
  609. * Currently we do not support this method on client since compaction is not supported on server.
  610. * Now it is only for internal use of getCollectionStatistics().
  611. */
  612. // @Override
  613. private R<FlushResponse> flush(@NonNull FlushParam requestParam) {
  614. if (!clientIsReady()) {
  615. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  616. }
  617. logInfo(requestParam.toString());
  618. try {
  619. MsgBase msgBase = MsgBase.newBuilder().setMsgType(MsgType.Flush).build();
  620. FlushRequest flushRequest = FlushRequest.newBuilder()
  621. .setBase(msgBase)
  622. .addAllCollectionNames(requestParam.getCollectionNames())
  623. .build();
  624. FlushResponse response = blockingStub().flush(flushRequest);
  625. if (requestParam.getSyncFlush() == Boolean.TRUE) {
  626. waitForFlush(response, requestParam.getSyncFlushWaitingInterval(),
  627. requestParam.getSyncFlushWaitingTimeout());
  628. }
  629. logInfo("FlushRequest successfully! Collection names:{}", requestParam.getCollectionNames());
  630. return R.success(response);
  631. } catch (StatusRuntimeException e) {
  632. logError("FlushRequest RPC failed! Collection names:{}\n{}",
  633. requestParam.getCollectionNames(), e.getStatus().toString());
  634. return R.failed(e);
  635. } catch (Exception e) {
  636. logError("FlushRequest failed! Collection names:{}\n{}",
  637. requestParam.getCollectionNames(), e.getMessage());
  638. return R.failed(e);
  639. }
  640. }
  641. @Override
  642. public R<RpcStatus> createPartition(@NonNull CreatePartitionParam requestParam) {
  643. if (!clientIsReady()) {
  644. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  645. }
  646. logInfo(requestParam.toString());
  647. try {
  648. CreatePartitionRequest createPartitionRequest = CreatePartitionRequest.newBuilder()
  649. .setCollectionName(requestParam.getCollectionName())
  650. .setPartitionName(requestParam.getPartitionName())
  651. .build();
  652. Status response = blockingStub().createPartition(createPartitionRequest);
  653. if (response.getErrorCode() == ErrorCode.Success) {
  654. logInfo("CreatePartitionRequest successfully! Collection name:{}, partition name:{}",
  655. requestParam.getCollectionName(), requestParam.getPartitionName());
  656. return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
  657. } else {
  658. logError("CreatePartitionRequest failed! Collection name:{}, partition name:{}\n{}",
  659. requestParam.getCollectionName(), requestParam.getPartitionName(), response.getReason());
  660. return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
  661. }
  662. } catch (StatusRuntimeException e) {
  663. logError("CreatePartitionRequest RPC failed! Collection name:{}, partition name:{}\n{}",
  664. requestParam.getCollectionName(), requestParam.getPartitionName(), e.getStatus().toString());
  665. return R.failed(e);
  666. } catch (Exception e) {
  667. logError("CreatePartitionRequest failed! Collection name:{}, partition name:{}\n{}",
  668. requestParam.getCollectionName(), requestParam.getPartitionName(), e.getMessage());
  669. return R.failed(e);
  670. }
  671. }
  672. @Override
  673. public R<RpcStatus> dropPartition(@NonNull DropPartitionParam requestParam) {
  674. if (!clientIsReady()) {
  675. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  676. }
  677. logInfo(requestParam.toString());
  678. try {
  679. DropPartitionRequest dropPartitionRequest = DropPartitionRequest.newBuilder()
  680. .setCollectionName(requestParam.getCollectionName())
  681. .setPartitionName(requestParam.getPartitionName())
  682. .build();
  683. Status response = blockingStub().dropPartition(dropPartitionRequest);
  684. if (response.getErrorCode() == ErrorCode.Success) {
  685. logInfo("DropPartitionRequest successfully! Collection name:{}, partition name:{}",
  686. requestParam.getCollectionName(), requestParam.getPartitionName());
  687. return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
  688. } else {
  689. logError("DropPartitionRequest failed! Collection name:{}, partition name:{}\n{}",
  690. requestParam.getCollectionName(), requestParam.getPartitionName(), response.getReason());
  691. return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
  692. }
  693. } catch (StatusRuntimeException e) {
  694. logError("DropPartitionRequest RPC failed! Collection name:{}, partition name:{}\n{}",
  695. requestParam.getCollectionName(), requestParam.getPartitionName(), e.getStatus().toString());
  696. return R.failed(e);
  697. } catch (Exception e) {
  698. logError("DropPartitionRequest failed! Collection name:{}, partition name:{}\n{}",
  699. requestParam.getCollectionName(), requestParam.getPartitionName(), e.getMessage());
  700. return R.failed(e);
  701. }
  702. }
  703. @Override
  704. public R<Boolean> hasPartition(@NonNull HasPartitionParam requestParam) {
  705. if (!clientIsReady()) {
  706. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  707. }
  708. logInfo(requestParam.toString());
  709. try {
  710. HasPartitionRequest hasPartitionRequest = HasPartitionRequest.newBuilder()
  711. .setCollectionName(requestParam.getCollectionName())
  712. .setPartitionName(requestParam.getPartitionName())
  713. .build();
  714. BoolResponse response = blockingStub().hasPartition(hasPartitionRequest);
  715. if (response.getStatus().getErrorCode() == ErrorCode.Success) {
  716. logInfo("HasPartitionRequest successfully!");
  717. Boolean result = response.getValue();
  718. return R.success(result);
  719. } else {
  720. logError("HasPartitionRequest failed!\n{}", response.getStatus().getReason());
  721. return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
  722. response.getStatus().getReason());
  723. }
  724. } catch (StatusRuntimeException e) {
  725. logError("HasPartitionRequest RPC failed:\n{}", e.getStatus().toString());
  726. return R.failed(e);
  727. } catch (Exception e) {
  728. logError("HasPartitionRequest failed:\n{}", e.getMessage());
  729. return R.failed(e);
  730. }
  731. }
  732. @Override
  733. public R<RpcStatus> loadPartitions(@NonNull LoadPartitionsParam requestParam) {
  734. if (!clientIsReady()) {
  735. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  736. }
  737. logInfo(requestParam.toString());
  738. try {
  739. LoadPartitionsRequest loadPartitionsRequest = LoadPartitionsRequest.newBuilder()
  740. .setCollectionName(requestParam.getCollectionName())
  741. .addAllPartitionNames(requestParam.getPartitionNames())
  742. .build();
  743. Status response = blockingStub().loadPartitions(loadPartitionsRequest);
  744. if (response.getErrorCode() != ErrorCode.Success) {
  745. return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
  746. }
  747. // sync load, wait until all partitions finish loading
  748. if (requestParam.isSyncLoad()) {
  749. waitForLoadingCollection(requestParam.getCollectionName(), requestParam.getPartitionNames(),
  750. requestParam.getSyncLoadWaitingInterval(), requestParam.getSyncLoadWaitingTimeout());
  751. }
  752. logInfo("LoadPartitionsRequest successfully! Collection name:{}, partition names:{}",
  753. requestParam.getCollectionName(), requestParam.getPartitionNames());
  754. return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
  755. } catch (StatusRuntimeException e) { // gRPC could throw this exception
  756. logError("LoadPartitionsRequest RPC failed! Collection name:{}, partition names:{}\n{}",
  757. requestParam.getCollectionName(), requestParam.getPartitionNames(), e.getStatus().toString());
  758. return R.failed(e);
  759. } catch (IllegalResponseException e) { // milvus exception for illegal response
  760. logError("LoadPartitionsRequest failed! Collection name:{}, partition names:{}\n{}",
  761. requestParam.getCollectionName(), requestParam.getPartitionNames(), e.getStatus().toString());
  762. return R.failed(e);
  763. } catch (Exception e) {
  764. logError("LoadPartitionsRequest failed! Collection name:{}, partition names:{}\n{}",
  765. requestParam.getCollectionName(), requestParam.getPartitionNames(), e.getMessage());
  766. return R.failed(e);
  767. }
  768. }
  769. @Override
  770. public R<RpcStatus> releasePartitions(@NonNull ReleasePartitionsParam requestParam) {
  771. if (!clientIsReady()) {
  772. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  773. }
  774. logInfo(requestParam.toString());
  775. try {
  776. ReleasePartitionsRequest releasePartitionsRequest = ReleasePartitionsRequest.newBuilder()
  777. .setCollectionName(requestParam.getCollectionName())
  778. .addAllPartitionNames(requestParam.getPartitionNames())
  779. .build();
  780. Status response = blockingStub().releasePartitions(releasePartitionsRequest);
  781. if (response.getErrorCode() == ErrorCode.Success) {
  782. logInfo("ReleasePartitionsRequest successfully! Collection name:{}, partition names:{}",
  783. requestParam.getCollectionName(), requestParam.getPartitionNames());
  784. return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
  785. } else {
  786. logError("ReleasePartitionsRequest failed! Collection name:{}, partition names:{}\n{}",
  787. requestParam.getCollectionName(), requestParam.getPartitionNames(), response.getReason());
  788. return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
  789. }
  790. } catch (StatusRuntimeException e) {
  791. logError("ReleasePartitionsRequest RPC failed! Collection name:{}, partition names:{}\n{}",
  792. requestParam.getCollectionName(), requestParam.getPartitionNames(), e.getStatus().toString());
  793. return R.failed(e);
  794. } catch (Exception e) {
  795. logError("ReleasePartitionsRequest failed! Collection name:{}, partition names:{}\n{}",
  796. requestParam.getCollectionName(), requestParam.getPartitionNames(), e.getMessage());
  797. return R.failed(e);
  798. }
  799. }
  800. @Override
  801. public R<GetPartitionStatisticsResponse> getPartitionStatistics(@NonNull GetPartitionStatisticsParam requestParam) {
  802. if (!clientIsReady()) {
  803. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  804. }
  805. logInfo(requestParam.toString());
  806. try {
  807. // flush collection if client command to do it(some times user may want to know the newest row count)
  808. if (requestParam.isFlushCollection()) {
  809. R<FlushResponse> response = flush(FlushParam.newBuilder()
  810. .addCollectionName(requestParam.getCollectionName())
  811. .withSyncFlush(Boolean.TRUE)
  812. .build());
  813. if (response.getStatus() != R.Status.Success.getCode()) {
  814. return R.failed(R.Status.valueOf(response.getStatus()), response.getMessage());
  815. }
  816. }
  817. GetPartitionStatisticsRequest getPartitionStatisticsRequest = GetPartitionStatisticsRequest.newBuilder()
  818. .setCollectionName(requestParam.getCollectionName())
  819. .setPartitionName(requestParam.getPartitionName())
  820. .build();
  821. GetPartitionStatisticsResponse response =
  822. blockingStub().getPartitionStatistics(getPartitionStatisticsRequest);
  823. if (response.getStatus().getErrorCode() == ErrorCode.Success) {
  824. logInfo("GetPartitionStatisticsRequest successfully!");
  825. return R.success(response);
  826. } else {
  827. logError("ReleasePartitionsRequest failed:\n{}", response.getStatus().getReason());
  828. return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
  829. response.getStatus().getReason());
  830. }
  831. } catch (StatusRuntimeException e) {
  832. logError("GetPartitionStatisticsRequest RPC failed:\n{}", e.getStatus().toString());
  833. return R.failed(e);
  834. } catch (Exception e) {
  835. logError("GetQuerySegmentInfoRequest failed:\n{}", e.getMessage());
  836. return R.failed(e);
  837. }
  838. }
  839. @Override
  840. public R<ShowPartitionsResponse> showPartitions(@NonNull ShowPartitionsParam requestParam) {
  841. if (!clientIsReady()) {
  842. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  843. }
  844. logInfo(requestParam.toString());
  845. try {
  846. ShowPartitionsRequest showPartitionsRequest = ShowPartitionsRequest.newBuilder()
  847. .setCollectionName(requestParam.getCollectionName())
  848. .addAllPartitionNames(requestParam.getPartitionNames())
  849. .build();
  850. ShowPartitionsResponse response = blockingStub().showPartitions(showPartitionsRequest);
  851. if (response.getStatus().getErrorCode() == ErrorCode.Success) {
  852. logInfo("ShowPartitionsRequest successfully!");
  853. return R.success(response);
  854. } else {
  855. logError("ShowPartitionsRequest failed:\n{}", response.getStatus().getReason());
  856. return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
  857. response.getStatus().getReason());
  858. }
  859. } catch (StatusRuntimeException e) {
  860. logError("ShowPartitionsRequest RPC failed:\n{}", e.getStatus().toString());
  861. return R.failed(e);
  862. } catch (Exception e) {
  863. logError("ShowPartitionsRequest failed:\n{}", e.getMessage());
  864. return R.failed(e);
  865. }
  866. }
  867. @Override
  868. public R<RpcStatus> createAlias(@NonNull CreateAliasParam requestParam) {
  869. if (!clientIsReady()) {
  870. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  871. }
  872. logInfo(requestParam.toString());
  873. try {
  874. CreateAliasRequest createAliasRequest = CreateAliasRequest.newBuilder()
  875. .setCollectionName(requestParam.getCollectionName())
  876. .setAlias(requestParam.getAlias())
  877. .build();
  878. Status response = blockingStub().createAlias(createAliasRequest);
  879. if (response.getErrorCode() == ErrorCode.Success) {
  880. logInfo("CreateAliasRequest successfully! Collection name:{}, alias name:{}",
  881. requestParam.getCollectionName(), requestParam.getAlias());
  882. return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
  883. } else {
  884. logError("CreateAliasRequest failed! Collection name:{}, alias name:{}\n{}",
  885. requestParam.getCollectionName(), requestParam.getAlias(), response.getReason());
  886. return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
  887. }
  888. } catch (StatusRuntimeException e) {
  889. logError("CreateAliasRequest RPC failed! Collection name:{}, alias name:{}\n{}",
  890. requestParam.getCollectionName(), requestParam.getAlias(), e.getStatus().toString());
  891. return R.failed(e);
  892. } catch (Exception e) {
  893. logError("CreateAliasRequest failed! Collection name:{}, alias name:{}\n{}",
  894. requestParam.getCollectionName(), requestParam.getAlias(), e.getMessage());
  895. return R.failed(e);
  896. }
  897. }
  898. @Override
  899. public R<RpcStatus> dropAlias(@NonNull DropAliasParam requestParam) {
  900. if (!clientIsReady()) {
  901. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  902. }
  903. logInfo(requestParam.toString());
  904. try {
  905. DropAliasRequest dropAliasRequest = DropAliasRequest.newBuilder()
  906. .setAlias(requestParam.getAlias())
  907. .build();
  908. Status response = blockingStub().dropAlias(dropAliasRequest);
  909. if (response.getErrorCode() == ErrorCode.Success) {
  910. logInfo("DropAliasRequest successfully! Alias name:{}", requestParam.getAlias());
  911. return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
  912. } else {
  913. logError("DropAliasRequest failed! Alias name:{}\n{}",
  914. requestParam.getAlias(), response.getReason());
  915. return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
  916. }
  917. } catch (StatusRuntimeException e) {
  918. logError("DropAliasRequest RPC failed! Alias name:{}\n{}",
  919. requestParam.getAlias(), e.getStatus().toString());
  920. return R.failed(e);
  921. } catch (Exception e) {
  922. logError("DropAliasRequest failed! Alias name:{}\n{}",
  923. requestParam.getAlias(), e.getMessage());
  924. return R.failed(e);
  925. }
  926. }
  927. @Override
  928. public R<RpcStatus> alterAlias(@NonNull AlterAliasParam requestParam) {
  929. if (!clientIsReady()) {
  930. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  931. }
  932. logInfo(requestParam.toString());
  933. try {
  934. AlterAliasRequest alterAliasRequest = AlterAliasRequest.newBuilder()
  935. .setCollectionName(requestParam.getCollectionName())
  936. .setAlias(requestParam.getAlias())
  937. .build();
  938. Status response = blockingStub().alterAlias(alterAliasRequest);
  939. if (response.getErrorCode() == ErrorCode.Success) {
  940. logInfo("AlterAliasRequest successfully! Collection name:{}, alias name:{}",
  941. requestParam.getCollectionName(), requestParam.getAlias());
  942. return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
  943. } else {
  944. logError("AlterAliasRequest failed! Collection name:{}, alias name:{}\n{}",
  945. requestParam.getCollectionName(), requestParam.getAlias(), response.getReason());
  946. return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
  947. }
  948. } catch (StatusRuntimeException e) {
  949. logError("AlterAliasRequest RPC failed! Collection name:{}, alias name:{}\n{}",
  950. requestParam.getCollectionName(), requestParam.getAlias(), e.getStatus().toString());
  951. return R.failed(e);
  952. } catch (Exception e) {
  953. logError("AlterAliasRequest failed! Collection name:{}, alias name:{}\n{}",
  954. requestParam.getCollectionName(), requestParam.getAlias(), e.getMessage());
  955. return R.failed(e);
  956. }
  957. }
  958. @Override
  959. public R<RpcStatus> createIndex(@NonNull CreateIndexParam requestParam) {
  960. if (!clientIsReady()) {
  961. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  962. }
  963. logInfo(requestParam.toString());
  964. try {
  965. CreateIndexRequest.Builder createIndexRequestBuilder = CreateIndexRequest.newBuilder();
  966. List<KeyValuePair> extraParamList = assembleKvPair(requestParam.getExtraParam());
  967. if (CollectionUtils.isNotEmpty(extraParamList)) {
  968. extraParamList.forEach(createIndexRequestBuilder::addExtraParams);
  969. }
  970. CreateIndexRequest createIndexRequest = createIndexRequestBuilder.setCollectionName(requestParam.getCollectionName())
  971. .setFieldName(requestParam.getFieldName()).build();
  972. Status response = blockingStub().createIndex(createIndexRequest);
  973. if (response.getErrorCode() != ErrorCode.Success) {
  974. logError("CreateIndexRequest failed! Collection name:{} Field name:{}\n{}",
  975. requestParam.getCollectionName(), requestParam.getFieldName(), response.getReason());
  976. return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
  977. }
  978. if (requestParam.isSyncMode()) {
  979. R<Boolean> res = waitForIndex(requestParam.getCollectionName(), requestParam.getFieldName(),
  980. requestParam.getSyncWaitingInterval(), requestParam.getSyncWaitingTimeout());
  981. if (res.getStatus() != R.Status.Success.getCode()) {
  982. logError("CreateIndexRequest failed in sync mode! Collection name:{} Field name:{}\n{}",
  983. requestParam.getCollectionName(), requestParam.getFieldName(), response.getReason());
  984. return R.failed(R.Status.valueOf(res.getStatus()), res.getMessage());
  985. }
  986. }
  987. logInfo("CreateIndexRequest successfully! Collection name:{} Field name:{}",
  988. requestParam.getCollectionName(), requestParam.getFieldName());
  989. return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
  990. } catch (StatusRuntimeException e) {
  991. logError("CreateIndexRequest RPC failed! Collection name:{}\n{}",
  992. requestParam.getCollectionName(), e.getStatus().toString());
  993. return R.failed(e);
  994. } catch (Exception e) {
  995. logError("CreateIndexRequest failed! Collection name:{}\n{}",
  996. requestParam.getCollectionName(), e.getMessage());
  997. return R.failed(e);
  998. }
  999. }
  1000. @Override
  1001. public R<RpcStatus> dropIndex(@NonNull DropIndexParam requestParam) {
  1002. if (!clientIsReady()) {
  1003. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  1004. }
  1005. logInfo(requestParam.toString());
  1006. try {
  1007. DropIndexRequest dropIndexRequest = DropIndexRequest.newBuilder()
  1008. .setCollectionName(requestParam.getCollectionName())
  1009. .setFieldName(requestParam.getFieldName())
  1010. .build();
  1011. Status response = blockingStub().dropIndex(dropIndexRequest);
  1012. if (response.getErrorCode() == ErrorCode.Success) {
  1013. logInfo("DropIndexRequest successfully! Collection name:{}",
  1014. requestParam.getCollectionName());
  1015. return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
  1016. } else {
  1017. logError("DropIndexRequest failed! Collection name:{}\n{}",
  1018. requestParam.getCollectionName(), response.getReason());
  1019. return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
  1020. }
  1021. } catch (StatusRuntimeException e) {
  1022. logError("DropIndexRequest RPC failed! Collection name:{}\n{}",
  1023. requestParam.getCollectionName(), e.getStatus().toString());
  1024. return R.failed(e);
  1025. } catch (Exception e) {
  1026. logError("DropIndexRequest failed! Collection name:{}\n{}",
  1027. requestParam.getCollectionName(), e.getMessage());
  1028. return R.failed(e);
  1029. }
  1030. }
  1031. @Override
  1032. public R<DescribeIndexResponse> describeIndex(@NonNull DescribeIndexParam requestParam) {
  1033. if (!clientIsReady()) {
  1034. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  1035. }
  1036. logInfo(requestParam.toString());
  1037. try {
  1038. DescribeIndexRequest describeIndexRequest = DescribeIndexRequest.newBuilder()
  1039. .setCollectionName(requestParam.getCollectionName())
  1040. .setFieldName(requestParam.getFieldName())
  1041. .build();
  1042. DescribeIndexResponse response = blockingStub().describeIndex(describeIndexRequest);
  1043. if (response.getStatus().getErrorCode() == ErrorCode.Success) {
  1044. logInfo("DescribeIndexRequest successfully!");
  1045. return R.success(response);
  1046. } else {
  1047. logError("DescribeIndexRequest failed:\n{}", response.getStatus().getReason());
  1048. return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
  1049. response.getStatus().getReason());
  1050. }
  1051. } catch (StatusRuntimeException e) {
  1052. logError("DescribeIndexRequest RPC failed:\n{}", e.getStatus().toString());
  1053. return R.failed(e);
  1054. } catch (Exception e) {
  1055. logError("DescribeIndexRequest failed:\n{}", e.getMessage());
  1056. return R.failed(e);
  1057. }
  1058. }
  1059. @Override
  1060. public R<GetIndexStateResponse> getIndexState(@NonNull GetIndexStateParam requestParam) {
  1061. if (!clientIsReady()) {
  1062. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  1063. }
  1064. logInfo(requestParam.toString());
  1065. try {
  1066. GetIndexStateRequest getIndexStateRequest = GetIndexStateRequest.newBuilder()
  1067. .setCollectionName(requestParam.getCollectionName())
  1068. .setFieldName(requestParam.getFieldName())
  1069. .build();
  1070. GetIndexStateResponse response = blockingStub().getIndexState(getIndexStateRequest);
  1071. if (response.getStatus().getErrorCode() == ErrorCode.Success) {
  1072. logInfo("GetIndexStateRequest successfully!");
  1073. return R.success(response);
  1074. } else {
  1075. logError("GetIndexStateRequest failed:\n{}", response.getStatus().getReason());
  1076. return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
  1077. response.getStatus().getReason());
  1078. }
  1079. } catch (StatusRuntimeException e) {
  1080. logError("GetIndexStateRequest RPC failed:\n{}", e.getStatus().toString());
  1081. return R.failed(e);
  1082. } catch (Exception e) {
  1083. logError("GetIndexStateRequest failed:\n{}", e.getMessage());
  1084. return R.failed(e);
  1085. }
  1086. }
  1087. @Override
  1088. public R<GetIndexBuildProgressResponse> getIndexBuildProgress(@NonNull GetIndexBuildProgressParam requestParam) {
  1089. if (!clientIsReady()) {
  1090. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  1091. }
  1092. logInfo(requestParam.toString());
  1093. try {
  1094. GetIndexBuildProgressRequest getIndexBuildProgressRequest = GetIndexBuildProgressRequest.newBuilder()
  1095. .setCollectionName(requestParam.getCollectionName())
  1096. .build();
  1097. GetIndexBuildProgressResponse response = blockingStub().getIndexBuildProgress(getIndexBuildProgressRequest);
  1098. if (response.getStatus().getErrorCode() == ErrorCode.Success) {
  1099. logInfo("GetIndexBuildProgressRequest successfully!");
  1100. return R.success(response);
  1101. } else {
  1102. logError("GetIndexBuildProgressRequest failed:\n{}", response.getStatus().getReason());
  1103. return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
  1104. response.getStatus().getReason());
  1105. }
  1106. } catch (StatusRuntimeException e) {
  1107. logError("GetIndexBuildProgressRequest RPC failed:\n{}", e.getStatus().toString());
  1108. return R.failed(e);
  1109. } catch (Exception e) {
  1110. logError("GetIndexBuildProgressRequest failed:\n{}", e.getMessage());
  1111. return R.failed(e);
  1112. }
  1113. }
  1114. @Override
  1115. public R<MutationResult> delete(@NonNull DeleteParam requestParam) {
  1116. if (!clientIsReady()) {
  1117. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  1118. }
  1119. logInfo(requestParam.toString());
  1120. try {
  1121. DeleteRequest deleteRequest = DeleteRequest.newBuilder()
  1122. .setBase(MsgBase.newBuilder().setMsgType(MsgType.Delete).build())
  1123. .setCollectionName(requestParam.getCollectionName())
  1124. .setPartitionName(requestParam.getPartitionName())
  1125. .setExpr(requestParam.getExpr())
  1126. .build();
  1127. MutationResult response = blockingStub().delete(deleteRequest);
  1128. if (response.getStatus().getErrorCode() == ErrorCode.Success) {
  1129. logInfo("DeleteRequest successfully! Collection name:{}",
  1130. requestParam.getCollectionName());
  1131. return R.success(response);
  1132. } else {
  1133. logError("DeleteRequest failed! Collection name:{}\n{}",
  1134. requestParam.getCollectionName(), response.getStatus().getReason());
  1135. return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
  1136. response.getStatus().getReason());
  1137. }
  1138. } catch (StatusRuntimeException e) {
  1139. logError("DeleteRequest RPC failed! Collection name:{}\n{}",
  1140. requestParam.getCollectionName(), e.getMessage());
  1141. return R.failed(e);
  1142. } catch (Exception e) {
  1143. logError("DeleteRequest failed! Collection name:{}\n{}",
  1144. requestParam.getCollectionName(), e.getMessage());
  1145. return R.failed(e);
  1146. }
  1147. }
  1148. @Override
  1149. public R<MutationResult> insert(@NonNull InsertParam requestParam) {
  1150. if (!clientIsReady()) {
  1151. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  1152. }
  1153. logInfo(requestParam.toString());
  1154. try {
  1155. String collectionName = requestParam.getCollectionName();
  1156. String partitionName = requestParam.getPartitionName();
  1157. List<InsertParam.Field> fields = requestParam.getFields();
  1158. //1. gen insert request
  1159. MsgBase msgBase = MsgBase.newBuilder().setMsgType(MsgType.Insert).build();
  1160. InsertRequest.Builder insertBuilder = InsertRequest.newBuilder()
  1161. .setCollectionName(collectionName)
  1162. .setPartitionName(partitionName)
  1163. .setBase(msgBase)
  1164. .setNumRows(requestParam.getRowCount());
  1165. //2. gen fieldData
  1166. // TODO: check field type(use DescribeCollection get schema to compare)
  1167. for (InsertParam.Field field : fields) {
  1168. insertBuilder.addFieldsData(genFieldData(field.getName(), field.getType(), field.getValues()));
  1169. }
  1170. //3. call insert
  1171. InsertRequest insertRequest = insertBuilder.build();
  1172. MutationResult response = blockingStub().insert(insertRequest);
  1173. if (response.getStatus().getErrorCode() == ErrorCode.Success) {
  1174. logInfo("InsertRequest successfully! Collection name:{}",
  1175. requestParam.getCollectionName());
  1176. return R.success(response);
  1177. } else {
  1178. logError("InsertRequest failed! Collection name:{}\n{}",
  1179. requestParam.getCollectionName(), response.getStatus().getReason());
  1180. return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
  1181. response.getStatus().getReason());
  1182. }
  1183. } catch (StatusRuntimeException e) {
  1184. logError("InsertRequest RPC failed! Collection name:{}\n{}",
  1185. requestParam.getCollectionName(), e.getMessage());
  1186. return R.failed(e);
  1187. } catch (Exception e) {
  1188. logError("InsertRequest failed! Collection name:{}\n{}",
  1189. requestParam.getCollectionName(), e.getMessage());
  1190. return R.failed(e);
  1191. }
  1192. }
  1193. @Override
  1194. @SuppressWarnings("unchecked")
  1195. public R<SearchResults> search(@NonNull SearchParam requestParam) {
  1196. if (!clientIsReady()) {
  1197. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  1198. }
  1199. logInfo(requestParam.toString());
  1200. try {
  1201. SearchRequest.Builder builder = SearchRequest.newBuilder()
  1202. .setDbName("")
  1203. .setCollectionName(requestParam.getCollectionName());
  1204. if (!requestParam.getPartitionNames().isEmpty()) {
  1205. requestParam.getPartitionNames().forEach(builder::addPartitionNames);
  1206. }
  1207. // prepare target vectors
  1208. // TODO: check target vector dimension(use DescribeCollection get schema to compare)
  1209. PlaceholderType plType = PlaceholderType.None;
  1210. List<?> vectors = requestParam.getVectors();
  1211. List<ByteString> byteStrings = new ArrayList<>();
  1212. for (Object vector : vectors) {
  1213. if (vector instanceof List) {
  1214. plType = PlaceholderType.FloatVector;
  1215. List<Float> list = (List<Float>) vector;
  1216. ByteBuffer buf = ByteBuffer.allocate(Float.BYTES * list.size());
  1217. buf.order(ByteOrder.LITTLE_ENDIAN);
  1218. list.forEach(buf::putFloat);
  1219. byte[] array = buf.array();
  1220. ByteString bs = ByteString.copyFrom(array);
  1221. byteStrings.add(bs);
  1222. } else if (vector instanceof ByteBuffer) {
  1223. plType = PlaceholderType.BinaryVector;
  1224. ByteBuffer buf = (ByteBuffer) vector;
  1225. byte[] array = buf.array();
  1226. ByteString bs = ByteString.copyFrom(array);
  1227. byteStrings.add(bs);
  1228. } else {
  1229. String msg = "Search target vector type is illegal(Only allow List<Float> or ByteBuffer)";
  1230. logError(msg);
  1231. return R.failed(R.Status.UnexpectedError, msg);
  1232. }
  1233. }
  1234. PlaceholderValue.Builder pldBuilder = PlaceholderValue.newBuilder()
  1235. .setTag(Constant.VECTOR_TAG)
  1236. .setType(plType);
  1237. byteStrings.forEach(pldBuilder::addValues);
  1238. PlaceholderValue plv = pldBuilder.build();
  1239. PlaceholderGroup placeholderGroup = PlaceholderGroup.newBuilder()
  1240. .addPlaceholders(plv)
  1241. .build();
  1242. ByteString byteStr = placeholderGroup.toByteString();
  1243. builder.setPlaceholderGroup(byteStr);
  1244. // search parameters
  1245. builder.addSearchParams(
  1246. KeyValuePair.newBuilder()
  1247. .setKey(Constant.VECTOR_FIELD)
  1248. .setValue(requestParam.getVectorFieldName())
  1249. .build())
  1250. .addSearchParams(
  1251. KeyValuePair.newBuilder()
  1252. .setKey(Constant.TOP_K)
  1253. .setValue(String.valueOf(requestParam.getTopK()))
  1254. .build())
  1255. .addSearchParams(
  1256. KeyValuePair.newBuilder()
  1257. .setKey(Constant.METRIC_TYPE)
  1258. .setValue(requestParam.getMetricType())
  1259. .build())
  1260. .addSearchParams(
  1261. KeyValuePair.newBuilder()
  1262. .setKey(Constant.ROUND_DECIMAL)
  1263. .setValue(String.valueOf(requestParam.getRoundDecimal()))
  1264. .build());
  1265. if (null != requestParam.getParams() && !requestParam.getParams().isEmpty()) {
  1266. builder.addSearchParams(
  1267. KeyValuePair.newBuilder()
  1268. .setKey(Constant.PARAMS)
  1269. .setValue(requestParam.getParams())
  1270. .build());
  1271. }
  1272. if (!requestParam.getOutFields().isEmpty()) {
  1273. requestParam.getOutFields().forEach(builder::addOutputFields);
  1274. }
  1275. // always use expression since dsl is discarded
  1276. builder.setDslType(DslType.BoolExprV1);
  1277. if (requestParam.getExpr() != null && !requestParam.getExpr().isEmpty()) {
  1278. builder.setDsl(requestParam.getExpr());
  1279. }
  1280. SearchRequest searchRequest = builder.build();
  1281. SearchResults response = this.blockingStub().search(searchRequest);
  1282. //TODO: truncate distance value by round decimal
  1283. if (response.getStatus().getErrorCode() == ErrorCode.Success) {
  1284. logInfo("SearchRequest successfully!");
  1285. return R.success(response);
  1286. } else {
  1287. logError("SearchRequest failed:\n{}", response.getStatus().getReason());
  1288. return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
  1289. response.getStatus().getReason());
  1290. }
  1291. } catch (StatusRuntimeException e) {
  1292. logError("SearchRequest RPC failed:{}", e.getMessage());
  1293. return R.failed(e);
  1294. } catch (Exception e) {
  1295. logError("SearchRequest failed:\n{}", e.getMessage());
  1296. return R.failed(e);
  1297. }
  1298. }
  1299. @Override
  1300. public R<QueryResults> query(@NonNull QueryParam requestParam) {
  1301. if (!clientIsReady()) {
  1302. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  1303. }
  1304. logInfo(requestParam.toString());
  1305. try {
  1306. QueryRequest queryRequest = QueryRequest.newBuilder()
  1307. .setDbName("")
  1308. .setCollectionName(requestParam.getCollectionName())
  1309. .addAllPartitionNames(requestParam.getPartitionNames())
  1310. .addAllOutputFields(requestParam.getOutFields())
  1311. .setExpr(requestParam.getExpr())
  1312. .build();
  1313. QueryResults response = this.blockingStub().query(queryRequest);
  1314. if (response.getStatus().getErrorCode() == ErrorCode.Success) {
  1315. logInfo("QueryRequest successfully!");
  1316. return R.success(response);
  1317. } else {
  1318. logError("QueryRequest failed:\n{}", response.getStatus().getReason());
  1319. return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
  1320. response.getStatus().getReason());
  1321. }
  1322. } catch (StatusRuntimeException e) {
  1323. // e.printStackTrace();
  1324. logError("QueryRequest RPC failed:{}", e.getMessage());
  1325. return R.failed(e);
  1326. } catch (Exception e) {
  1327. logError("QueryRequest failed:\n{}", e.getMessage());
  1328. return R.failed(e);
  1329. }
  1330. }
  1331. @Override
  1332. public R<CalcDistanceResults> calcDistance(@NonNull CalcDistanceParam requestParam) {
  1333. if (!clientIsReady()) {
  1334. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  1335. }
  1336. logInfo(requestParam.toString());
  1337. try {
  1338. List<List<Float>> vectors_left = requestParam.getVectorsLeft();
  1339. List<List<Float>> vectors_right = requestParam.getVectorsRight();
  1340. FloatArray.Builder left_float_array = FloatArray.newBuilder();
  1341. for (List<Float> vector : vectors_left) {
  1342. left_float_array.addAllData(vector);
  1343. }
  1344. FloatArray.Builder right_float_array = FloatArray.newBuilder();
  1345. for (List<Float> vector : vectors_right) {
  1346. right_float_array.addAllData(vector);
  1347. }
  1348. CalcDistanceRequest calcDistanceRequest = CalcDistanceRequest.newBuilder()
  1349. .setOpLeft(
  1350. VectorsArray.newBuilder()
  1351. .setDataArray(
  1352. VectorField.newBuilder()
  1353. .setFloatVector(left_float_array.build())
  1354. .setDim(vectors_left.get(0).size())
  1355. .build()
  1356. )
  1357. .build()
  1358. )
  1359. .setOpRight(
  1360. VectorsArray.newBuilder()
  1361. .setDataArray(
  1362. VectorField.newBuilder()
  1363. .setFloatVector(right_float_array.build())
  1364. .setDim(vectors_right.get(0).size())
  1365. .build()
  1366. )
  1367. .build()
  1368. )
  1369. .addParams(
  1370. KeyValuePair.newBuilder()
  1371. .setKey("metric")
  1372. .setValue(requestParam.getMetricType())
  1373. .build()
  1374. )
  1375. .build();
  1376. CalcDistanceResults response = blockingStub().calcDistance(calcDistanceRequest);
  1377. if (response.getStatus().getErrorCode() == ErrorCode.Success) {
  1378. logInfo("CalcDistanceRequest successfully!");
  1379. return R.success(response);
  1380. } else {
  1381. logError("CalcDistanceRequest failed:\n{}", response.getStatus().getReason());
  1382. return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
  1383. response.getStatus().getReason());
  1384. }
  1385. } catch (StatusRuntimeException e) {
  1386. logError("CalcDistanceRequest RPC failed:{}", e.getMessage());
  1387. return R.failed(e);
  1388. } catch (Exception e) {
  1389. logError("CalcDistanceRequest failed:\n{}", e.getMessage());
  1390. return R.failed(e);
  1391. }
  1392. }
  1393. @Override
  1394. public R<GetMetricsResponse> getMetrics(@NonNull GetMetricsParam requestParam) {
  1395. if (!clientIsReady()) {
  1396. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  1397. }
  1398. logInfo(requestParam.toString());
  1399. try {
  1400. GetMetricsRequest getMetricsRequest = GetMetricsRequest.newBuilder()
  1401. .setRequest(requestParam.getRequest())
  1402. .build();
  1403. GetMetricsResponse response = blockingStub().getMetrics(getMetricsRequest);
  1404. if (response.getStatus().getErrorCode() == ErrorCode.Success) {
  1405. logInfo("GetMetricsRequest successfully!");
  1406. return R.success(response);
  1407. } else {
  1408. logError("GetMetricsRequest failed:\n{}", response.getStatus().getReason());
  1409. return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
  1410. response.getStatus().getReason());
  1411. }
  1412. } catch (StatusRuntimeException e) {
  1413. logError("GetMetricsRequest RPC failed:\n{}", e.getStatus().toString());
  1414. return R.failed(e);
  1415. } catch (Exception e) {
  1416. logError("GetMetricsRequest failed:\n{}", e.getMessage());
  1417. return R.failed(e);
  1418. }
  1419. }
  1420. @Override
  1421. public R<GetPersistentSegmentInfoResponse> getPersistentSegmentInfo(@NonNull GetPersistentSegmentInfoParam requestParam) {
  1422. if (!clientIsReady()) {
  1423. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  1424. }
  1425. logInfo(requestParam.toString());
  1426. try {
  1427. GetPersistentSegmentInfoRequest getSegmentInfoRequest = GetPersistentSegmentInfoRequest.newBuilder()
  1428. .setCollectionName(requestParam.getCollectionName())
  1429. .build();
  1430. GetPersistentSegmentInfoResponse response = blockingStub().getPersistentSegmentInfo(getSegmentInfoRequest);
  1431. if (response.getStatus().getErrorCode() == ErrorCode.Success) {
  1432. logInfo("GetPersistentSegmentInfoRequest successfully!");
  1433. return R.success(response);
  1434. } else {
  1435. logError("GetPersistentSegmentInfoRequest failed:\n{}", response.getStatus().getReason());
  1436. return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
  1437. response.getStatus().getReason());
  1438. }
  1439. } catch (StatusRuntimeException e) {
  1440. logError("GetPersistentSegmentInfoRequest RPC failed:\n{}", e.getStatus().toString());
  1441. return R.failed(e);
  1442. } catch (Exception e) {
  1443. logError("GetPersistentSegmentInfoRequest failed:\n{}", e.getMessage());
  1444. return R.failed(e);
  1445. }
  1446. }
  1447. @Override
  1448. public R<GetQuerySegmentInfoResponse> getQuerySegmentInfo(@NonNull GetQuerySegmentInfoParam requestParam) {
  1449. if (!clientIsReady()) {
  1450. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  1451. }
  1452. logInfo(requestParam.toString());
  1453. try {
  1454. GetQuerySegmentInfoRequest getSegmentInfoRequest = GetQuerySegmentInfoRequest.newBuilder()
  1455. .setCollectionName(requestParam.getCollectionName())
  1456. .build();
  1457. GetQuerySegmentInfoResponse response = blockingStub().getQuerySegmentInfo(getSegmentInfoRequest);
  1458. if (response.getStatus().getErrorCode() == ErrorCode.Success) {
  1459. logInfo("GetQuerySegmentInfoRequest successfully!");
  1460. return R.success(response);
  1461. } else {
  1462. logError("GetQuerySegmentInfoRequest failed:\n{}", response.getStatus().getReason());
  1463. return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
  1464. response.getStatus().getReason());
  1465. }
  1466. } catch (StatusRuntimeException e) {
  1467. logError("GetQuerySegmentInfoRequest RPC failed:\n{}", e.getStatus().toString());
  1468. return R.failed(e);
  1469. } catch (Exception e) {
  1470. logError("GetQuerySegmentInfoRequest failed:\n{}", e.getMessage());
  1471. return R.failed(e);
  1472. }
  1473. }
  1474. @Override
  1475. public R<RpcStatus> loadBalance(LoadBalanceParam requestParam) {
  1476. if (!clientIsReady()) {
  1477. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  1478. }
  1479. logInfo(requestParam.toString());
  1480. try {
  1481. LoadBalanceRequest loadBalanceRequest = LoadBalanceRequest.newBuilder()
  1482. .setSrcNodeID(requestParam.getSrcNodeID())
  1483. .addAllDstNodeIDs(requestParam.getDestNodeIDs())
  1484. .addAllSealedSegmentIDs(requestParam.getSegmentIDs())
  1485. .build();
  1486. Status response = blockingStub().loadBalance(loadBalanceRequest);
  1487. if (response.getErrorCode() == ErrorCode.Success) {
  1488. logInfo("LoadBalanceRequest successfully!");
  1489. return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
  1490. } else {
  1491. logError("LoadBalanceRequest failed! \n{}", response.getReason());
  1492. return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
  1493. }
  1494. } catch (StatusRuntimeException e) {
  1495. logError("LoadBalanceRequest RPC failed:\n{}", e.getStatus().toString());
  1496. return R.failed(e);
  1497. } catch (Exception e) {
  1498. logError("LoadBalanceRequest failed:\n{}", e.getMessage());
  1499. return R.failed(e);
  1500. }
  1501. }
  1502. @Override
  1503. public R<GetCompactionStateResponse> getCompactionState(GetCompactionStateParam requestParam) {
  1504. if (!clientIsReady()) {
  1505. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  1506. }
  1507. logInfo(requestParam.toString());
  1508. try {
  1509. GetCompactionStateRequest getCompactionStateRequest = GetCompactionStateRequest.newBuilder()
  1510. .setCompactionID(requestParam.getCompactionID())
  1511. .build();
  1512. GetCompactionStateResponse response = blockingStub().getCompactionState(getCompactionStateRequest);
  1513. if (response.getStatus().getErrorCode() == ErrorCode.Success) {
  1514. logInfo("GetCompactionStateRequest successfully!");
  1515. return R.success(response);
  1516. } else {
  1517. logError("GetCompactionStateRequest failed:\n{}", response.getStatus().getReason());
  1518. return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
  1519. response.getStatus().getReason());
  1520. }
  1521. } catch (StatusRuntimeException e) {
  1522. logError("GetCompactionStateRequest RPC failed:\n{}", e.getStatus().toString());
  1523. return R.failed(e);
  1524. } catch (Exception e) {
  1525. logError("GetCompactionStateRequest failed:\n{}", e.getMessage());
  1526. return R.failed(e);
  1527. }
  1528. }
  1529. @Override
  1530. public R<ManualCompactionResponse> manualCompaction(ManualCompactionParam requestParam) {
  1531. if (!clientIsReady()) {
  1532. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  1533. }
  1534. logInfo(requestParam.toString());
  1535. try {
  1536. ManualCompactionRequest manualCompactionRequest = ManualCompactionRequest.newBuilder()
  1537. .setCollectionID(requestParam.getCollectionID())
  1538. .build();
  1539. ManualCompactionResponse response = blockingStub().manualCompaction(manualCompactionRequest);
  1540. if (response.getStatus().getErrorCode() == ErrorCode.Success) {
  1541. logInfo("ManualCompactionRequest successfully!");
  1542. return R.success(response);
  1543. } else {
  1544. logError("ManualCompactionRequest failed:\n{}", response.getStatus().getReason());
  1545. return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
  1546. response.getStatus().getReason());
  1547. }
  1548. } catch (StatusRuntimeException e) {
  1549. logError("ManualCompactionRequest RPC failed:\n{}", e.getStatus().toString());
  1550. return R.failed(e);
  1551. } catch (Exception e) {
  1552. logError("ManualCompactionRequest failed:\n{}", e.getMessage());
  1553. return R.failed(e);
  1554. }
  1555. }
  1556. @Override
  1557. public R<GetCompactionPlansResponse> getCompactionStateWithPlans(GetCompactionPlansParam requestParam) {
  1558. if (!clientIsReady()) {
  1559. return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
  1560. }
  1561. logInfo(requestParam.toString());
  1562. try {
  1563. GetCompactionPlansRequest getCompactionPlansRequest = GetCompactionPlansRequest.newBuilder()
  1564. .setCompactionID(requestParam.getCompactionID())
  1565. .build();
  1566. GetCompactionPlansResponse response = blockingStub().getCompactionStateWithPlans(getCompactionPlansRequest);
  1567. if (response.getStatus().getErrorCode() == ErrorCode.Success) {
  1568. logInfo("GetCompactionPlansRequest successfully!");
  1569. return R.success(response);
  1570. } else {
  1571. logError("GetCompactionPlansRequest failed:\n{}", response.getStatus().getReason());
  1572. return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
  1573. response.getStatus().getReason());
  1574. }
  1575. } catch (StatusRuntimeException e) {
  1576. logError("GetCompactionPlansRequest RPC failed:\n{}", e.getStatus().toString());
  1577. return R.failed(e);
  1578. } catch (Exception e) {
  1579. logError("GetCompactionPlansRequest failed:\n{}", e.getMessage());
  1580. return R.failed(e);
  1581. }
  1582. }
  1583. ///////////////////// Log Functions//////////////////////
  1584. private void logInfo(String msg, Object... params) {
  1585. logger.info(msg, params);
  1586. }
  1587. private void logWarning(String msg, Object... params) {
  1588. logger.warn(msg, params);
  1589. }
  1590. private void logError(String msg, Object... params) {
  1591. logger.error(msg, params);
  1592. }
  1593. }