12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804 |
- import {
- CreateCollectionReq,
- DescribeCollectionReq,
- DropCollectionReq,
- GetCollectionStatisticsReq,
- InsertReq,
- LoadCollectionReq,
- ReleaseLoadCollectionReq,
- RenameCollectionReq,
- AlterAliasReq,
- CreateAliasReq,
- ShowCollectionsReq,
- ShowCollectionsType,
- DeleteEntitiesReq,
- GetCompactionStateReq,
- GetQuerySegmentInfoReq,
- GePersistentSegmentInfoReq,
- CompactReq,
- HasCollectionReq,
- CountReq,
- GetLoadStateReq,
- CollectionData,
- CreateIndexReq,
- DescribeIndexReq,
- DropIndexReq,
- AlterCollectionReq,
- DataType,
- HybridSearchReq,
- SearchSimpleReq,
- LoadState,
- } from '@zilliz/milvus2-sdk-node';
- import { Parser } from '@json2csv/plainjs';
- import {
- findKeyValue,
- getKeyValueListFromJsonString,
- genRows,
- ROW_COUNT,
- convertFieldSchemaToFieldType,
- LOADING_STATE,
- DYNAMIC_FIELD,
- SimpleQueue,
- MIN_INT64,
- VectorTypes,
- cloneObj,
- } from '../utils';
- import { QueryDto, ImportSampleDto } from './dto';
- import {
- CollectionObject,
- CollectionLazyObject,
- FieldObject,
- IndexObject,
- DescribeCollectionRes,
- CountObject,
- StatisticsObject,
- CollectionFullObject,
- DescribeIndexRes,
- } from '../types';
- import { clientCache } from '../app';
- import { clients } from '../socket';
- import { WS_EVENTS } from '../utils';
/**
 * Collection-level operations for a connected Milvus client.
 *
 * Every public method takes a `clientId`, which is used to look up per-client
 * state in `clientCache` (the Milvus SDK client and the `collectionsQueue`
 * used for lazily loading collection details). Methods that mutate a
 * collection re-read it through `getAllCollections` so callers receive the
 * refreshed details.
 */
export class CollectionsService {
  /**
   * Re-read the full details of a single collection, e.g. after a mutation.
   *
   * NOTE: `getAllCollections` falls back to *all* collections of the database
   * when the requested name is not found, so in that case this returns the
   * first collection by name rather than `undefined`.
   */
  private async fetchCollectionDetails(
    clientId: string,
    collectionName: string,
    dbName?: string
  ): Promise<CollectionFullObject> {
    const collections = (await this.getAllCollections(
      clientId,
      [collectionName],
      dbName
    )) as CollectionFullObject[];
    return collections[0];
  }

  /** List collections; `data` (e.g. `type`, `db_name`) is forwarded to the SDK unchanged. */
  async showCollections(clientId: string, data?: ShowCollectionsReq) {
    const { milvusClient } = clientCache.get(clientId);
    return milvusClient.showCollections(data);
  }

  /** Create a collection and return its freshly-read details. */
  async createCollection(clientId: string, data: CreateCollectionReq) {
    const { milvusClient } = clientCache.get(clientId);
    await milvusClient.createCollection(data);
    return this.fetchCollectionDetails(
      clientId,
      data.collection_name,
      data.db_name
    );
  }

  /** Return the raw SDK `describeCollection` response without any enrichment. */
  async describeUnformattedCollection(
    clientId: string,
    collection_name: string,
    db_name?: string
  ) {
    const { milvusClient } = clientCache.get(clientId);
    return milvusClient.describeCollection({
      collection_name,
      db_name,
    });
  }

