|
@@ -22,16 +22,22 @@ import {
|
|
|
CountReq,
|
|
|
GetLoadStateReq,
|
|
|
CollectionData,
|
|
|
+ CreateIndexReq,
|
|
|
+ DescribeIndexReq,
|
|
|
+ DropIndexReq,
|
|
|
} from '@zilliz/milvus2-sdk-node';
|
|
|
import { Parser } from '@json2csv/plainjs';
|
|
|
import {
|
|
|
throwErrorFromSDK,
|
|
|
findKeyValue,
|
|
|
+ getKeyValueListFromJsonString,
|
|
|
genRows,
|
|
|
ROW_COUNT,
|
|
|
convertFieldSchemaToFieldType,
|
|
|
LOADING_STATE,
|
|
|
DYNAMIC_FIELD,
|
|
|
+ SimpleQueue,
|
|
|
+ MIN_INT64,
|
|
|
} from '../utils';
|
|
|
import { QueryDto, ImportSampleDto, GetReplicasDto } from './dto';
|
|
|
import {
|
|
@@ -42,18 +48,14 @@ import {
|
|
|
DescribeCollectionRes,
|
|
|
CountObject,
|
|
|
StatisticsObject,
|
|
|
+ CollectionFullObject,
|
|
|
+ DescribeIndexRes,
|
|
|
} from '../types';
|
|
|
-import { SchemaService } from '../schema/schema.service';
|
|
|
import { clientCache } from '../app';
|
|
|
-import { MIN_INT64 } from '../utils/Const';
|
|
|
+import { clients } from '../socket';
|
|
|
+import { WS_EVENTS } from '../utils';
|
|
|
|
|
|
export class CollectionsService {
|
|
|
- private schemaService: SchemaService;
|
|
|
-
|
|
|
- constructor() {
|
|
|
- this.schemaService = new SchemaService();
|
|
|
- }
|
|
|
-
|
|
|
async showCollections(clientId: string, data?: ShowCollectionsReq) {
|
|
|
const { milvusClient } = clientCache.get(clientId);
|
|
|
const res = await milvusClient.showCollections(data);
|
|
@@ -64,8 +66,12 @@ export class CollectionsService {
|
|
|
async createCollection(clientId: string, data: CreateCollectionReq) {
|
|
|
const { milvusClient } = clientCache.get(clientId);
|
|
|
const res = await milvusClient.createCollection(data);
|
|
|
+ const newCollection = (await this.getAllCollections(clientId, [
|
|
|
+ data.collection_name,
|
|
|
+ ])) as CollectionFullObject[];
|
|
|
+
|
|
|
throwErrorFromSDK(res);
|
|
|
- return res;
|
|
|
+ return newCollection[0];
|
|
|
}
|
|
|
|
|
|
async describeCollection(clientId: string, data: DescribeCollectionReq) {
|
|
@@ -75,7 +81,7 @@ export class CollectionsService {
|
|
|
)) as DescribeCollectionRes;
|
|
|
|
|
|
// get index info for collections
|
|
|
- const indexRes = await this.schemaService.describeIndex(clientId, {
|
|
|
+ const indexRes = await this.describeIndex(clientId, {
|
|
|
collection_name: data.collection_name,
|
|
|
});
|
|
|
|
|
@@ -147,7 +153,12 @@ export class CollectionsService {
|
|
|
const { milvusClient } = clientCache.get(clientId);
|
|
|
const res = await milvusClient.renameCollection(data);
|
|
|
throwErrorFromSDK(res);
|
|
|
- return res;
|
|
|
+
|
|
|
+ const newCollection = (await this.getAllCollections(clientId, [
|
|
|
+ data.new_collection_name,
|
|
|
+ ])) as CollectionFullObject[];
|
|
|
+
|
|
|
+ return newCollection[0];
|
|
|
}
|
|
|
|
|
|
async dropCollection(clientId: string, data: DropCollectionReq) {
|
|
@@ -161,14 +172,24 @@ export class CollectionsService {
|
|
|
const { milvusClient } = clientCache.get(clientId);
|
|
|
const res = await milvusClient.loadCollection(data);
|
|
|
throwErrorFromSDK(res);
|
|
|
- return res;
|
|
|
+
|
|
|
+ const newCollection = (await this.getAllCollections(clientId, [
|
|
|
+ data.collection_name,
|
|
|
+ ])) as CollectionFullObject[];
|
|
|
+
|
|
|
+ return newCollection[0];
|
|
|
}
|
|
|
|
|
|
async releaseCollection(clientId: string, data: ReleaseLoadCollectionReq) {
|
|
|
const { milvusClient } = clientCache.get(clientId);
|
|
|
const res = await milvusClient.releaseCollection(data);
|
|
|
throwErrorFromSDK(res);
|
|
|
- return res;
|
|
|
+
|
|
|
+ const newCollection = (await this.getAllCollections(clientId, [
|
|
|
+ data.collection_name,
|
|
|
+ ])) as CollectionFullObject[];
|
|
|
+
|
|
|
+ return newCollection[0];
|
|
|
}
|
|
|
|
|
|
async getCollectionStatistics(
|
|
@@ -233,7 +254,12 @@ export class CollectionsService {
|
|
|
const { milvusClient } = clientCache.get(clientId);
|
|
|
const res = await milvusClient.createAlias(data);
|
|
|
throwErrorFromSDK(res);
|
|
|
- return res;
|
|
|
+
|
|
|
+ const newCollection = (await this.getAllCollections(clientId, [
|
|
|
+ data.collection_name,
|
|
|
+ ])) as CollectionFullObject[];
|
|
|
+
|
|
|
+ return newCollection[0];
|
|
|
}
|
|
|
|
|
|
async alterAlias(clientId: string, data: AlterAliasReq) {
|
|
@@ -243,11 +269,20 @@ export class CollectionsService {
|
|
|
return res;
|
|
|
}
|
|
|
|
|
|
- async dropAlias(clientId: string, data: DropAliasReq) {
|
|
|
+ async dropAlias(
|
|
|
+ clientId: string,
|
|
|
+ collection_name: string,
|
|
|
+ data: DropAliasReq
|
|
|
+ ) {
|
|
|
const { milvusClient } = clientCache.get(clientId);
|
|
|
const res = await milvusClient.dropAlias(data);
|
|
|
throwErrorFromSDK(res);
|
|
|
- return res;
|
|
|
+
|
|
|
+ const newCollection = (await this.getAllCollections(clientId, [
|
|
|
+ collection_name,
|
|
|
+ ])) as CollectionFullObject[];
|
|
|
+
|
|
|
+ return newCollection[0];
|
|
|
}
|
|
|
|
|
|
async getReplicas(clientId: string, data: GetReplicasDto) {
|
|
@@ -280,7 +315,12 @@ export class CollectionsService {
|
|
|
loadCollection: CollectionData,
|
|
|
lazy: boolean = false
|
|
|
) {
|
|
|
+ const { collectionsQueue } = clientCache.get(clientId);
|
|
|
if (lazy) {
|
|
|
+ // add to lazy queue
|
|
|
+ collectionsQueue.enqueue(collection.name);
|
|
|
+
|
|
|
+ // return lazy object
|
|
|
return {
|
|
|
id: collection.id,
|
|
|
collection_name: collection.name,
|
|
@@ -327,13 +367,13 @@ export class CollectionsService {
|
|
|
|
|
|
// loading info
|
|
|
const loadedPercentage = !loadCollection
|
|
|
- ? '-1'
|
|
|
- : loadCollection.loadedPercentage;
|
|
|
+ ? -1
|
|
|
+ : Number(loadCollection.loadedPercentage);
|
|
|
|
|
|
const status =
|
|
|
- loadedPercentage === '-1'
|
|
|
+ loadedPercentage === -1
|
|
|
? LOADING_STATE.UNLOADED
|
|
|
- : loadedPercentage === '100'
|
|
|
+ : loadedPercentage === 100
|
|
|
? LOADING_STATE.LOADED
|
|
|
: LOADING_STATE.LOADING;
|
|
|
|
|
@@ -354,10 +394,19 @@ export class CollectionsService {
|
|
|
};
|
|
|
}
|
|
|
|
|
|
+ // get all collections details
|
|
|
async getAllCollections(
|
|
|
clientId: string,
|
|
|
- collectionName?: string
|
|
|
+ collectionName: string[] = []
|
|
|
): Promise<CollectionObject[]> {
|
|
|
+ const cache = clientCache.get(clientId);
|
|
|
+
|
|
|
+ // clear collectionsQueue
|
|
|
+ if (collectionName.length === 0) {
|
|
|
+ cache.collectionsQueue.stop();
|
|
|
+ cache.collectionsQueue = new SimpleQueue<string>();
|
|
|
+ }
|
|
|
+
|
|
|
// get all collections(name, timestamp, id)
|
|
|
const allCollections = await this.showCollections(clientId);
|
|
|
// get all loaded collection
|
|
@@ -372,33 +421,55 @@ export class CollectionsService {
|
|
|
(a, b) => Number(b.timestamp) - Number(a.timestamp)
|
|
|
);
|
|
|
|
|
|
- // get single collection details
|
|
|
- const targetCollections = allCollections.data.find(
|
|
|
- d => d.name === collectionName
|
|
|
+ // get target collections details
|
|
|
+ const targetCollections = allCollections.data.filter(
|
|
|
+ d => collectionName.indexOf(d.name) !== -1
|
|
|
);
|
|
|
- if (targetCollections) {
|
|
|
- const res = await this.getCollection(
|
|
|
- clientId,
|
|
|
- targetCollections,
|
|
|
- loadedCollections.data.find(v => v.name === targetCollections.name),
|
|
|
- false
|
|
|
- );
|
|
|
- return [res];
|
|
|
- }
|
|
|
+
|
|
|
+ const targets =
|
|
|
+ targetCollections.length > 0 ? targetCollections : allCollections.data;
|
|
|
|
|
|
// get all collection details
|
|
|
- for (let i = 0; i < allCollections.data.length; i++) {
|
|
|
- const collection = allCollections.data[i];
|
|
|
+ for (let i = 0; i < targets.length; i++) {
|
|
|
+ const collection = targets[i];
|
|
|
+ const loadedCollection = loadedCollections.data.find(
|
|
|
+ v => v.name === collection.name
|
|
|
+ );
|
|
|
+
|
|
|
+ const notLazy = !!loadedCollection || i < 5; // lazy is true, only load full details for the first 10 collections
|
|
|
+
|
|
|
data.push(
|
|
|
await this.getCollection(
|
|
|
clientId,
|
|
|
collection,
|
|
|
- loadedCollections.data.find(v => v.name === collection.name),
|
|
|
- false
|
|
|
+ loadedCollection,
|
|
|
+ !notLazy
|
|
|
)
|
|
|
);
|
|
|
}
|
|
|
|
|
|
+ // start the queue
|
|
|
+ if (cache.collectionsQueue.size() > 0) {
|
|
|
+ cache.collectionsQueue.executeNext(async (collectionsToGet, q) => {
|
|
|
+ // if the queue is obseleted, return
|
|
|
+ if (q.isObseleted) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ // get current socket
|
|
|
+ const socketClient = clients.get(clientId);
|
|
|
+ // get collections
|
|
|
+ const res = await this.getAllCollections(clientId, collectionsToGet);
|
|
|
+
|
|
|
+ // emit event to current client
|
|
|
+ socketClient.emit(WS_EVENTS.COLLECTION_UPDATE, res);
|
|
|
+ } catch (e) {
|
|
|
+ console.log('ignore queue error');
|
|
|
+ }
|
|
|
+ }, 5);
|
|
|
+ }
|
|
|
+
|
|
|
+ // return data
|
|
|
return data;
|
|
|
}
|
|
|
|
|
@@ -552,4 +623,93 @@ export class CollectionsService {
|
|
|
|
|
|
return res;
|
|
|
}
|
|
|
+
|
|
|
+ async createIndex(clientId: string, data: CreateIndexReq) {
|
|
|
+ const { milvusClient, indexCache, database } = clientCache.get(clientId);
|
|
|
+ const res = await milvusClient.createIndex(data);
|
|
|
+ throwErrorFromSDK(res);
|
|
|
+ const key = `${database}/${data.collection_name}`;
|
|
|
+ // clear cache;
|
|
|
+ indexCache.delete(key);
|
|
|
+
|
|
|
+ // fetch new collections
|
|
|
+ const newCollection = (await this.getAllCollections(clientId, [
|
|
|
+ data.collection_name,
|
|
|
+ ])) as CollectionFullObject[];
|
|
|
+
|
|
|
+ throwErrorFromSDK(res);
|
|
|
+ return newCollection[0];
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This function is used to describe an index in Milvus.
|
|
|
+ * It first checks if the index description is cached, if so, it returns the cached value.
|
|
|
+ * If not, it calls the Milvus SDK's describeIndex function to get the index description.
|
|
|
+ * If the index is finished building, it caches the index description for future use.
|
|
|
+ * If the index is not finished building, it deletes any cached value for this index.
|
|
|
+ * @param data - The request data for describing an index. It contains the collection name.
|
|
|
+ * @returns - The response from the Milvus SDK's describeIndex function or the cached index description.
|
|
|
+ */
|
|
|
+ async describeIndex(clientId: string, data: DescribeIndexReq) {
|
|
|
+ const { milvusClient, indexCache, database } = clientCache.get(clientId);
|
|
|
+
|
|
|
+ // Get the collection name from the request data
|
|
|
+ const key = `${database}/${data.collection_name}`;
|
|
|
+
|
|
|
+ // Try to get the index description from the cache
|
|
|
+ const value = indexCache.get(key);
|
|
|
+
|
|
|
+ // If the index description is in the cache, return it
|
|
|
+ if (value) {
|
|
|
+ return value as DescribeIndexRes;
|
|
|
+ } else {
|
|
|
+ // If the index description is not in the cache, call the Milvus SDK's describeIndex function
|
|
|
+ const res = (await milvusClient.describeIndex(data)) as DescribeIndexRes;
|
|
|
+
|
|
|
+ res.index_descriptions.map(index => {
|
|
|
+ // get indexType
|
|
|
+ index.indexType = (index.params.find(p => p.key === 'index_type')
|
|
|
+ ?.value || '') as string;
|
|
|
+ // get metricType
|
|
|
+ const metricTypePair =
|
|
|
+ index.params.filter(v => v.key === 'metric_type') || [];
|
|
|
+ index.metricType = findKeyValue(
|
|
|
+ metricTypePair,
|
|
|
+ 'metric_type'
|
|
|
+ ) as string;
|
|
|
+ // get index parameter pairs
|
|
|
+ const paramsJSONstring = findKeyValue(index.params, 'params'); // params is a json string
|
|
|
+ const params =
|
|
|
+ (paramsJSONstring &&
|
|
|
+ getKeyValueListFromJsonString(paramsJSONstring as string)) ||
|
|
|
+ [];
|
|
|
+ index.indexParameterPairs = [...metricTypePair, ...params];
|
|
|
+ });
|
|
|
+
|
|
|
+ // Return the response from the Milvus SDK's describeIndex function
|
|
|
+ return res;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ async dropIndex(clientId: string, data: DropIndexReq) {
|
|
|
+ const { milvusClient, indexCache, database } = clientCache.get(clientId);
|
|
|
+ const res = await milvusClient.dropIndex(data);
|
|
|
+ throwErrorFromSDK(res);
|
|
|
+
|
|
|
+ const key = `${database}/${data.collection_name}`;
|
|
|
+
|
|
|
+ // clear cache;
|
|
|
+ indexCache.delete(key);
|
|
|
+ // fetch new collections
|
|
|
+ const newCollection = (await this.getAllCollections(clientId, [
|
|
|
+ data.collection_name,
|
|
|
+ ])) as CollectionFullObject[];
|
|
|
+
|
|
|
+ return newCollection[0];
|
|
|
+ }
|
|
|
+
|
|
|
+ async clearCache(clientId: string) {
|
|
|
+ const { indexCache } = clientCache.get(clientId);
|
|
|
+ return indexCache.clear();
|
|
|
+ }
|
|
|
}
|