123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361 |
- import { MilvusService } from '../milvus/milvus.service';
- import {
- CreateCollectionReq,
- DescribeCollectionReq,
- DropCollectionReq,
- GetCollectionStatisticsReq,
- GetIndexStateReq,
- InsertReq,
- LoadCollectionReq,
- ReleaseLoadCollectionReq,
- SearchReq,
- RenameCollectionReq,
- AlterAliasReq,
- CreateAliasReq,
- DropAliasReq,
- ShowCollectionsReq,
- ShowCollectionsType,
- DeleteEntitiesReq,
- GetCompactionStateReq,
- GetQuerySegmentInfoReq,
- GePersistentSegmentInfoReq,
- CompactReq,
- CountReq,
- } from '@zilliz/milvus2-sdk-node';
- import { Parser } from '@json2csv/plainjs';
- import { throwErrorFromSDK, findKeyValue, genRows, ROW_COUNT } from '../utils';
- import { QueryDto, ImportSampleDto, GetReplicasDto } from './dto';
- export class CollectionsService {
- constructor(private milvusService: MilvusService) {}
- async getCollections(data?: ShowCollectionsReq) {
- const res = await this.milvusService.client.showCollections(data);
- throwErrorFromSDK(res.status);
- return res;
- }
- async createCollection(data: CreateCollectionReq) {
- const res = await this.milvusService.client.createCollection(data);
- throwErrorFromSDK(res);
- return res;
- }
- async describeCollection(data: DescribeCollectionReq) {
- const res = await this.milvusService.client.describeCollection(data);
- throwErrorFromSDK(res.status);
- return res;
- }
- async renameCollection(data: RenameCollectionReq) {
- const res = await this.milvusService.client.renameCollection(data);
- throwErrorFromSDK(res);
- return res;
- }
- async dropCollection(data: DropCollectionReq) {
- const res = await this.milvusService.client.dropCollection(data);
- throwErrorFromSDK(res);
- return res;
- }
- async loadCollection(data: LoadCollectionReq) {
- const res = await this.milvusService.client.loadCollection(data);
- throwErrorFromSDK(res);
- return res;
- }
- async releaseCollection(data: ReleaseLoadCollectionReq) {
- const res = await this.milvusService.client.releaseCollection(data);
- throwErrorFromSDK(res);
- return res;
- }
- async getCollectionStatistics(data: GetCollectionStatisticsReq) {
- const res = await this.milvusService.client.getCollectionStatistics(data);
- throwErrorFromSDK(res.status);
- return res;
- }
- async count(data: CountReq) {
- let count = 0;
- try {
- const countRes = await this.milvusService.client.count(data);
- count = countRes.data;
- } catch (error) {
- const collectionStatisticsRes = await this.getCollectionStatistics(data);
- count = collectionStatisticsRes.data.row_count;
- }
- return count;
- }
- async insert(data: InsertReq) {
- const res = await this.milvusService.client.insert(data);
- throwErrorFromSDK(res.status);
- return res;
- }
- async deleteEntities(data: DeleteEntitiesReq) {
- const res = await this.milvusService.client.deleteEntities(data);
- throwErrorFromSDK(res.status);
- return res;
- }
- async vectorSearch(data: SearchReq) {
- const now = Date.now();
- const res = await this.milvusService.client.search(data);
- const after = Date.now();
- throwErrorFromSDK(res.status);
- Object.assign(res, { latency: after - now });
- return res;
- }
- async createAlias(data: CreateAliasReq) {
- const res = await this.milvusService.client.createAlias(data);
- throwErrorFromSDK(res);
- return res;
- }
- async alterAlias(data: AlterAliasReq) {
- const res = await this.milvusService.client.alterAlias(data);
- throwErrorFromSDK(res);
- return res;
- }
- async dropAlias(data: DropAliasReq) {
- const res = await this.milvusService.client.dropAlias(data);
- throwErrorFromSDK(res);
- return res;
- }
- async getReplicas(data: GetReplicasDto) {
- const res = await this.milvusService.client.getReplicas(data);
- return res;
- }
- async query(
- data: {
- collection_name: string;
- } & QueryDto
- ) {
- const now = Date.now();
- const res = await this.milvusService.client.query(data);
- const after = Date.now();
- throwErrorFromSDK(res.status);
- Object.assign(res, { latency: after - now });
- return res;
- }
- /**
- * We do not throw error for this.
- * Because if collection dont have index, it will throw error.
- * We need wait for milvus error code.
- * @param data
- * @returns
- */
- async getIndexInfo(data: GetIndexStateReq) {
- const res = await this.milvusService.client.describeIndex(data);
- return res;
- }
- /**
- * Get all collections meta data
- * @returns {id:string, collection_name:string, schema:Field[], autoID:boolean, rowCount: string, consistency_level:string}
- */
- async getAllCollections() {
- const data: any = [];
- const res = await this.getCollections();
- const loadedCollections = await this.getCollections({
- type: ShowCollectionsType.Loaded,
- });
- if (res.data.length > 0) {
- for (const item of res.data) {
- const { name } = item;
- const collectionInfo = await this.describeCollection({
- collection_name: name,
- });
- let count: number | string;
- const collectionStatisticsRes = await this.getCollectionStatistics({
- collection_name: name,
- });
- count = collectionStatisticsRes.data.row_count;
- // try {
- // const countRes = await this.count({
- // collection_name: name,
- // });
- // count = countRes.data;
- // } catch (error) {
- // }
- const indexRes = await this.getIndexInfo({
- collection_name: item.name,
- });
- const autoID = collectionInfo.schema.fields.find(
- v => v.is_primary_key === true
- )?.autoID;
- const loadCollection = loadedCollections.data.find(
- v => v.name === name
- );
- const loadedPercentage = !loadCollection
- ? '-1'
- : loadCollection.loadedPercentage;
- let replicas;
- try {
- replicas = loadCollection
- ? await this.getReplicas({
- collectionID: collectionInfo.collectionID,
- })
- : replicas;
- } catch (e) {
- console.log('ignore getReplica');
- }
- data.push({
- aliases: collectionInfo.aliases,
- collection_name: name,
- schema: collectionInfo.schema,
- description: collectionInfo.schema.description,
- autoID,
- rowCount: count,
- id: collectionInfo.collectionID,
- loadedPercentage,
- createdTime: parseInt(collectionInfo.created_utc_timestamp, 10),
- index_descriptions: indexRes,
- consistency_level: collectionInfo.consistency_level,
- replicas: replicas && replicas.replicas,
- });
- }
- }
- // add default sort - Descending order
- data.sort((a: any, b: any) => b.createdTime - a.createdTime);
- return data;
- }
- async getLoadedCollections() {
- const data = [];
- const res = await this.getCollections({
- type: ShowCollectionsType.Loaded,
- });
- if (res.data.length > 0) {
- for (const item of res.data) {
- const { id, name } = item;
- const count = this.count({ collection_name: name });
- data.push({
- id,
- collection_name: name,
- rowCount: count,
- ...item,
- });
- }
- }
- return data;
- }
- /**
- * Get collections statistics data
- * @returns {collectionCount:number, totalData:number}
- */
- async getStatistics() {
- const data = {
- collectionCount: 0,
- totalData: 0,
- };
- const res = await this.getCollections();
- data.collectionCount = res.data.length;
- if (res.data.length > 0) {
- for (const item of res.data) {
- const collectionStatistics = await this.getCollectionStatistics({
- collection_name: item.name,
- });
- const rowCount = findKeyValue(collectionStatistics.stats, ROW_COUNT);
- data.totalData += isNaN(Number(rowCount)) ? 0 : Number(rowCount);
- }
- }
- return data;
- }
- /**
- * Get all collection index status
- * @returns {collection_name:string, index_descriptions: index_descriptions}[]
- */
- async getCollectionsIndexStatus() {
- const data = [];
- const res = await this.getCollections();
- if (res.data.length > 0) {
- for (const item of res.data) {
- const indexRes = await this.getIndexInfo({
- collection_name: item.name,
- });
- data.push({
- collection_name: item.name,
- index_descriptions: indexRes,
- });
- }
- }
- return data;
- }
- /**
- * Load sample data into collection
- */
- async importSample({
- collection_name,
- size,
- download,
- format,
- }: ImportSampleDto) {
- const collectionInfo = await this.describeCollection({ collection_name });
- const fields_data = genRows(
- collectionInfo.schema.fields,
- parseInt(size, 10),
- collectionInfo.schema.enable_dynamic_field
- );
- if (download) {
- const parser = new Parser({});
- const sampleFile =
- format === 'csv'
- ? parser.parse(fields_data)
- : JSON.stringify(fields_data);
- // If download is true, return the generated data directly
- return { sampleFile };
- } else {
- // Otherwise, insert the data into the collection
- return await this.insert({ collection_name, fields_data });
- }
- }
- async getCompactionState(data: GetCompactionStateReq) {
- const res = await this.milvusService.client.getCompactionState(data);
- throwErrorFromSDK(res.status);
- return res;
- }
- async getQuerySegmentInfo(data: GetQuerySegmentInfoReq) {
- const res = await this.milvusService.client.getQuerySegmentInfo(data);
- throwErrorFromSDK(res.status);
- return res;
- }
- async getPersistentSegmentInfo(data: GePersistentSegmentInfoReq) {
- const res = await this.milvusService.client.getPersistentSegmentInfo(data);
- throwErrorFromSDK(res.status);
- return res;
- }
- async compact(data: CompactReq) {
- const res = await this.milvusService.client.compact(data);
- throwErrorFromSDK(res.status);
- return res;
- }
- }
|