  /**
   * Describe a collection and enrich its schema for the UI.
   *
   * Adds to each field: its index description, `dimension`, `maxCapacity`,
   * `maxLength` (each -1 when absent) and, when it takes part in a function,
   * the function object. Adds to the schema: `primaryField`,
   * `hasVectorIndex`, `enablePartitionKey`, `scalarFields`, `vectorFields`,
   * `functionFields` (fields that are function inputs) and `dynamicFields`.
   */
  async describeCollection(clientId: string, data: DescribeCollectionReq) {
    const { milvusClient } = clientCache.get(clientId);
    const res = (await milvusClient.describeCollection(
      data
    )) as DescribeCollectionRes;

    // Index info must be read from the same database as the collection.
    const indexRes = await this.describeIndex(clientId, {
      collection_name: data.collection_name,
      db_name: data.db_name,
    });

    const vectorFields: FieldObject[] = [];
    const scalarFields: FieldObject[] = [];
    const functionFields: FieldObject[] = [];

    // `functions` may be missing in responses that carry no functions.
    const functions = res.schema.functions ?? [];

    // Attach each function to its output and input fields, and collect the
    // names of every function input field.
    const fieldMap = new Map(
      res.schema.fields.map(field => [field.name, field])
    );
    const inputFieldNames = new Set<string>();
    functions.forEach(fn => {
      const assignFunction = (fieldName: string) => {
        const field = fieldMap.get(fieldName);
        if (field) {
          field.function = fn;
        }
      };
      fn.output_field_names.forEach(assignFunction);
      fn.input_field_names.forEach(assignFunction);
      fn.input_field_names.forEach(name => inputFieldNames.add(name));
    });

    res.schema.fields.forEach((field: FieldObject) => {
      field.index = indexRes.index_descriptions.find(
        index => index.field_name === field.name
      ) as IndexObject;
      // Normalise optional numeric type params; -1 means "not set".
      field.dimension = Number(field.dim) || -1;
      field.maxCapacity = Number(field.max_capacity) || -1;
      field.maxLength = Number(field.max_length) || -1;

      if (VectorTypes.includes(field.data_type)) {
        vectorFields.push(field);
      } else {
        scalarFields.push(field);
      }
      if (field.is_primary_key) {
        res.schema.primaryField = field;
      }
      if (inputFieldNames.has(field.name)) {
        functionFields.push(field);
      }
    });

    res.schema.hasVectorIndex = vectorFields.every(v => v.index);
    res.schema.enablePartitionKey = res.schema.fields.some(
      v => v.is_partition_key
    );
    res.schema.scalarFields = scalarFields;
    res.schema.vectorFields = vectorFields;
    res.schema.functionFields = functionFields;
    // When dynamic fields are enabled, expose them as one synthetic JSON field.
    res.schema.dynamicFields = res.schema.enable_dynamic_field
      ? [
          {
            name: DYNAMIC_FIELD,
            data_type: 'JSON',
            type_params: [],
            index: undefined,
            description: '',
            index_params: [],
            dimension: -1,
            maxCapacity: -1,
            maxLength: -1,
            autoID: false,
            fieldID: '',
            state: '',
            dataType: DataType.JSON,
            is_function_output: false,
            is_primary_key: false,
          },
        ]
      : [];
    return res;
  }

  /** Rename a collection and return the details under its new name. */
  async renameCollection(clientId: string, data: RenameCollectionReq) {
    const { milvusClient } = clientCache.get(clientId);
    await milvusClient.renameCollection(data);
    return this.fetchCollectionDetails(
      clientId,
      data.new_collection_name,
      data.db_name
    );
  }

  /** Alter collection properties and return the refreshed details. */
  async alterCollection(clientId: string, data: AlterCollectionReq) {
    const { milvusClient } = clientCache.get(clientId);
    await milvusClient.alterCollectionProperties(data);
    return this.fetchCollectionDetails(
      clientId,
      data.collection_name,
      data.db_name
    );
  }

  /** Drop a collection; returns the SDK response. */
  async dropCollection(clientId: string, data: DropCollectionReq) {
    const { milvusClient } = clientCache.get(clientId);
    return milvusClient.dropCollection(data);
  }

  /** Load a collection via the SDK `loadCollection`; returns the collection name. */
  async loadCollection(clientId: string, data: LoadCollectionReq) {
    const { milvusClient } = clientCache.get(clientId);
    await milvusClient.loadCollection(data);
    return data.collection_name;
  }

  /** Load a collection via the SDK `loadCollectionAsync`; returns the collection name. */
  async loadCollectionAsync(clientId: string, data: LoadCollectionReq) {
    const { milvusClient } = clientCache.get(clientId);
    await milvusClient.loadCollectionAsync(data);
    return data.collection_name;
  }

  /**
   * Release a collection and push its refreshed details to the client socket
   * in the background; returns the collection name.
   */
  async releaseCollection(clientId: string, data: ReleaseLoadCollectionReq) {
    const { milvusClient } = clientCache.get(clientId);
    await milvusClient.releaseCollection(data);
    // Fire-and-forget: updateCollectionsDetails catches its own errors.
    void this.updateCollectionsDetails(
      clientId,
      [data.collection_name],
      data.db_name
    );
    return data.collection_name;
  }

