collections.service.ts 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715
  1. import {
  2. CreateCollectionReq,
  3. DescribeCollectionReq,
  4. DropCollectionReq,
  5. GetCollectionStatisticsReq,
  6. InsertReq,
  7. LoadCollectionReq,
  8. ReleaseLoadCollectionReq,
  9. SearchReq,
  10. RenameCollectionReq,
  11. AlterAliasReq,
  12. CreateAliasReq,
  13. DropAliasReq,
  14. ShowCollectionsReq,
  15. ShowCollectionsType,
  16. DeleteEntitiesReq,
  17. GetCompactionStateReq,
  18. GetQuerySegmentInfoReq,
  19. GePersistentSegmentInfoReq,
  20. CompactReq,
  21. HasCollectionReq,
  22. CountReq,
  23. GetLoadStateReq,
  24. CollectionData,
  25. CreateIndexReq,
  26. DescribeIndexReq,
  27. DropIndexReq,
  28. } from '@zilliz/milvus2-sdk-node';
  29. import { Parser } from '@json2csv/plainjs';
  30. import {
  31. throwErrorFromSDK,
  32. findKeyValue,
  33. getKeyValueListFromJsonString,
  34. genRows,
  35. ROW_COUNT,
  36. convertFieldSchemaToFieldType,
  37. LOADING_STATE,
  38. DYNAMIC_FIELD,
  39. SimpleQueue,
  40. MIN_INT64,
  41. } from '../utils';
  42. import { QueryDto, ImportSampleDto, GetReplicasDto } from './dto';
  43. import {
  44. CollectionObject,
  45. CollectionLazyObject,
  46. FieldObject,
  47. IndexObject,
  48. DescribeCollectionRes,
  49. CountObject,
  50. StatisticsObject,
  51. CollectionFullObject,
  52. DescribeIndexRes,
  53. } from '../types';
  54. import { clientCache } from '../app';
  55. import { clients } from '../socket';
  56. import { WS_EVENTS } from '../utils';
  57. export class CollectionsService {
  /**
   * List collections through the Milvus client cached for `clientId`.
   * @param clientId - key of the cached client entry.
   * @param data - optional request forwarded unchanged to the SDK.
   * @returns the raw SDK response, after its status went through throwErrorFromSDK.
   */
  async showCollections(clientId: string, data?: ShowCollectionsReq) {
    const { milvusClient: client } = clientCache.get(clientId);
    const response = await client.showCollections(data);
    // hand the status to the shared SDK error helper before returning
    throwErrorFromSDK(response.status);
    return response;
  }
  /**
   * Create a collection and return its freshly fetched details.
   * @param clientId - key of the cached client entry.
   * @param data - create-collection request forwarded to the SDK.
   * @returns the first entry returned by getAllCollections for the new name.
   */
  async createCollection(clientId: string, data: CreateCollectionReq) {
    const { milvusClient } = clientCache.get(clientId);
    const res = await milvusClient.createCollection(data);
    // check the SDK result first: there is no point in refreshing collection
    // details when the creation itself was rejected
    throwErrorFromSDK(res);
    const newCollection = (await this.getAllCollections(clientId, [
      data.collection_name,
    ])) as CollectionFullObject[];
    return newCollection[0];
  }
  /**
   * Describe a collection and enrich its schema with derived data:
   * per-field index, dimension, maxCapacity and maxLength, plus the
   * schema-level primaryField, scalarFields, vectorFields, dynamicFields and
   * hasVectorIndex properties.
   * @param clientId - key of the cached client entry.
   * @param data - describe-collection request forwarded to the SDK.
   * @returns the SDK response with the enriched schema.
   */
  async describeCollection(clientId: string, data: DescribeCollectionReq) {
    const { milvusClient } = clientCache.get(clientId);
    const res = (await milvusClient.describeCollection(
      data
    )) as DescribeCollectionRes;
    // check the status before doing any further work on the response
    throwErrorFromSDK(res.status);
    // get index info for collections
    const indexRes = await this.describeIndex(clientId, {
      collection_name: data.collection_name,
    });
    const vectorFields: FieldObject[] = [];
    const scalarFields: FieldObject[] = [];
    // append index info to each field
    res.schema.fields.forEach((field: FieldObject) => {
      // read a numeric type param; missing, non-numeric or zero values become -1
      const numericTypeParam = (key: string) =>
        Number(field.type_params.find(item => item.key === key)?.value) || -1;
      // add index
      field.index = indexRes.index_descriptions.find(
        index => index.field_name === field.name
      ) as IndexObject;
      // add dimension
      field.dimension = numericTypeParam('dim');
      // add max capacity
      field.maxCapacity = numericTypeParam('max_capacity');
      // add max length
      field.maxLength = numericTypeParam('max_length');
      // classify fields
      if (
        field.data_type === 'BinaryVector' ||
        field.data_type === 'FloatVector'
      ) {
        vectorFields.push(field);
      } else {
        scalarFields.push(field);
      }
      if (field.is_primary_key) {
        res.schema.primaryField = field;
      }
    });
    // add extra data to schema
    res.schema.hasVectorIndex = vectorFields.some(v => v.index);
    res.schema.scalarFields = scalarFields;
    res.schema.vectorFields = vectorFields;
    // a synthetic JSON field stands in for the dynamic field when enabled
    res.schema.dynamicFields = res.schema.enable_dynamic_field
      ? [
          {
            name: DYNAMIC_FIELD,
            data_type: 'JSON',
            type_params: [],
            index: undefined,
            description: '',
            index_params: [],
            dimension: -1,
            maxCapacity: -1,
            maxLength: -1,
            autoID: false,
          },
        ]
      : [];
    return res;
  }
  /**
   * Rename a collection, then return the details fetched under its new name.
   * @param clientId - key of the cached client entry.
   * @param data - rename request; `new_collection_name` is used for the refresh.
   */
  async renameCollection(clientId: string, data: RenameCollectionReq) {
    const { milvusClient: client } = clientCache.get(clientId);
    const renameResult = await client.renameCollection(data);
    throwErrorFromSDK(renameResult);
    // re-read the collection under its new name
    const [renamed] = (await this.getAllCollections(clientId, [
      data.new_collection_name,
    ])) as CollectionFullObject[];
    return renamed;
  }
  /**
   * Drop a collection.
   * @param clientId - key of the cached client entry.
   * @param data - drop request forwarded to the SDK.
   * @returns the SDK result, after it went through throwErrorFromSDK.
   */
  async dropCollection(clientId: string, data: DropCollectionReq) {
    const { milvusClient: client } = clientCache.get(clientId);
    const dropResult = await client.dropCollection(data);
    throwErrorFromSDK(dropResult);
    return dropResult;
  }
  /**
   * Ask Milvus to load a collection and return its refreshed details.
   * @param clientId - key of the cached client entry.
   * @param data - load request forwarded to the SDK.
   */
  async loadCollection(clientId: string, data: LoadCollectionReq) {
    const { milvusClient: client } = clientCache.get(clientId);
    const loadResult = await client.loadCollection(data);
    throwErrorFromSDK(loadResult);
    // refresh the details of the collection that was just loaded
    const [refreshed] = (await this.getAllCollections(clientId, [
      data.collection_name,
    ])) as CollectionFullObject[];
    return refreshed;
  }
  /**
   * Release a loaded collection and return its refreshed details.
   * @param clientId - key of the cached client entry.
   * @param data - release request forwarded to the SDK.
   */
  async releaseCollection(clientId: string, data: ReleaseLoadCollectionReq) {
    const { milvusClient: client } = clientCache.get(clientId);
    const releaseResult = await client.releaseCollection(data);
    throwErrorFromSDK(releaseResult);
    // refresh the details of the collection that was just released
    const [refreshed] = (await this.getAllCollections(clientId, [
      data.collection_name,
    ])) as CollectionFullObject[];
    return refreshed;
  }
  /**
   * Fetch the statistics of a collection.
   * @param clientId - key of the cached client entry.
   * @param data - statistics request forwarded to the SDK.
   * @returns the raw SDK response, after its status went through throwErrorFromSDK.
   */
  async getCollectionStatistics(
    clientId: string,
    data: GetCollectionStatisticsReq
  ) {
    const { milvusClient: client } = clientCache.get(clientId);
    const statistics = await client.getCollectionStatistics(data);
    throwErrorFromSDK(statistics.status);
    return statistics;
  }
  /**
   * Fetch the load state of a collection.
   * @param clientId - key of the cached client entry.
   * @param data - load-state request forwarded to the SDK.
   * @returns the raw SDK response, after its status went through throwErrorFromSDK.
   */
  async getLoadState(clientId: string, data: GetLoadStateReq) {
    const { milvusClient: client } = clientCache.get(clientId);
    const loadState = await client.getLoadState(data);
    throwErrorFromSDK(loadState.status);
    return loadState;
  }
  /**
   * Count the rows of a collection.
   * Uses the SDK `count` call; if that throws, falls back to the `row_count`
   * reported by getCollectionStatistics.
   * @param clientId - key of the cached client entry.
   * @param data - count request; also reused as the statistics request.
   * @returns `{ rowCount }` with the count converted through Number().
   */
  async count(clientId: string, data: CountReq) {
    const { milvusClient: client } = clientCache.get(clientId);
    let total = 0;
    try {
      total = (await client.count(data)).data;
    } catch (error) {
      // count failed: fall back to the statistics API
      const statistics = await this.getCollectionStatistics(clientId, data);
      total = statistics.data.row_count;
    }
    const result = { rowCount: Number(total) } as CountObject;
    return result;
  }
  /**
   * Insert rows into a collection.
   * @param clientId - key of the cached client entry.
   * @param data - insert request forwarded to the SDK.
   * @returns the raw SDK response, after its status went through throwErrorFromSDK.
   */
  async insert(clientId: string, data: InsertReq) {
    const { milvusClient: client } = clientCache.get(clientId);
    const insertResult = await client.insert(data);
    throwErrorFromSDK(insertResult.status);
    return insertResult;
  }
  /**
   * Delete entities from a collection.
   * @param clientId - key of the cached client entry.
   * @param data - delete request forwarded to the SDK.
   * @returns the raw SDK response, after its status went through throwErrorFromSDK.
   */
  async deleteEntities(clientId: string, data: DeleteEntitiesReq) {
    const { milvusClient: client } = clientCache.get(clientId);
    const deleteResult = await client.deleteEntities(data);
    throwErrorFromSDK(deleteResult.status);
    return deleteResult;
  }
  /**
   * Run a vector search and attach the measured round-trip time.
   * @param clientId - key of the cached client entry.
   * @param data - search request forwarded to the SDK.
   * @returns the SDK response with an extra `latency` property (milliseconds,
   *          measured with Date.now() around the SDK call).
   */
  async vectorSearch(clientId: string, data: SearchReq) {
    const { milvusClient: client } = clientCache.get(clientId);
    const startedAt = Date.now();
    const searchResult = await client.search(data);
    const latency = Date.now() - startedAt;
    throwErrorFromSDK(searchResult.status);
    // mutate the response in place so callers get the timing alongside the hits
    Object.assign(searchResult, { latency });
    return searchResult;
  }
  /**
   * Create an alias and return the refreshed details of its collection.
   * @param clientId - key of the cached client entry.
   * @param data - create-alias request forwarded to the SDK.
   */
  async createAlias(clientId: string, data: CreateAliasReq) {
    const { milvusClient: client } = clientCache.get(clientId);
    const aliasResult = await client.createAlias(data);
    throwErrorFromSDK(aliasResult);
    // refresh the collection the alias points to
    const [refreshed] = (await this.getAllCollections(clientId, [
      data.collection_name,
    ])) as CollectionFullObject[];
    return refreshed;
  }
  /**
   * Alter an existing alias.
   * @param clientId - key of the cached client entry.
   * @param data - alter-alias request forwarded to the SDK.
   * @returns the SDK result, after it went through throwErrorFromSDK.
   */
  async alterAlias(clientId: string, data: AlterAliasReq) {
    const { milvusClient: client } = clientCache.get(clientId);
    const alterResult = await client.alterAlias(data);
    throwErrorFromSDK(alterResult);
    return alterResult;
  }
  /**
   * Drop an alias and return the refreshed details of the given collection.
   * @param clientId - key of the cached client entry.
   * @param collection_name - collection whose details are re-fetched afterwards.
   * @param data - drop-alias request forwarded to the SDK.
   */
  async dropAlias(
    clientId: string,
    collection_name: string,
    data: DropAliasReq
  ) {
    const { milvusClient: client } = clientCache.get(clientId);
    const dropResult = await client.dropAlias(data);
    throwErrorFromSDK(dropResult);
    // refresh the collection that owned the alias
    const [refreshed] = (await this.getAllCollections(clientId, [
      collection_name,
    ])) as CollectionFullObject[];
    return refreshed;
  }
  /**
   * Fetch replica information for a collection.
   * The SDK response is returned as-is; its status is not inspected here.
   * @param clientId - key of the cached client entry.
   * @param data - replicas request forwarded to the SDK.
   */
  async getReplicas(clientId: string, data: GetReplicasDto) {
    const { milvusClient: client } = clientCache.get(clientId);
    const replicaInfo = await client.getReplicas(data);
    return replicaInfo;
  }
  /**
   * Run a scalar query and attach the measured round-trip time.
   * @param clientId - key of the cached client entry.
   * @param data - query DTO plus the target `collection_name`.
   * @returns the SDK response with an extra `latency` property (milliseconds,
   *          measured with Date.now() around the SDK call).
   */
  async query(
    clientId: string,
    data: {
      collection_name: string;
    } & QueryDto
  ) {
    const { milvusClient: client } = clientCache.get(clientId);
    const startedAt = Date.now();
    const queryResult = await client.query(data);
    const latency = Date.now() - startedAt;
    throwErrorFromSDK(queryResult.status);
    // mutate the response in place so callers get the timing alongside the rows
    Object.assign(queryResult, { latency });
    return queryResult;
  }
  /**
   * Get the details of a single collection.
   *
   * In lazy mode the collection name is pushed onto the client's
   * collectionsQueue and a placeholder object (id, name and created time
   * only) is returned. Otherwise the schema, statistics and, for loaded
   * collections, the replicas are fetched and merged into one object.
   *
   * @param clientId - key of the cached client entry.
   * @param collection - entry from the "all collections" listing.
   * @param loadCollection - matching entry from the "loaded" listing, if any.
   * @param lazy - when true, defer the detail lookup to the queue.
   */
  async getCollection(
    clientId: string,
    collection: CollectionData,
    loadCollection: CollectionData,
    lazy: boolean = false
  ) {
    const { collectionsQueue } = clientCache.get(clientId);

    if (lazy) {
      // defer the expensive lookups: remember the name for later
      collectionsQueue.enqueue(collection.name);
      // placeholder carrying only what the listing already provides
      const placeholder = {
        id: collection.id,
        collection_name: collection.name,
        createdTime: Number(collection.timestamp),
        schema: undefined,
        rowCount: undefined,
        aliases: undefined,
        description: undefined,
        autoID: undefined,
        loadedPercentage: undefined,
        consistency_level: undefined,
        replicas: undefined,
        loaded: undefined,
      } as CollectionLazyObject;
      return placeholder;
    }

    const target = { collection_name: collection.name };
    // schema and properties
    const collectionInfo = await this.describeCollection(clientId, target);
    // statistics
    const statistics = await this.getCollectionStatistics(clientId, target);

    // autoID is taken from the primary key field
    const primaryKey = collectionInfo.schema.fields.find(
      f => f.is_primary_key === true
    );
    const autoID = primaryKey?.autoID;

    // replicas are only requested for loaded collections; failures are ignored
    let replicas;
    try {
      if (loadCollection) {
        replicas = await this.getReplicas(clientId, {
          collectionID: collectionInfo.collectionID,
        });
      }
    } catch (e) {
      console.log('ignore getReplica');
    }

    // -1 marks a collection that is absent from the loaded listing
    const loadedPercentage = loadCollection
      ? Number(loadCollection.loadedPercentage)
      : -1;
    const status =
      loadedPercentage === 100
        ? LOADING_STATE.LOADED
        : loadedPercentage === -1
        ? LOADING_STATE.UNLOADED
        : LOADING_STATE.LOADING;

    return {
      collection_name: collection.name,
      schema: collectionInfo.schema,
      rowCount: Number(statistics.data.row_count),
      createdTime: parseInt(collectionInfo.created_utc_timestamp, 10),
      aliases: collectionInfo.aliases,
      description: collectionInfo.schema.description,
      autoID,
      id: collectionInfo.collectionID,
      loadedPercentage,
      consistency_level: collectionInfo.consistency_level,
      replicas: replicas && replicas.replicas,
      loaded: status === LOADING_STATE.LOADED,
      status,
    };
  }
  /**
   * Get the details of all collections, or of the named ones.
   *
   * Collections are sorted by timestamp, newest first. Loaded collections and
   * the first few targets get full details immediately; the rest are returned
   * as lazy placeholders and queued. Queued names are then resolved in
   * batches, each batch being pushed to the client's socket as a
   * COLLECTION_UPDATE event.
   *
   * NOTE(review): when `collectionName` is non-empty but matches nothing, the
   * code falls back to ALL collections — confirm callers rely on this.
   *
   * @param clientId - key of the cached client entry.
   * @param collectionName - names to restrict to; empty means "all" and also
   *        resets the client's lazy queue.
   */
  async getAllCollections(
    clientId: string,
    collectionName: string[] = []
  ): Promise<CollectionObject[]> {
    // Number of collections resolved eagerly per call. It is also the queue
    // batch size, so a batch re-entering this method is never lazy again.
    const eagerLimit = 5;
    const cache = clientCache.get(clientId);
    // a full refresh invalidates whatever was still queued
    if (collectionName.length === 0) {
      cache.collectionsQueue.stop();
      cache.collectionsQueue = new SimpleQueue<string>();
    }
    // get all collections(name, timestamp, id)
    const allCollections = await this.showCollections(clientId);
    // get all loaded collection
    const loadedCollections = await this.showCollections(clientId, {
      type: ShowCollectionsType.Loaded,
    });
    // data container
    const data: CollectionObject[] = [];
    // sort by created time
    allCollections.data.sort(
      (a, b) => Number(b.timestamp) - Number(a.timestamp)
    );
    // get target collections details
    const targetCollections = allCollections.data.filter(d =>
      collectionName.includes(d.name)
    );
    const targets =
      targetCollections.length > 0 ? targetCollections : allCollections.data;
    // get all collection details
    for (let i = 0; i < targets.length; i++) {
      const collection = targets[i];
      const loadedCollection = loadedCollections.data.find(
        v => v.name === collection.name
      );
      // full details only for loaded collections and the first `eagerLimit`
      // targets; everything else is fetched lazily through the queue
      const notLazy = !!loadedCollection || i < eagerLimit;
      data.push(
        await this.getCollection(
          clientId,
          collection,
          loadedCollection,
          !notLazy
        )
      );
    }
    // start the queue
    if (cache.collectionsQueue.size() > 0) {
      cache.collectionsQueue.executeNext(async (collectionsToGet, q) => {
        // if the queue is obsoleted, return
        if (q.isObseleted) {
          return;
        }
        try {
          // get current socket
          const socketClient = clients.get(clientId);
          // get collections
          const res = await this.getAllCollections(clientId, collectionsToGet);
          // emit event to current client
          socketClient.emit(WS_EVENTS.COLLECTION_UPDATE, res);
        } catch (e) {
          // best effort: a failed background batch must not break the caller
          console.log('ignore queue error');
        }
      }, eagerLimit);
    }
    // return data
    return data;
  }
  /**
   * List the loaded collections together with their row counts.
   * @param clientId - key of the cached client entry.
   * @returns one object per loaded collection: the listing entry's own
   *          properties plus `collection_name` and a numeric `rowCount`.
   */
  async getLoadedCollections(clientId: string) {
    const data = [];
    const res = await this.showCollections(clientId, {
      type: ShowCollectionsType.Loaded,
    });
    for (const item of res.data) {
      const { id, name } = item;
      // count() is async and resolves to { rowCount }; it must be awaited,
      // otherwise a pending Promise ends up in the result
      const { rowCount } = await this.count(clientId, {
        collection_name: name,
      });
      data.push({
        id,
        collection_name: name,
        rowCount,
        ...item,
      });
    }
    return data;
  }
  /**
   * Aggregate statistics over every collection.
   * Row counts that do not convert to a number are skipped.
   * @param clientId - key of the cached client entry.
   * @returns {collectionCount:number, totalData:number}
   */
  async getStatistics(clientId: string) {
    const summary = {
      collectionCount: 0,
      totalData: 0,
    } as StatisticsObject;
    const { data: collections } = await this.showCollections(clientId);
    summary.collectionCount = collections.length;
    // one statistics request per collection, issued sequentially
    for (const { name } of collections) {
      const statistics = await this.getCollectionStatistics(clientId, {
        collection_name: name,
      });
      const rows = Number(findKeyValue(statistics.stats, ROW_COUNT));
      if (!isNaN(rows)) {
        summary.totalData += rows;
      }
    }
    return summary;
  }
  /**
   * Generate sample rows for a collection and either return them as a file
   * payload or insert them.
   * @param clientId - key of the cached client entry.
   * @param collection_name - target collection.
   * @param size - number of rows to generate, parsed as a base-10 integer.
   * @param download - when truthy, return `{ sampleFile }` instead of inserting.
   * @param format - 'csv' yields CSV text; anything else yields JSON text.
   */
  async importSample(
    clientId: string,
    { collection_name, size, download, format }: ImportSampleDto
  ) {
    const { schema } = await this.describeCollection(clientId, {
      collection_name,
    });
    const rowCount = parseInt(size, 10);
    const fields_data = genRows(
      schema.fields,
      rowCount,
      schema.enable_dynamic_field
    );

    if (!download) {
      // no download requested: write the generated rows to the collection
      return await this.insert(clientId, { collection_name, fields_data });
    }

    // download requested: serialise the rows and hand them back
    const parser = new Parser({});
    const sampleFile =
      format === 'csv' ? parser.parse(fields_data) : JSON.stringify(fields_data);
    return { sampleFile };
  }
  /**
   * Fetch the state of a compaction.
   * @param clientId - key of the cached client entry.
   * @param data - compaction-state request forwarded to the SDK.
   * @returns the raw SDK response, after its status went through throwErrorFromSDK.
   */
  async getCompactionState(clientId: string, data: GetCompactionStateReq) {
    const { milvusClient: client } = clientCache.get(clientId);
    const compactionState = await client.getCompactionState(data);
    throwErrorFromSDK(compactionState.status);
    return compactionState;
  }
  /**
   * Fetch query segment information.
   * @param clientId - key of the cached client entry.
   * @param data - request forwarded to the SDK.
   * @returns the raw SDK response, after its status went through throwErrorFromSDK.
   */
  async getQuerySegmentInfo(clientId: string, data: GetQuerySegmentInfoReq) {
    const { milvusClient: client } = clientCache.get(clientId);
    const segmentInfo = await client.getQuerySegmentInfo(data);
    throwErrorFromSDK(segmentInfo.status);
    return segmentInfo;
  }
  /**
   * Fetch persistent segment information.
   * @param clientId - key of the cached client entry.
   * @param data - request forwarded to the SDK.
   * @returns the raw SDK response, after its status went through throwErrorFromSDK.
   */
  async getPersistentSegmentInfo(
    clientId: string,
    data: GePersistentSegmentInfoReq
  ) {
    const { milvusClient: client } = clientCache.get(clientId);
    const segmentInfo = await client.getPersistentSegmentInfo(data);
    throwErrorFromSDK(segmentInfo.status);
    return segmentInfo;
  }
  /**
   * Trigger a compaction.
   * @param clientId - key of the cached client entry.
   * @param data - compact request forwarded to the SDK.
   * @returns the raw SDK response, after its status went through throwErrorFromSDK.
   */
  async compact(clientId: string, data: CompactReq) {
    const { milvusClient: client } = clientCache.get(clientId);
    const compactResult = await client.compact(data);
    throwErrorFromSDK(compactResult.status);
    return compactResult;
  }
  /**
   * Ask Milvus whether a collection exists.
   * @param clientId - key of the cached client entry.
   * @param data - has-collection request forwarded to the SDK.
   * @returns the raw SDK response, after its status went through throwErrorFromSDK.
   */
  async hasCollection(clientId: string, data: HasCollectionReq) {
    const { milvusClient: client } = clientCache.get(clientId);
    const existence = await client.hasCollection(data);
    throwErrorFromSDK(existence.status);
    return existence;
  }
  /**
   * Create a new collection that copies the schema of an existing one.
   * Fields, consistency level and the dynamic-field flag are copied; the
   * partition count is copied only when the source has a partition-key field
   * and reports `num_partitions`.
   * @param clientId - key of the cached client entry.
   * @param data - `collection_name` is the source, `new_collection_name` the copy.
   * @returns whatever createCollection returns for the new collection.
   */
  async duplicateCollection(clientId: string, data: RenameCollectionReq) {
    const source = await this.describeCollection(clientId, {
      collection_name: data.collection_name,
    });
    const { fields, enable_dynamic_field } = source.schema;

    const request: CreateCollectionReq = {
      collection_name: data.new_collection_name,
      fields: fields.map(convertFieldSchemaToFieldType),
      consistency_level: source.consistency_level as any,
      enable_dynamic_field: !!enable_dynamic_field,
    };

    // only carry the partition count over for partition-key collections
    const usesPartitionKey = fields.some(f => f.is_partition_key);
    if (usesPartitionKey && source.num_partitions) {
      request.num_partitions = Number(source.num_partitions);
    }

    return await this.createCollection(clientId, request);
  }
  /**
   * Delete every entity of a collection using a filter on its primary key.
   * Int64 keys use `pk >= MIN_INT64`; any other key type uses `pk != ''`.
   * The SDK response is returned as-is; its status is not inspected here.
   * @param clientId - key of the cached client entry.
   * @param data - identifies the collection to empty.
   */
  async emptyCollection(clientId: string, data: HasCollectionReq) {
    const { milvusClient: client } = clientCache.get(clientId);
    const pkField = await client.getPkFieldName(data);
    const pkType = await client.getPkFieldType(data);

    // build a filter expression that matches every primary key value
    let matchAll: string;
    if (pkType === 'Int64') {
      matchAll = `${pkField} >= ${MIN_INT64}`;
    } else {
      matchAll = `${pkField} != ''`;
    }

    const deleteResult = await client.deleteEntities({
      collection_name: data.collection_name,
      filter: matchAll,
    });
    return deleteResult;
  }
  /**
   * Create an index, invalidate the cached index description of the
   * collection and return the refreshed collection details.
   * @param clientId - key of the cached client entry.
   * @param data - create-index request forwarded to the SDK.
   * @returns the first entry returned by getAllCollections for the collection.
   */
  async createIndex(clientId: string, data: CreateIndexReq) {
    const { milvusClient, indexCache, database } = clientCache.get(clientId);
    const res = await milvusClient.createIndex(data);
    // a single check is enough: `res` does not change after this point
    throwErrorFromSDK(res);
    const key = `${database}/${data.collection_name}`;
    // clear cache;
    indexCache.delete(key);
    // fetch new collections
    const newCollection = (await this.getAllCollections(clientId, [
      data.collection_name,
    ])) as CollectionFullObject[];
    return newCollection[0];
  }
  /**
   * Describe the indexes of a collection.
   *
   * If indexCache holds a value under `<database>/<collection_name>`, that
   * value is returned. Otherwise the Milvus SDK's describeIndex is called and
   * every index description is decorated with `indexType`, `metricType` and
   * `indexParameterPairs` before being returned.
   *
   * This method only reads indexCache; it never stores the fresh response
   * itself, and the SDK response status is not inspected here.
   *
   * @param clientId - key of the cached client entry.
   * @param data - The request data for describing an index. It contains the collection name.
   * @returns - The decorated response from the Milvus SDK's describeIndex function or the cached index description.
   */
  async describeIndex(clientId: string, data: DescribeIndexReq) {
    const { milvusClient, indexCache, database } = clientCache.get(clientId);
    // cache entries are keyed by database and collection name
    const key = `${database}/${data.collection_name}`;
    // Try to get the index description from the cache
    const value = indexCache.get(key);
    // If the index description is in the cache, return it
    if (value) {
      return value as DescribeIndexRes;
    }
    // Not cached: ask Milvus
    const res = (await milvusClient.describeIndex(data)) as DescribeIndexRes;
    res.index_descriptions.forEach(index => {
      // get indexType
      index.indexType = (index.params.find(p => p.key === 'index_type')
        ?.value || '') as string;
      // get metricType
      const metricTypePair = index.params.filter(v => v.key === 'metric_type');
      index.metricType = findKeyValue(metricTypePair, 'metric_type') as string;
      // get index parameter pairs
      const paramsJSONstring = findKeyValue(index.params, 'params'); // params is a json string
      const params =
        (paramsJSONstring &&
          getKeyValueListFromJsonString(paramsJSONstring as string)) ||
        [];
      index.indexParameterPairs = [...metricTypePair, ...params];
    });
    // Return the decorated response
    return res;
  }
  /**
   * Drop an index, invalidate the cached index description of the collection
   * and return the refreshed collection details.
   * @param clientId - key of the cached client entry.
   * @param data - drop-index request forwarded to the SDK.
   */
  async dropIndex(clientId: string, data: DropIndexReq) {
    const {
      milvusClient: client,
      indexCache: cachedIndexes,
      database,
    } = clientCache.get(clientId);
    const dropResult = await client.dropIndex(data);
    throwErrorFromSDK(dropResult);
    // the cached description is stale now
    cachedIndexes.delete(`${database}/${data.collection_name}`);
    // re-read the collection so the caller sees the index removed
    const [refreshed] = (await this.getAllCollections(clientId, [
      data.collection_name,
    ])) as CollectionFullObject[];
    return refreshed;
  }
  /**
   * Clear the index cache of a client.
   * @param clientId - key of the cached client entry.
   * @returns whatever `indexCache.clear()` returns.
   */
  async clearCache(clientId: string) {
    const { indexCache: cachedIndexes } = clientCache.get(clientId);
    return cachedIndexes.clear();
  }
  625. }