/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package io.milvus.v1;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
import io.milvus.bulkwriter.BulkWriter;
import io.milvus.bulkwriter.CloudImport;
import io.milvus.bulkwriter.LocalBulkWriter;
import io.milvus.bulkwriter.LocalBulkWriterParam;
import io.milvus.bulkwriter.RemoteBulkWriter;
import io.milvus.bulkwriter.RemoteBulkWriterParam;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.bulkwriter.common.clientenum.CloudStorage;
import io.milvus.bulkwriter.common.utils.GeneratorUtils;
import io.milvus.bulkwriter.common.utils.ImportUtils;
import io.milvus.bulkwriter.common.utils.ParquetReaderUtils;
import io.milvus.bulkwriter.connect.AzureConnectParam;
import io.milvus.bulkwriter.connect.S3ConnectParam;
import io.milvus.bulkwriter.connect.StorageConnectParam;
import io.milvus.bulkwriter.request.BulkImportRequest;
import io.milvus.bulkwriter.request.GetImportProgressRequest;
import io.milvus.bulkwriter.request.ListImportJobsRequest;
import io.milvus.client.MilvusClient;
import io.milvus.client.MilvusServiceClient;
import io.milvus.common.utils.ExceptionUtils;
import io.milvus.grpc.DataType;
import io.milvus.grpc.GetCollectionStatisticsResponse;
import io.milvus.grpc.GetImportStateResponse;
import io.milvus.grpc.ImportResponse;
import io.milvus.grpc.ImportState;
import io.milvus.grpc.KeyValuePair;
import io.milvus.grpc.QueryResults;
import io.milvus.param.ConnectParam;
import io.milvus.param.IndexType;
import io.milvus.param.MetricType;
import io.milvus.param.R;
import io.milvus.param.RpcStatus;
import io.milvus.param.bulkinsert.BulkInsertParam;
import io.milvus.param.bulkinsert.GetBulkInsertStateParam;
import io.milvus.param.collection.CollectionSchemaParam;
import io.milvus.param.collection.CreateCollectionParam;
import io.milvus.param.collection.DropCollectionParam;
import io.milvus.param.collection.FieldType;
import io.milvus.param.collection.FlushParam;
import io.milvus.param.collection.GetCollectionStatisticsParam;
import io.milvus.param.collection.HasCollectionParam;
import io.milvus.param.collection.LoadCollectionParam;
import io.milvus.param.dml.QueryParam;
import io.milvus.param.index.CreateIndexParam;
import io.milvus.response.GetCollStatResponseWrapper;
import io.milvus.response.QueryResultsWrapper;
import org.apache.avro.generic.GenericData;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.util.Asserts;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
public class BulkWriterExample {

    // Milvus connection parameters
    public static final String HOST = "127.0.0.1";
    public static final Integer PORT = 19530;
    public static final String USER_NAME = "user.name";
    public static final String PASSWORD = "password";

    private static final Gson GSON_INSTANCE = new Gson();

    private static final List<Integer> QUERY_IDS = Lists.newArrayList(100, 5000);
    /**
     * If you need to transfer the files generated by BulkWriter to remote storage
     * (AWS S3, GCP GCS, Azure Blob, Aliyun OSS, Tencent Cloud TOS),
     * configure the following parameters accordingly; otherwise, you can ignore them.
     */
    public static class StorageConsts {
        public static final CloudStorage cloudStorage = CloudStorage.MINIO;

        /**
         * If using remote storage such as AWS S3, GCP GCS, Aliyun OSS, Tencent Cloud TOS or MinIO,
         * please configure the following parameters.
         */
        public static final String STORAGE_ENDPOINT = cloudStorage.getEndpoint("http://127.0.0.1:9000");
        public static final String STORAGE_BUCKET = "a-bucket"; // default bucket name of MinIO/Milvus standalone
        public static final String STORAGE_ACCESS_KEY = "minioadmin"; // default access key of MinIO/Milvus standalone
        public static final String STORAGE_SECRET_KEY = "minioadmin"; // default secret key of MinIO/Milvus standalone

        /**
         * If using remote storage, please configure this parameter.
         * If using local storage such as a local MinIO, please set it to an empty string.
         */
        public static final String STORAGE_REGION = "";

        /**
         * If using remote storage such as Azure Blob,
         * please configure the following parameters.
         */
        public static final String AZURE_CONTAINER_NAME = "azure.container.name";
        public static final String AZURE_ACCOUNT_NAME = "azure.account.name";
        public static final String AZURE_ACCOUNT_KEY = "azure.account.key";
    }
    /**
     * If you have used RemoteBulkWriter to generate the remote data and want to import it through the Import API on Zilliz Cloud,
     * you don't need to configure the following object-storage parameters (OBJECT_URL, OBJECT_ACCESS_KEY, OBJECT_SECRET_KEY);
     * simply call the callCloudImport method, which encapsulates that logic for you.
     * <p>
     * If you already have data in remote storage (not generated through RemoteBulkWriter) and want to import it through the Import API on Zilliz Cloud,
     * you need to configure the following parameters and then follow the exampleCloudImport method.
     * <p>
     * If you do not need to import data through the Import API on Zilliz Cloud, you can ignore this.
     */
    public static class CloudImportConsts {

        /**
         * The value of the URL is fixed.
         * For overseas regions, it is: https://api.cloud.zilliz.com
         * For regions in China, it is: https://api.cloud.zilliz.com.cn
         */
        public static final String CLOUD_ENDPOINT = "https://api.cloud.zilliz.com";
        public static final String API_KEY = "_api_key_for_cluster_org_";
        public static final String CLUSTER_ID = "_your_cloud_cluster_id_";
        public static final String COLLECTION_NAME = "_collection_name_on_the_cluster_id_";
        // If partition_name is not specified, use ""
        public static final String PARTITION_NAME = "_partition_name_on_the_collection_";

        /**
         * Please provide the complete URL of the file or folder you want to import, similar to https://bucket-name.s3.region-code.amazonaws.com/object-name.
         * For more details, refer to https://docs.zilliz.com/docs/import-data-on-web-ui.
         */
        public static final String OBJECT_URL = "_your_storage_object_url_";
        public static final String OBJECT_ACCESS_KEY = "_your_storage_access_key_";
        public static final String OBJECT_SECRET_KEY = "_your_storage_secret_key_";
    }
    private static final String SIMPLE_COLLECTION_NAME = "java_sdk_bulkwriter_simple_v1";
    private static final String ALL_TYPES_COLLECTION_NAME = "java_sdk_bulkwriter_all_v1";
    private static final Integer DIM = 512;
    private static final Integer ARRAY_CAPACITY = 10;

    private MilvusClient milvusClient;
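    // Demo flow: connect to Milvus, run the simple-collection writers (local, remote, parallel append),
    // then the all-types remote writer with bulkInsert; the Zilliz Cloud import flow is optional.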
    public static void main(String[] args) throws Exception {
        BulkWriterExample exampleBulkWriter = new BulkWriterExample();
        exampleBulkWriter.createConnection();

        List<BulkFileType> fileTypes = Lists.newArrayList(
                BulkFileType.PARQUET
        );
        exampleSimpleCollection(exampleBulkWriter, fileTypes);
        exampleAllTypesCollectionRemote(exampleBulkWriter, fileTypes);

        // To call the cloud import API, you need a cloud service from Zilliz Cloud (https://zilliz.com/cloud)
        // exampleCloudImport();
    }
    private void createConnection() {
        System.out.println("\nCreate connection...");
        ConnectParam connectParam = ConnectParam.newBuilder()
                .withHost(HOST)
                .withPort(PORT)
                .withAuthorization(USER_NAME, PASSWORD)
                .build();
        milvusClient = new MilvusServiceClient(connectParam);
        System.out.println("\nConnected");
    }
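    // Simple-schema demo: create the collection, then write the same rows with LocalBulkWriter and
    // RemoteBulkWriter for each requested file type, and finally append from multiple threads.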
    private static void exampleSimpleCollection(BulkWriterExample exampleBulkWriter, List<BulkFileType> fileTypes) throws Exception {
        CollectionSchemaParam collectionSchema = exampleBulkWriter.buildSimpleSchema();
        exampleBulkWriter.createCollection(SIMPLE_COLLECTION_NAME, collectionSchema, false);

        for (BulkFileType fileType : fileTypes) {
            localWriter(collectionSchema, fileType);
        }
        for (BulkFileType fileType : fileTypes) {
            remoteWriter(collectionSchema, fileType);
        }

        // parallel append
        parallelAppend(collectionSchema);
    }
    private static void exampleAllTypesCollectionRemote(BulkWriterExample exampleBulkWriter, List<BulkFileType> fileTypes) throws Exception {
        // 4 vector types + all scalar types + dynamic field enabled, use the bulkInsert interface
        for (BulkFileType fileType : fileTypes) {
            CollectionSchemaParam collectionSchema = buildAllTypesSchema();
            List<List<String>> batchFiles = exampleBulkWriter.allTypesRemoteWriter(collectionSchema, fileType);
            exampleBulkWriter.callBulkInsert(collectionSchema, batchFiles);
            exampleBulkWriter.retrieveImportData();
        }

//        // 4 vector types + all scalar types + dynamic field enabled, use the cloud import API.
//        // You need a cloud service from Zilliz Cloud (https://zilliz.com/cloud)
//        for (BulkFileType fileType : fileTypes) {
//            CollectionSchemaParam collectionSchema = buildAllTypesSchema();
//            List<List<String>> batchFiles = exampleBulkWriter.allTypesRemoteWriter(collectionSchema, fileType);
//            exampleBulkWriter.createCollection(ALL_TYPES_COLLECTION_NAME, collectionSchema, false);
//            exampleBulkWriter.callCloudImport(batchFiles, ALL_TYPES_COLLECTION_NAME, StringUtils.EMPTY);
//            exampleBulkWriter.retrieveImportData();
//        }
    }
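    // LocalBulkWriter pattern: build the writer with the collection schema, local path, file type and chunk size,
    // append JSON rows, commit to flush the buffer into files, then collect the file paths from getBatchFiles().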
    private static void localWriter(CollectionSchemaParam collectionSchema, BulkFileType fileType) throws Exception {
        System.out.printf("\n===================== local writer (%s) ====================%n", fileType.name());
        LocalBulkWriterParam bulkWriterParam = LocalBulkWriterParam.newBuilder()
                .withCollectionSchema(collectionSchema)
                .withLocalPath("/tmp/bulk_writer")
                .withFileType(fileType)
                .withChunkSize(128 * 1024 * 1024)
                .build();

        try (LocalBulkWriter localBulkWriter = new LocalBulkWriter(bulkWriterParam)) {
            // read data from csv
            readCsvSampleData("data/train_embeddings.csv", localBulkWriter);

            // append rows
            for (int i = 0; i < 100000; i++) {
                JsonObject row = new JsonObject();
                row.addProperty("path", "path_" + i);
                row.add("vector", GSON_INSTANCE.toJsonTree(GeneratorUtils.genFloatVector(DIM)));
                row.addProperty("label", "label_" + i);
                localBulkWriter.appendRow(row);
            }

            System.out.printf("%s rows appended%n", localBulkWriter.getTotalRowCount());
            System.out.printf("%s rows in buffer not flushed%n", localBulkWriter.getBufferRowCount());
            localBulkWriter.commit(false);
            List<List<String>> batchFiles = localBulkWriter.getBatchFiles();
            System.out.printf("Local writer done! output local files: %s%n", batchFiles);
        } catch (Exception e) {
            System.out.println("Local writer catch exception: " + e);
            throw e;
        }
    }
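    // RemoteBulkWriter follows the same append/commit flow, but uploads the generated files to the
    // object storage configured in StorageConsts instead of keeping them on local disk.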
    private static void remoteWriter(CollectionSchemaParam collectionSchema, BulkFileType fileType) throws Exception {
        System.out.printf("\n===================== remote writer (%s) ====================%n", fileType.name());
        try (RemoteBulkWriter remoteBulkWriter = buildRemoteBulkWriter(collectionSchema, fileType)) {
            // read data from csv
            readCsvSampleData("data/train_embeddings.csv", remoteBulkWriter);

            // append rows
            for (int i = 0; i < 100000; i++) {
                JsonObject row = new JsonObject();
                row.addProperty("path", "path_" + i);
                row.add("vector", GSON_INSTANCE.toJsonTree(GeneratorUtils.genFloatVector(DIM)));
                row.addProperty("label", "label_" + i);
                remoteBulkWriter.appendRow(row);
            }

            System.out.printf("%s rows appended%n", remoteBulkWriter.getTotalRowCount());
            System.out.printf("%s rows in buffer not flushed%n", remoteBulkWriter.getBufferRowCount());
            remoteBulkWriter.commit(false);
            List<List<String>> batchFiles = remoteBulkWriter.getBatchFiles();
            System.out.printf("Remote writer done! output remote files: %s%n", batchFiles);
        } catch (Exception e) {
            System.out.println("Remote writer catch exception: " + e);
            throw e;
        }
    }
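    // Appends rows to a single LocalBulkWriter from multiple threads, then reads the generated Parquet
    // files back to verify that the total row count matches what was appended.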
    private static void parallelAppend(CollectionSchemaParam collectionSchema) throws Exception {
        System.out.print("\n===================== parallel append ====================");
        LocalBulkWriterParam bulkWriterParam = LocalBulkWriterParam.newBuilder()
                .withCollectionSchema(collectionSchema)
                .withLocalPath("/tmp/bulk_writer")
                .withFileType(BulkFileType.PARQUET)
                .withChunkSize(128 * 1024 * 1024) // 128MB
                .build();

        try (LocalBulkWriter localBulkWriter = new LocalBulkWriter(bulkWriterParam)) {
            List<Thread> threads = new ArrayList<>();
            int threadCount = 10;
            int rowsPerThread = 1000;
            for (int i = 0; i < threadCount; ++i) {
                int current = i;
                Thread thread = new Thread(() -> appendRow(localBulkWriter, current * rowsPerThread, (current + 1) * rowsPerThread));
                threads.add(thread);
                thread.start();
                System.out.printf("Thread %s started%n", thread.getName());
            }

            for (Thread thread : threads) {
                thread.join();
                System.out.printf("Thread %s finished%n", thread.getName());
            }

            System.out.println(localBulkWriter.getTotalRowCount() + " rows appended");
            System.out.println(localBulkWriter.getBufferRowCount() + " rows in buffer not flushed");
            localBulkWriter.commit(false);
            System.out.printf("Append finished, %s rows%n", threadCount * rowsPerThread);

            int rowCount = 0;
            List<List<String>> batchFiles = localBulkWriter.getBatchFiles();
            for (List<String> batch : batchFiles) {
                for (String filePath : batch) {
                    rowCount += readParquet(filePath);
                }
            }
            Asserts.check(rowCount == threadCount * rowsPerThread, String.format("rowCount %s not equals expected %s", rowCount, threadCount * rowsPerThread));
            System.out.println("Data is correct");
        } catch (Exception e) {
            System.out.println("parallelAppend catch exception: " + e);
            throw e;
        }
    }
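    // Reads one generated Parquet file, counts its rows, and checks that the numeric suffixes of the
    // "path" and "label" columns match for every record.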
    private static long readParquet(String localFilePath) throws Exception {
        final long[] rowCount = {0};
        new ParquetReaderUtils() {
            @Override
            public void readRecord(GenericData.Record record) {
                rowCount[0]++;
                String pathValue = record.get("path").toString();
                String labelValue = record.get("label").toString();
                Asserts.check(pathValue.replace("path_", "").equals(labelValue.replace("label_", "")), String.format("the suffix of %s not equals the suffix of %s", pathValue, labelValue));
            }
        }.readParquet(localFilePath);
        System.out.printf("The file %s contains %s rows. Verify the content...%n", localFilePath, rowCount[0]);
        return rowCount[0];
    }
    private static void appendRow(LocalBulkWriter writer, int begin, int end) {
        try {
            for (int i = begin; i < end; ++i) {
                JsonObject row = new JsonObject();
                row.addProperty("path", "path_" + i);
                row.add("vector", GSON_INSTANCE.toJsonTree(GeneratorUtils.genFloatVector(DIM)));
                row.addProperty("label", "label_" + i);
                writer.appendRow(row);
                if (i % 100 == 0) {
                    System.out.printf("%s inserted %s items%n", Thread.currentThread().getName(), i - begin);
                }
            }
        } catch (Exception e) {
            System.out.println("failed to append row: " + e);
        }
    }
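    // Builds rows covering every field of the all-types schema (scalars, JSON, four vector types, array
    // fields and a dynamic field), uploads them through RemoteBulkWriter, and returns the remote file batches.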
    private List<List<String>> allTypesRemoteWriter(CollectionSchemaParam collectionSchema, BulkFileType fileType) throws Exception {
        System.out.printf("\n===================== all field types (%s) ====================%n", fileType.name());

        try (RemoteBulkWriter remoteBulkWriter = buildRemoteBulkWriter(collectionSchema, fileType)) {
            System.out.println("Append rows");
            int batchCount = 10000;
            for (int i = 0; i < batchCount; ++i) {
                JsonObject rowObject = new JsonObject();

                // scalar field
                rowObject.addProperty("id", i);
                rowObject.addProperty("bool", i % 5 == 0);
                rowObject.addProperty("int8", i % 128);
                rowObject.addProperty("int16", i % 1000);
                rowObject.addProperty("int32", i % 100000);
                rowObject.addProperty("float", i / 3);
                rowObject.addProperty("double", i / 7);
                rowObject.addProperty("varchar", "varchar_" + i);
                rowObject.addProperty("json", String.format("{\"dummy\": %s, \"ok\": \"name_%s\"}", i, i));

                // vector field
                rowObject.add("float_vector", GSON_INSTANCE.toJsonTree(CommonUtils.generateFloatVector(DIM)));
                rowObject.add("binary_vector", GSON_INSTANCE.toJsonTree(CommonUtils.generateBinaryVector(DIM).array()));
                rowObject.add("float16_vector", GSON_INSTANCE.toJsonTree(CommonUtils.generateFloat16Vector(DIM, false).array()));
                rowObject.add("sparse_vector", GSON_INSTANCE.toJsonTree(CommonUtils.generateSparseVector()));

                // array field
                rowObject.add("array_bool", GSON_INSTANCE.toJsonTree(GeneratorUtils.generatorBoolValue(10)));
                rowObject.add("array_int8", GSON_INSTANCE.toJsonTree(GeneratorUtils.generatorInt8Value(10)));
                rowObject.add("array_int16", GSON_INSTANCE.toJsonTree(GeneratorUtils.generatorInt16Value(10)));
                rowObject.add("array_int32", GSON_INSTANCE.toJsonTree(GeneratorUtils.generatorInt32Value(10)));
                rowObject.add("array_int64", GSON_INSTANCE.toJsonTree(GeneratorUtils.generatorLongValue(10)));
                rowObject.add("array_varchar", GSON_INSTANCE.toJsonTree(GeneratorUtils.generatorVarcharValue(10, 10)));
                rowObject.add("array_float", GSON_INSTANCE.toJsonTree(GeneratorUtils.generatorFloatValue(10)));
                rowObject.add("array_double", GSON_INSTANCE.toJsonTree(GeneratorUtils.generatorDoubleValue(10)));

                // dynamic fields
                if (collectionSchema.isEnableDynamicField()) {
                    rowObject.addProperty("dynamic", "dynamic_" + i);
                }

                if (QUERY_IDS.contains(i)) {
                    System.out.println(rowObject);
                }

                remoteBulkWriter.appendRow(rowObject);
            }

            System.out.printf("%s rows appended%n", remoteBulkWriter.getTotalRowCount());
            System.out.printf("%s rows in buffer not flushed%n", remoteBulkWriter.getBufferRowCount());

            System.out.println("Generate data files...");
            remoteBulkWriter.commit(false);
            System.out.printf("Data files have been uploaded: %s%n", remoteBulkWriter.getBatchFiles());
            return remoteBulkWriter.getBatchFiles();
        } catch (Exception e) {
            System.out.println("allTypesRemoteWriter catch exception: " + e);
            throw e;
        }
    }
    private static RemoteBulkWriter buildRemoteBulkWriter(CollectionSchemaParam collectionSchema, BulkFileType fileType) throws IOException {
        StorageConnectParam connectParam = buildStorageConnectParam();
        RemoteBulkWriterParam bulkWriterParam = RemoteBulkWriterParam.newBuilder()
                .withCollectionSchema(collectionSchema)
                .withRemotePath("bulk_data")
                .withFileType(fileType)
                .withChunkSize(512 * 1024 * 1024)
                .withConnectParam(connectParam)
                .build();
        return new RemoteBulkWriter(bulkWriterParam);
    }
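    // Chooses the storage connection for RemoteBulkWriter: an AzureConnectParam when Azure Blob is selected
    // in StorageConsts, otherwise an S3-compatible S3ConnectParam (AWS S3, GCS, OSS, TOS, MinIO).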
    private static StorageConnectParam buildStorageConnectParam() {
        StorageConnectParam connectParam;
        if (StorageConsts.cloudStorage == CloudStorage.AZURE) {
            String connectionStr = "DefaultEndpointsProtocol=https;AccountName=" + StorageConsts.AZURE_ACCOUNT_NAME +
                    ";AccountKey=" + StorageConsts.AZURE_ACCOUNT_KEY + ";EndpointSuffix=core.windows.net";
            connectParam = AzureConnectParam.newBuilder()
                    .withConnStr(connectionStr)
                    .withContainerName(StorageConsts.AZURE_CONTAINER_NAME)
                    .build();
        } else {
            connectParam = S3ConnectParam.newBuilder()
                    .withEndpoint(StorageConsts.STORAGE_ENDPOINT)
                    .withCloudName(StorageConsts.cloudStorage.getCloudName())
                    .withBucketName(StorageConsts.STORAGE_BUCKET)
                    .withAccessKey(StorageConsts.STORAGE_ACCESS_KEY)
                    .withSecretKey(StorageConsts.STORAGE_SECRET_KEY)
                    .withRegion(StorageConsts.STORAGE_REGION)
                    .build();
        }
        return connectParam;
    }
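    // Loads the CSV resource from the classpath and appends each record ("vector", "label", "path") to the writer.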
    private static void readCsvSampleData(String filePath, BulkWriter writer) throws IOException, InterruptedException {
        ClassLoader classLoader = BulkWriterExample.class.getClassLoader();
        URL resourceUrl = classLoader.getResource(filePath);
        filePath = new File(resourceUrl.getFile()).getAbsolutePath();

        CsvMapper csvMapper = new CsvMapper();
        File csvFile = new File(filePath);
        CsvSchema csvSchema = CsvSchema.builder().setUseHeader(true).build();
        Iterator<CsvDataObject> iterator = csvMapper.readerFor(CsvDataObject.class).with(csvSchema).readValues(csvFile);
        while (iterator.hasNext()) {
            CsvDataObject dataObject = iterator.next();
            JsonObject row = new JsonObject();
            row.add("vector", GSON_INSTANCE.toJsonTree(dataObject.toFloatArray()));
            row.addProperty("label", dataObject.getLabel());
            row.addProperty("path", dataObject.getPath());
            writer.appendRow(row);
        }
    }
    private static class CsvDataObject {
        @JsonProperty
        private String vector;
        @JsonProperty
        private String path;
        @JsonProperty
        private String label;

        public String getVector() {
            return vector;
        }

        public String getPath() {
            return path;
        }

        public String getLabel() {
            return label;
        }

        public List<Float> toFloatArray() {
            return GSON_INSTANCE.fromJson(vector, new TypeToken<List<Float>>() {
            }.getType());
        }
    }
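    // Imports the generated remote files with the Milvus bulkInsert interface: (re)create the collection,
    // submit one bulkInsert task per file batch, then poll the task states every five seconds until all finish.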
    private void callBulkInsert(CollectionSchemaParam collectionSchema, List<List<String>> batchFiles) throws InterruptedException {
        System.out.println("\n===================== call bulkInsert ====================");
        createCollection(ALL_TYPES_COLLECTION_NAME, collectionSchema, true);

        List<Long> taskIds = new ArrayList<>();
        for (List<String> batch : batchFiles) {
            Long taskId = bulkInsert(batch);
            taskIds.add(taskId);
            System.out.println("Create a bulkInsert task, task id: " + taskId);
        }

        while (!taskIds.isEmpty()) {
            Iterator<Long> iterator = taskIds.iterator();
            List<Long> tempTaskIds = new ArrayList<>();
            while (iterator.hasNext()) {
                Long taskId = iterator.next();
                System.out.println("Wait 5 seconds to check bulkInsert tasks state...");
                TimeUnit.SECONDS.sleep(5);

                GetImportStateResponse bulkInsertState = getBulkInsertState(taskId);
                if (bulkInsertState.getState() == ImportState.ImportFailed
                        || bulkInsertState.getState() == ImportState.ImportFailedAndCleaned) {
                    List<KeyValuePair> infosList = bulkInsertState.getInfosList();
                    Optional<String> failedReasonOptional = infosList.stream().filter(e -> e.getKey().equals("failed_reason"))
                            .map(KeyValuePair::getValue).findFirst();
                    String failedReason = failedReasonOptional.orElse("");
                    System.out.printf("The task %s failed, reason: %s%n", taskId, failedReason);
                } else if (bulkInsertState.getState() == ImportState.ImportCompleted) {
                    System.out.printf("The task %s completed%n", taskId);
                } else {
                    System.out.printf("The task %s is running, state:%s%n", taskId, bulkInsertState.getState());
                    tempTaskIds.add(taskId);
                }
            }
            taskIds = tempTaskIds;
        }

        System.out.println("Collection row number: " + getCollectionStatistics());
    }
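    // Imports the generated remote files through the Zilliz Cloud Import API: derive the object URL and
    // credentials from StorageConsts, submit a bulkImport job, then poll its progress every five seconds.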
    private void callCloudImport(List<List<String>> batchFiles, String collectionName, String partitionName) throws InterruptedException {
        System.out.println("\n===================== call cloudImport ====================");

        String objectUrl = StorageConsts.cloudStorage == CloudStorage.AZURE
                ? StorageConsts.cloudStorage.getAzureObjectUrl(StorageConsts.AZURE_ACCOUNT_NAME, StorageConsts.AZURE_CONTAINER_NAME, ImportUtils.getCommonPrefix(batchFiles))
                : StorageConsts.cloudStorage.getS3ObjectUrl(StorageConsts.STORAGE_BUCKET, ImportUtils.getCommonPrefix(batchFiles), StorageConsts.STORAGE_REGION);
        String accessKey = StorageConsts.cloudStorage == CloudStorage.AZURE ? StorageConsts.AZURE_ACCOUNT_NAME : StorageConsts.STORAGE_ACCESS_KEY;
        String secretKey = StorageConsts.cloudStorage == CloudStorage.AZURE ? StorageConsts.AZURE_ACCOUNT_KEY : StorageConsts.STORAGE_SECRET_KEY;

        BulkImportRequest bulkImportRequest = BulkImportRequest.builder()
                .objectUrl(objectUrl).accessKey(accessKey).secretKey(secretKey)
                .clusterId(CloudImportConsts.CLUSTER_ID).collectionName(collectionName).partitionName(partitionName)
                .build();
        String bulkImportResult = CloudImport.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, bulkImportRequest);
        JsonObject bulkImportObject = convertDataMap(bulkImportResult);
        String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
        System.out.println("Create a cloudImport job, job id: " + jobId);

        while (true) {
            System.out.println("Wait 5 seconds to check cloudImport job state...");
            TimeUnit.SECONDS.sleep(5);

            GetImportProgressRequest request = GetImportProgressRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).jobId(jobId).build();
            String getImportProgressResult = CloudImport.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, request);
            JsonObject getImportProgressObject = convertDataMap(getImportProgressResult);
            String importProgressState = getImportProgressObject.getAsJsonObject("data").get("state").getAsString();
            String reason = getImportProgressObject.getAsJsonObject("data").get("reason").getAsString();
            String progress = getImportProgressObject.getAsJsonObject("data").get("progress").getAsString();

            if ("Completed".equals(importProgressState)) {
                System.out.printf("The job %s completed%n", jobId);
                break;
            } else if (StringUtils.isNotEmpty(reason)) {
                System.out.printf("The job %s failed or canceled, reason: %s%n", jobId, reason);
                break;
            } else {
                System.out.printf("The job %s is running, progress:%s%n", jobId, progress);
            }
        }

        System.out.println("Collection row number: " + getCollectionStatistics());
    }
    /**
     * @param collectionName   name of the collection to create
     * @param collectionSchema collection schema
     * @param dropIfExist      if the collection already exists, drop it first and then create it again
     */
    private void createCollection(String collectionName, CollectionSchemaParam collectionSchema, boolean dropIfExist) {
        System.out.println("\n===================== create collection ====================");
        checkMilvusClientIfExist();
        CreateCollectionParam collectionParam = CreateCollectionParam.newBuilder()
                .withCollectionName(collectionName)
                .withSchema(collectionSchema)
                .build();
        R<Boolean> hasCollection = milvusClient.hasCollection(HasCollectionParam.newBuilder().withCollectionName(collectionName).build());
        if (hasCollection.getData()) {
            if (dropIfExist) {
                milvusClient.dropCollection(DropCollectionParam.newBuilder().withCollectionName(collectionName).build());
                milvusClient.createCollection(collectionParam);
            }
        } else {
            milvusClient.createCollection(collectionParam);
        }
        System.out.printf("Collection %s created%n", collectionName);
    }
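    // Verifies the imported data: create the vector indexes, load the collection, query the rows whose ids
    // are in QUERY_IDS, and print every field of each returned record.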
    private void retrieveImportData() {
        createIndex();

        System.out.printf("Load collection and query items %s%n", QUERY_IDS);
        loadCollection();

        String expr = String.format("id in %s", QUERY_IDS);
        System.out.println(expr);

        List<QueryResultsWrapper.RowRecord> rowRecords = query(expr, Lists.newArrayList("*"));
        System.out.println("Query results:");
        for (QueryResultsWrapper.RowRecord record : rowRecords) {
            JsonObject rowObject = new JsonObject();
            // scalar field
            rowObject.addProperty("id", (Long) record.get("id"));
            rowObject.addProperty("bool", (Boolean) record.get("bool"));
            rowObject.addProperty("int8", (Integer) record.get("int8"));
            rowObject.addProperty("int16", (Integer) record.get("int16"));
            rowObject.addProperty("int32", (Integer) record.get("int32"));
            rowObject.addProperty("float", (Float) record.get("float"));
            rowObject.addProperty("double", (Double) record.get("double"));
            rowObject.addProperty("varchar", (String) record.get("varchar"));
            rowObject.add("json", (JsonElement) record.get("json"));

            // vector field
            rowObject.add("float_vector", GSON_INSTANCE.toJsonTree(record.get("float_vector")));
            rowObject.add("binary_vector", GSON_INSTANCE.toJsonTree(((ByteBuffer) record.get("binary_vector")).array()));
            rowObject.add("float16_vector", GSON_INSTANCE.toJsonTree(((ByteBuffer) record.get("float16_vector")).array()));
            rowObject.add("sparse_vector", GSON_INSTANCE.toJsonTree(record.get("sparse_vector")));

            // array field
            rowObject.add("array_bool", GSON_INSTANCE.toJsonTree(record.get("array_bool")));
            rowObject.add("array_int8", GSON_INSTANCE.toJsonTree(record.get("array_int8")));
            rowObject.add("array_int16", GSON_INSTANCE.toJsonTree(record.get("array_int16")));
            rowObject.add("array_int32", GSON_INSTANCE.toJsonTree(record.get("array_int32")));
            rowObject.add("array_int64", GSON_INSTANCE.toJsonTree(record.get("array_int64")));
            rowObject.add("array_varchar", GSON_INSTANCE.toJsonTree(record.get("array_varchar")));
            rowObject.add("array_float", GSON_INSTANCE.toJsonTree(record.get("array_float")));
            rowObject.add("array_double", GSON_INSTANCE.toJsonTree(record.get("array_double")));

            // dynamic field
            rowObject.addProperty("dynamic", (String) record.get("dynamic"));

            System.out.println(rowObject);
        }
    }
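    // One index per vector field: FLAT/L2 for float_vector, BIN_FLAT/HAMMING for binary_vector,
    // FLAT/IP for float16_vector, and SPARSE_WAND/IP for sparse_vector.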
    private void createIndex() {
        System.out.println("Create index...");
        checkMilvusClientIfExist();
        R<RpcStatus> response = milvusClient.createIndex(CreateIndexParam.newBuilder()
                .withCollectionName(ALL_TYPES_COLLECTION_NAME)
                .withFieldName("float_vector")
                .withIndexType(IndexType.FLAT)
                .withMetricType(MetricType.L2)
                .withSyncMode(Boolean.TRUE)
                .build());
        ExceptionUtils.handleResponseStatus(response);

        response = milvusClient.createIndex(CreateIndexParam.newBuilder()
                .withCollectionName(ALL_TYPES_COLLECTION_NAME)
                .withFieldName("binary_vector")
                .withIndexType(IndexType.BIN_FLAT)
                .withMetricType(MetricType.HAMMING)
                .withSyncMode(Boolean.TRUE)
                .build());
        ExceptionUtils.handleResponseStatus(response);

        response = milvusClient.createIndex(CreateIndexParam.newBuilder()
                .withCollectionName(ALL_TYPES_COLLECTION_NAME)
                .withFieldName("float16_vector")
                .withIndexType(IndexType.FLAT)
                .withMetricType(MetricType.IP)
                .withSyncMode(Boolean.TRUE)
                .build());
        ExceptionUtils.handleResponseStatus(response);

        response = milvusClient.createIndex(CreateIndexParam.newBuilder()
                .withCollectionName(ALL_TYPES_COLLECTION_NAME)
                .withFieldName("sparse_vector")
                .withIndexType(IndexType.SPARSE_WAND)
                .withMetricType(MetricType.IP)
                .withSyncMode(Boolean.TRUE)
                .build());
        ExceptionUtils.handleResponseStatus(response);
    }

    private R<RpcStatus> loadCollection() {
        System.out.println("Loading Collection...");
        checkMilvusClientIfExist();
        R<RpcStatus> response = milvusClient.loadCollection(LoadCollectionParam.newBuilder()
                .withCollectionName(ALL_TYPES_COLLECTION_NAME)
                .build());
        ExceptionUtils.handleResponseStatus(response);
        return response;
    }
    private List<QueryResultsWrapper.RowRecord> query(String expr, List<String> outputFields) {
        System.out.println("========== query() ==========");
        checkMilvusClientIfExist();
        QueryParam queryParam = QueryParam.newBuilder()
                .withCollectionName(ALL_TYPES_COLLECTION_NAME)
                .withExpr(expr)
                .withOutFields(outputFields)
                .build();
        R<QueryResults> response = milvusClient.query(queryParam);
        ExceptionUtils.handleResponseStatus(response);
        QueryResultsWrapper wrapper = new QueryResultsWrapper(response.getData());
        return wrapper.getRowRecords();
    }
    private Long bulkInsert(List<String> batchFiles) {
        System.out.println("========== bulkInsert() ==========");
        checkMilvusClientIfExist();
        R<ImportResponse> response = milvusClient.bulkInsert(BulkInsertParam.newBuilder()
                .withCollectionName(ALL_TYPES_COLLECTION_NAME)
                .withFiles(batchFiles)
                .build());
        ExceptionUtils.handleResponseStatus(response);
        return response.getData().getTasksList().get(0);
    }

    private GetImportStateResponse getBulkInsertState(Long taskId) {
        System.out.println("========== getBulkInsertState() ==========");
        checkMilvusClientIfExist();
        R<GetImportStateResponse> bulkInsertState = milvusClient.getBulkInsertState(GetBulkInsertStateParam.newBuilder()
                .withTask(taskId)
                .build());
        return bulkInsertState.getData();
    }
    private Long getCollectionStatistics() {
        System.out.println("========== getCollectionStatistics() ==========");
        // call flush() to flush the insert buffer to storage,
        // so that getCollectionStatistics() can return the correct row count
        checkMilvusClientIfExist();
        milvusClient.flush(FlushParam.newBuilder().addCollectionName(ALL_TYPES_COLLECTION_NAME).build());
        R<GetCollectionStatisticsResponse> response = milvusClient.getCollectionStatistics(
                GetCollectionStatisticsParam.newBuilder()
                        .withCollectionName(ALL_TYPES_COLLECTION_NAME)
                        .build());
        ExceptionUtils.handleResponseStatus(response);
        GetCollStatResponseWrapper wrapper = new GetCollStatResponseWrapper(response.getData());
        return wrapper.getRowCount();
    }
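    // Stand-alone cloud import flow for files that already exist in object storage: submit a bulkImport job
    // using the OBJECT_URL credentials, fetch its progress once, and list the cluster's import jobs.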
    private static void exampleCloudImport() {
        System.out.println("\n===================== import files to cloud vectordb ====================");
        BulkImportRequest request = BulkImportRequest.builder()
                .objectUrl(CloudImportConsts.OBJECT_URL).accessKey(CloudImportConsts.OBJECT_ACCESS_KEY).secretKey(CloudImportConsts.OBJECT_SECRET_KEY)
                .clusterId(CloudImportConsts.CLUSTER_ID).collectionName(CloudImportConsts.COLLECTION_NAME).partitionName(CloudImportConsts.PARTITION_NAME)
                .build();
        String bulkImportResult = CloudImport.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, request);
        System.out.println(bulkImportResult);

        System.out.println("\n===================== get import job progress ====================");
        JsonObject bulkImportObject = convertDataMap(bulkImportResult);
        String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
        GetImportProgressRequest getImportProgressRequest = GetImportProgressRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).jobId(jobId).build();
        String getImportProgressResult = CloudImport.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, getImportProgressRequest);
        System.out.println(getImportProgressResult);

        System.out.println("\n===================== list import jobs ====================");
        ListImportJobsRequest listImportJobsRequest = ListImportJobsRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).currentPage(1).pageSize(10).build();
        String listImportJobsResult = CloudImport.listImportJobs(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, listImportJobsRequest);
        System.out.println(listImportJobsResult);
    }
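    // Simple schema: an auto-id Int64 primary key, a float vector of dimension DIM, and two VarChar
    // fields ("path" and "label") used by the local/remote writer examples.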
    private CollectionSchemaParam buildSimpleSchema() {
        FieldType fieldType1 = FieldType.newBuilder()
                .withName("id")
                .withDataType(DataType.Int64)
                .withPrimaryKey(true)
                .withAutoID(true)
                .build();

        // vector field
        FieldType fieldType2 = FieldType.newBuilder()
                .withName("vector")
                .withDataType(DataType.FloatVector)
                .withDimension(DIM)
                .build();

        // scalar field
        FieldType fieldType3 = FieldType.newBuilder()
                .withName("path")
                .withDataType(DataType.VarChar)
                .withMaxLength(512)
                .build();

        FieldType fieldType4 = FieldType.newBuilder()
                .withName("label")
                .withDataType(DataType.VarChar)
                .withMaxLength(512)
                .build();

        return CollectionSchemaParam.newBuilder()
                .addFieldType(fieldType1)
                .addFieldType(fieldType2)
                .addFieldType(fieldType3)
                .addFieldType(fieldType4)
                .build();
    }
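    // All-types schema: Int64 primary key with auto-id disabled (the data files provide the ids), every
    // scalar type, JSON, four vector types, array fields of each element type, and the dynamic field enabled.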
    private static CollectionSchemaParam buildAllTypesSchema() {
        List<FieldType> fieldTypes = new ArrayList<>();
        // scalar field
        fieldTypes.add(FieldType.newBuilder()
                .withName("id")
                .withDataType(DataType.Int64)
                .withPrimaryKey(true)
                .withAutoID(false)
                .build());
        fieldTypes.add(FieldType.newBuilder()
                .withName("bool")
                .withDataType(DataType.Bool)
                .build());
        fieldTypes.add(FieldType.newBuilder()
                .withName("int8")
                .withDataType(DataType.Int8)
                .build());
        fieldTypes.add(FieldType.newBuilder()
                .withName("int16")
                .withDataType(DataType.Int16)
                .build());
        fieldTypes.add(FieldType.newBuilder()
                .withName("int32")
                .withDataType(DataType.Int32)
                .build());
        fieldTypes.add(FieldType.newBuilder()
                .withName("float")
                .withDataType(DataType.Float)
                .build());
        fieldTypes.add(FieldType.newBuilder()
                .withName("double")
                .withDataType(DataType.Double)
                .build());
        fieldTypes.add(FieldType.newBuilder()
                .withName("varchar")
                .withDataType(DataType.VarChar)
                .withMaxLength(512)
                .build());
        fieldTypes.add(FieldType.newBuilder()
                .withName("json")
                .withDataType(DataType.JSON)
                .build());

        // vector fields
        fieldTypes.add(FieldType.newBuilder()
                .withName("float_vector")
                .withDataType(DataType.FloatVector)
                .withDimension(DIM)
                .build());
        fieldTypes.add(FieldType.newBuilder()
                .withName("binary_vector")
                .withDataType(DataType.BinaryVector)
                .withDimension(DIM)
                .build());
        fieldTypes.add(FieldType.newBuilder()
                .withName("float16_vector")
                .withDataType(DataType.Float16Vector)
                .withDimension(DIM)
                .build());
        fieldTypes.add(FieldType.newBuilder()
                .withName("sparse_vector")
                .withDataType(DataType.SparseFloatVector)
                .build());

        // array fields
        fieldTypes.add(FieldType.newBuilder()
                .withName("array_bool")
                .withDataType(DataType.Array)
                .withElementType(DataType.Bool)
                .withMaxCapacity(ARRAY_CAPACITY)
                .build());
        fieldTypes.add(FieldType.newBuilder()
                .withName("array_int8")
                .withDataType(DataType.Array)
                .withElementType(DataType.Int8)
                .withMaxCapacity(ARRAY_CAPACITY)
                .build());
        fieldTypes.add(FieldType.newBuilder()
                .withName("array_int16")
                .withDataType(DataType.Array)
                .withElementType(DataType.Int16)
                .withMaxCapacity(ARRAY_CAPACITY)
                .build());
        fieldTypes.add(FieldType.newBuilder()
                .withName("array_int32")
                .withDataType(DataType.Array)
                .withElementType(DataType.Int32)
                .withMaxCapacity(ARRAY_CAPACITY)
                .build());
        fieldTypes.add(FieldType.newBuilder()
                .withName("array_int64")
                .withDataType(DataType.Array)
                .withElementType(DataType.Int64)
                .withMaxCapacity(ARRAY_CAPACITY)
                .build());
        fieldTypes.add(FieldType.newBuilder()
                .withName("array_varchar")
                .withDataType(DataType.Array)
                .withElementType(DataType.VarChar)
                .withMaxLength(512)
                .withMaxCapacity(ARRAY_CAPACITY)
                .build());
        fieldTypes.add(FieldType.newBuilder()
                .withName("array_float")
                .withDataType(DataType.Array)
                .withElementType(DataType.Float)
                .withMaxCapacity(ARRAY_CAPACITY)
                .build());
        fieldTypes.add(FieldType.newBuilder()
                .withName("array_double")
                .withDataType(DataType.Array)
                .withElementType(DataType.Double)
                .withMaxCapacity(ARRAY_CAPACITY)
                .build());

        CollectionSchemaParam.Builder schemaBuilder = CollectionSchemaParam.newBuilder()
                .withEnableDynamicField(true)
                .withFieldTypes(fieldTypes);
        return schemaBuilder.build();
    }
    private void checkMilvusClientIfExist() {
        if (milvusClient == null) {
            String msg = "milvusClient is null. Please initialize it by calling createConnection() before use.";
            throw new RuntimeException(msg);
        }
    }

    private static JsonObject convertDataMap(String result) {
        return GSON_INSTANCE.fromJson(result, JsonObject.class);
    }
}