  /** Return the SDK `getCollectionStatistics` response. */
  async getCollectionStatistics(
    clientId: string,
    data: GetCollectionStatisticsReq
  ) {
    const { milvusClient } = clientCache.get(clientId);
    return milvusClient.getCollectionStatistics(data);
  }

  /** Return the SDK `getLoadState` response. */
  async getLoadState(clientId: string, data: GetLoadStateReq) {
    const { milvusClient } = clientCache.get(clientId);
    return milvusClient.getLoadState(data);
  }

  /**
   * Count the entities of a collection.
   *
   * Uses the SDK `count` when the collection is loaded, otherwise falls back
   * to the `row_count` from collection statistics. Errors are deliberately
   * swallowed (best effort) and yield `rowCount: 0`.
   */
  async count(clientId: string, data: CountReq) {
    const { milvusClient } = clientCache.get(clientId);
    let count = 0;
    try {
      const loadStateRes = await milvusClient.getLoadState(data);
      if (loadStateRes.state === LoadState.LoadStateLoaded) {
        const countRes = await milvusClient.count(data);
        count = countRes.data;
      } else {
        const collectionStatisticsRes = await this.getCollectionStatistics(
          clientId,
          data
        );
        count = collectionStatisticsRes.data.row_count;
      }
    } catch (error) {
      console.log('ignore count error');
    }
    return { rowCount: Number(count) } as CountObject;
  }

  /** Insert entities; returns the SDK response. */
  async insert(clientId: string, data: InsertReq) {
    const { milvusClient } = clientCache.get(clientId);
    return milvusClient.insert(data);
  }

  /** Upsert entities; returns the SDK response. */
  async upsert(clientId: string, data: InsertReq) {
    const { milvusClient } = clientCache.get(clientId);
    return milvusClient.upsert(data);
  }

  /** Delete entities; returns the SDK response. */
  async deleteEntities(clientId: string, data: DeleteEntitiesReq) {
    const { milvusClient } = clientCache.get(clientId);
    return milvusClient.deleteEntities(data);
  }

  /**
   * Run a vector search and attach the wall-clock `latency` in milliseconds.
   *
   * A request whose `data` array holds more than one sub-request is sent as
   * a hybrid search. A request with exactly one sub-request is flattened into
   * a plain search: the sub-request's `data`, `anns_field` and non-empty
   * `params` are lifted to the top level (the original notes this is for
   * Milvus 2.3.x).
   */
  async vectorSearch(
    clientId: string,
    data: HybridSearchReq | SearchSimpleReq
  ) {
    const { milvusClient } = clientCache.get(clientId);
    const startedAt = Date.now();
    const searchParams = data as HybridSearchReq;
    const isHybrid =
      Array.isArray(searchParams.data) && searchParams.data.length > 1;
    const singleSearchParams = cloneObj(data) as SearchSimpleReq;

    // for 2.3.x milvus
    if (searchParams.data && searchParams.data.length === 1) {
      const request = searchParams.data[0];
      delete singleSearchParams.data;
      delete singleSearchParams.params;
      // `params` may be absent on the sub-request; guard before Object.keys.
      if (request.params && Object.keys(request.params).length > 0) {
        singleSearchParams.params = request.params;
      }
      singleSearchParams.data = request.data;
      singleSearchParams.anns_field = request.anns_field;
      singleSearchParams.group_by_field = searchParams.group_by_field;
    }

    const res = await milvusClient.search(
      isHybrid ? searchParams : singleSearchParams
    );
    Object.assign(res, { latency: Date.now() - startedAt });
    return res;
  }

  /** Create an alias and return the refreshed details of the target collection. */
  async createAlias(clientId: string, data: CreateAliasReq) {
    const { milvusClient } = clientCache.get(clientId);
    await milvusClient.createAlias(data);
    return this.fetchCollectionDetails(
      clientId,
      data.collection_name,
      data.db_name
    );
  }

  /** Alter an alias; returns the SDK response. */
  async alterAlias(clientId: string, data: AlterAliasReq) {
    const { milvusClient } = clientCache.get(clientId);
    return milvusClient.alterAlias(data);
  }

  /** Drop an alias and return the refreshed details of `collection_name`. */
  async dropAlias(clientId: string, collection_name: string, data: any) {
    const { milvusClient } = clientCache.get(clientId);
    await milvusClient.dropAlias(data);
    return this.fetchCollectionDetails(clientId, collection_name, data.db_name);
  }

  /** Return the SDK `getReplicas` response. */
  async getReplicas(clientId: string, data: any) {
    const { milvusClient } = clientCache.get(clientId);
    return milvusClient.getReplicas(data);
  }

  /** Run a query and attach the wall-clock `latency` in milliseconds. */
  async query(
    clientId: string,
    data: {
      collection_name: string;
    } & QueryDto
  ) {
    const { milvusClient } = clientCache.get(clientId);
    const startedAt = Date.now();
    const res = await milvusClient.query(data);
    Object.assign(res, { latency: Date.now() - startedAt });
    return res;
  }

  /**
   * Build the details object for a single collection.
   *
   * When `lazy` is true, the collection name is enqueued on the client's
   * `collectionsQueue` and a placeholder with only id, name and creation
   * time is returned. Otherwise the schema, row count and replicas are
   * fetched. `loadCollection` is the matching entry from the "loaded"
   * collection list; when it is missing, the collection is reported as
   * unloaded (`loadedPercentage` -1).
   */
  async getCollection(
    clientId: string,
    collection: CollectionData,
    loadCollection: CollectionData,
    lazy: boolean = false,
    database?: string
  ) {
    const { collectionsQueue } = clientCache.get(clientId);

    if (lazy) {
      collectionsQueue.enqueue(collection.name);
      return {
        id: collection.id,
        collection_name: collection.name,
        createdTime: Number(collection.timestamp),
        schema: undefined,
        rowCount: undefined,
        aliases: undefined,
        description: undefined,
        autoID: undefined,
        loadedPercentage: undefined,
        consistency_level: undefined,
        replicas: undefined,
        loaded: undefined,
      } as CollectionLazyObject;
    }

    const collectionInfo = await this.describeCollection(clientId, {
      collection_name: collection.name,
      db_name: database,
    });

    let count = 0;
    try {
      const res = await this.count(clientId, {
        collection_name: collection.name,
        db_name: database,
      });
      count = res.rowCount;
    } catch (e) {
      console.log('ignore getCollectionStatistics');
    }

    const autoID = collectionInfo.schema.fields.find(
      v => v.is_primary_key === true
    )?.autoID;

    // Replica info is only requested for collections that appear as loaded.
    let replicas;
    if (loadCollection) {
      try {
        replicas = await this.getReplicas(clientId, {
          collectionID: collectionInfo.collectionID,
          db_name: database,
        });
      } catch (e) {
        console.log('ignore getReplica');
      }
    }

    const loadedPercentage = loadCollection
      ? Number(loadCollection.loadedPercentage)
      : -1;
    let status: LOADING_STATE;
    if (loadedPercentage === -1) {
      status = LOADING_STATE.UNLOADED;
    } else if (loadedPercentage === 100) {
      status = LOADING_STATE.LOADED;
    } else {
      status = LOADING_STATE.LOADING;
    }

    return {
      collection_name: collection.name,
      schema: collectionInfo.schema,
      // `|| 0` also maps a NaN count to 0
      rowCount: Number(count || 0),
      createdTime: parseInt(collectionInfo.created_utc_timestamp, 10),
      aliases: collectionInfo.aliases,
      description: collectionInfo.schema.description,
      autoID,
      id: collectionInfo.collectionID,
      loadedPercentage,
      consistency_level: collectionInfo.consistency_level,
      replicas: (replicas && replicas.replicas) || [],
      loaded: status === LOADING_STATE.LOADED,
      status,
      properties: collectionInfo.properties,
    };
  }

  /**
   * Get details for the requested collections, or all collections when
   * `collections` is empty or none of its names exist.
   *
   * Targets are sorted by name. Only the first `EAGER_DETAIL_COUNT` get full
   * details; the rest are returned as lazy placeholders and queued. The
   * queue then pushes their details to the client socket via
   * `updateCollectionsDetails`, in batches of 5.
   */
  async getAllCollections(
    clientId: string,
    collections: string[] = [],
    database?: string
  ): Promise<CollectionObject[]> {
    // How many collections (in name order) are fully described eagerly.
    const EAGER_DETAIL_COUNT = 6;
    const currentClient = clientCache.get(clientId);

    // A full refresh stops the previous lazy queue and starts a fresh one.
    if (collections.length === 0) {
      currentClient.collectionsQueue.stop();
      currentClient.collectionsQueue = new SimpleQueue<string>();
    }

    const allCollections = await this.showCollections(clientId, {
      db_name: database,
    });
    const loadedCollections = await this.showCollections(clientId, {
      type: ShowCollectionsType.Loaded,
      db_name: database,
    });

    const requested = allCollections.data.filter(d =>
      collections.includes(d.name)
    );
    const targets = requested.length > 0 ? requested : allCollections.data;
    targets.sort((a, b) => a.name.localeCompare(b.name));

    // Sequential on purpose: lazy entries are enqueued in name order.
    const data: CollectionObject[] = [];
    for (let i = 0; i < targets.length; i++) {
      const collection = targets[i];
      const loadedCollection = loadedCollections.data.find(
        v => v.name === collection.name
      );
      const lazy = i >= EAGER_DETAIL_COUNT;
      data.push(
        await this.getCollection(
          clientId,
          collection,
          loadedCollection,
          lazy,
          database
        )
      );
    }

    if (currentClient.collectionsQueue.size() > 0) {
      currentClient.collectionsQueue.executeNext(
        async (collectionsToGet, q) => {
          // a queue replaced by a newer full refresh must not emit updates
          if (q.isObseleted) {
            return;
          }
          await this.updateCollectionsDetails(
            clientId,
            collectionsToGet,
            database
          );
        },
        5
      );
    }

    return data;
  }

  /**
   * Fetch details for `collections` and emit them to the client's socket as
   * `WS_EVENTS.COLLECTION_UPDATE`. Errors are logged and ignored.
   */
  async updateCollectionsDetails(
    clientId: string,
    collections: string[],
    database?: string
  ) {
    try {
      const socketClient = clients.get(clientId);
      const res = await this.getAllCollections(clientId, collections, database);
      if (socketClient) {
        socketClient.emit(WS_EVENTS.COLLECTION_UPDATE, { collections: res });
      }
    } catch (e) {
      console.log('ignore queue error');
    }
  }

  /**
   * List loaded collections, each with its numeric `rowCount` (resolved via
   * `count`) merged with the SDK's collection entry.
   */
  async getLoadedCollections(clientId: string, db_name?: string) {
    const res = await this.showCollections(clientId, {
      type: ShowCollectionsType.Loaded,
      db_name,
    });
    const data = [];
    for (const item of res.data) {
      const { id, name } = item;
      // Must be awaited: an un-awaited Promise would be serialised as `{}`.
      const { rowCount } = await this.count(clientId, {
        collection_name: name,
        db_name,
      });
      data.push({
        id,
        collection_name: name,
        rowCount,
        ...item,
      });
    }
    return data;
  }

  /**
   * Get collections statistics data
   * @returns {collectionCount:number, totalData:number}
   * `totalData` sums the statistics `row_count` of every collection; values
   * that are not numeric count as 0.
   */
  async getStatistics(clientId: string, db_name?: string) {
    const data = {
      collectionCount: 0,
      totalData: 0,
    } as StatisticsObject;
    const res = await this.showCollections(clientId, { db_name });
    data.collectionCount = res.data.length;
    for (const item of res.data) {
      const collectionStatistics = await this.getCollectionStatistics(
        clientId,
        {
          collection_name: item.name,
          db_name,
        }
      );
      const rowCount = Number(
        findKeyValue(collectionStatistics.stats, ROW_COUNT)
      );
      data.totalData += isNaN(rowCount) ? 0 : rowCount;
    }
    return data;
  }

  /**
   * Generate `size` sample rows for a collection's schema.
   *
   * With `download`, returns `{ sampleFile }` as CSV (when `format` is
   * 'csv') or JSON text; otherwise inserts the rows and returns the insert
   * response.
   */
  async importSample(
    clientId: string,
    { collection_name, size, download, format, db_name }: ImportSampleDto
  ) {
    const collectionInfo = await this.describeCollection(clientId, {
      collection_name,
      db_name,
    });
    const fields_data = genRows(
      collectionInfo.schema.fields,
      parseInt(size, 10),
      collectionInfo.schema.enable_dynamic_field
    );
    if (!download) {
      return await this.insert(clientId, {
        collection_name,
        fields_data,
        db_name,
      });
    }
    const sampleFile =
      format === 'csv'
        ? new Parser({}).parse(fields_data)
        : JSON.stringify(fields_data);
    return { sampleFile };
  }

  /** Return the SDK `getCompactionState` response. */
  async getCompactionState(clientId: string, data: GetCompactionStateReq) {
    const { milvusClient } = clientCache.get(clientId);
    return milvusClient.getCompactionState(data);
  }

  /** Return the SDK `getQuerySegmentInfo` response. */
  async getQuerySegmentInfo(clientId: string, data: GetQuerySegmentInfoReq) {
    const { milvusClient } = clientCache.get(clientId);
    return milvusClient.getQuerySegmentInfo(data);
  }

  /** Return the SDK `getPersistentSegmentInfo` response. */
  async getPersistentSegmentInfo(
    clientId: string,
    data: GePersistentSegmentInfoReq
  ) {
    const { milvusClient } = clientCache.get(clientId);
    return milvusClient.getPersistentSegmentInfo(data);
  }

  /** Trigger compaction; returns the SDK response. */
  async compact(clientId: string, data: CompactReq) {
    const { milvusClient } = clientCache.get(clientId);
    return milvusClient.compact(data);
  }

  /** Return the SDK `hasCollection` response. */
  async hasCollection(clientId: string, data: HasCollectionReq) {
    const { milvusClient } = clientCache.get(clientId);
    return milvusClient.hasCollection(data);
  }

  /**
   * Create `data.new_collection_name` with the schema (fields, consistency
   * level, dynamic-field flag, partition count for partition-key schemas) of
   * `data.collection_name`, in the same database. No data or indexes are
   * copied.
   */
  async duplicateCollection(clientId: string, data: RenameCollectionReq) {
    const collection = await this.describeCollection(clientId, {
      collection_name: data.collection_name,
      db_name: data.db_name,
    });
    const createCollectionParams: CreateCollectionReq = {
      collection_name: data.new_collection_name,
      // create the copy next to the source collection
      db_name: data.db_name,
      fields: collection.schema.fields.map(convertFieldSchemaToFieldType),
      consistency_level: collection.consistency_level as any,
      enable_dynamic_field: !!collection.schema.enable_dynamic_field,
    };
    const hasPartitionKey = collection.schema.fields.some(
      f => f.is_partition_key
    );
    if (hasPartitionKey && collection.num_partitions) {
      createCollectionParams.num_partitions = Number(collection.num_partitions);
    }
    return await this.createCollection(clientId, createCollectionParams);
  }

  /**
   * Delete every entity with a filter that matches all primary keys:
   * `pk >= MIN_INT64` for Int64 keys, `pk != ''` otherwise.
   */
  async emptyCollection(clientId: string, data: HasCollectionReq) {
    const { milvusClient } = clientCache.get(clientId);
    const pkField = await milvusClient.getPkFieldName(data);
    const pkType = await milvusClient.getPkFieldType(data);
    const filter =
      pkType === 'Int64' ? `${pkField} >= ${MIN_INT64}` : `${pkField} != ''`;
    return milvusClient.deleteEntities({
      collection_name: data.collection_name,
      filter,
      db_name: data.db_name,
    });
  }

  /** Create an index and return the refreshed collection details. */
  async createIndex(clientId: string, data: CreateIndexReq) {
    const { milvusClient } = clientCache.get(clientId);
    await milvusClient.createIndex(data);
    return this.fetchCollectionDetails(
      clientId,
      data.collection_name,
      data.db_name
    );
  }

  /**
   * Describe indexes and enrich each description with `indexType`,
   * `metricType` and `indexParameterPairs`. The pairs are the metric_type
   * pair plus the key/value pairs parsed from the JSON `params` entry.
   */
  async describeIndex(clientId: string, data: DescribeIndexReq) {
    const { milvusClient } = clientCache.get(clientId);
    const res = (await milvusClient.describeIndex(data)) as DescribeIndexRes;
    res.index_descriptions.forEach(index => {
      index.indexType = (index.params.find(p => p.key === 'index_type')
        ?.value || '') as string;
      const metricTypePair = index.params.filter(v => v.key === 'metric_type');
      index.metricType = findKeyValue(metricTypePair, 'metric_type') as string;
      // `params` holds a JSON string of the index build parameters
      const paramsJSONstring = findKeyValue(index.params, 'params');
      const params =
        (paramsJSONstring &&
          getKeyValueListFromJsonString(paramsJSONstring as string)) ||
        [];
      index.indexParameterPairs = [...metricTypePair, ...params];
    });
    return res;
  }

  /** Drop an index and return the refreshed collection details. */
  async dropIndex(clientId: string, data: DropIndexReq) {
    const { milvusClient } = clientCache.get(clientId);
    await milvusClient.dropIndex(data);
    return this.fetchCollectionDetails(
      clientId,
      data.collection_name,
      data.db_name
    );
  }
}
|