|
@@ -3,87 +3,87 @@ import {
|
|
FlushReq,
|
|
FlushReq,
|
|
GetMetricsResponse,
|
|
GetMetricsResponse,
|
|
} from '@zilliz/milvus2-sdk-node/dist/milvus/types';
|
|
} from '@zilliz/milvus2-sdk-node/dist/milvus/types';
|
|
-
|
|
|
|
|
|
+import HttpErrors from 'http-errors';
|
|
|
|
+import LruCache from 'lru-cache';
|
|
|
|
+import { HTTP_STATUS_CODE } from '../utils/Error';
|
|
export class MilvusService {
|
|
export class MilvusService {
|
|
// Share with all instances, so activeAddress is static
|
|
// Share with all instances, so activeAddress is static
|
|
static activeAddress: string;
|
|
static activeAddress: string;
|
|
- private milvusClients: { [x: string]: MilvusClient };
|
|
|
|
-
|
|
|
|
- constructor() {
|
|
|
|
- this.milvusClients = {};
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- get activeMilvusClient() {
|
|
|
|
- // undefined means not connect yet, will throw error to client.
|
|
|
|
- return this.milvusClients[MilvusService.activeAddress];
|
|
|
|
- }
|
|
|
|
|
|
+ static activeMilvusClient: MilvusClient;
|
|
|
|
|
|
get collectionManager() {
|
|
get collectionManager() {
|
|
this.checkMilvus();
|
|
this.checkMilvus();
|
|
- return this.activeMilvusClient.collectionManager;
|
|
|
|
|
|
+ return MilvusService.activeMilvusClient.collectionManager;
|
|
}
|
|
}
|
|
|
|
|
|
get partitionManager() {
|
|
get partitionManager() {
|
|
this.checkMilvus();
|
|
this.checkMilvus();
|
|
- return this.activeMilvusClient.partitionManager;
|
|
|
|
|
|
+ return MilvusService.activeMilvusClient.partitionManager;
|
|
}
|
|
}
|
|
|
|
|
|
get indexManager() {
|
|
get indexManager() {
|
|
this.checkMilvus();
|
|
this.checkMilvus();
|
|
- return this.activeMilvusClient.indexManager;
|
|
|
|
|
|
+ return MilvusService.activeMilvusClient.indexManager;
|
|
}
|
|
}
|
|
|
|
|
|
get dataManager() {
|
|
get dataManager() {
|
|
this.checkMilvus();
|
|
this.checkMilvus();
|
|
- return this.activeMilvusClient.dataManager;
|
|
|
|
|
|
+ return MilvusService.activeMilvusClient.dataManager;
|
|
}
|
|
}
|
|
|
|
|
|
- private checkMilvus() {
|
|
|
|
- if (!this.activeMilvusClient) {
|
|
|
|
- throw new Error('Please connect milvus first');
|
|
|
|
- }
|
|
|
|
|
|
+ static formatAddress(address: string) {
|
|
|
|
+ return address.replace(/(http|https):\/\//, '');
|
|
}
|
|
}
|
|
|
|
|
|
- formatAddress(address: string) {
|
|
|
|
- return address.replace(/(http|https):\/\//, '');
|
|
|
|
|
|
+ checkMilvus() {
|
|
|
|
+ if (!MilvusService.activeMilvusClient) {
|
|
|
|
+ throw HttpErrors(
|
|
|
|
+ HTTP_STATUS_CODE.UNAUTHORIZED,
|
|
|
|
+ 'Please connect milvus first'
|
|
|
|
+ );
|
|
|
|
+ // throw new Error('Please connect milvus first');
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
- async connectMilvus(address: string) {
|
|
|
|
|
|
+ async connectMilvus(address: string, cache: LruCache<any, any>) {
|
|
// grpc only need address without http
|
|
// grpc only need address without http
|
|
- const milvusAddress = this.formatAddress(address);
|
|
|
|
|
|
+ const milvusAddress = MilvusService.formatAddress(address);
|
|
try {
|
|
try {
|
|
const milvusClient = new MilvusClient(milvusAddress);
|
|
const milvusClient = new MilvusClient(milvusAddress);
|
|
await milvusClient.collectionManager.hasCollection({
|
|
await milvusClient.collectionManager.hasCollection({
|
|
collection_name: 'not_exist',
|
|
collection_name: 'not_exist',
|
|
});
|
|
});
|
|
MilvusService.activeAddress = address;
|
|
MilvusService.activeAddress = address;
|
|
- this.milvusClients[milvusAddress] = milvusClient;
|
|
|
|
|
|
+ cache.set(milvusAddress, milvusClient);
|
|
return { address };
|
|
return { address };
|
|
} catch (error) {
|
|
} catch (error) {
|
|
// if milvus is not working, delete connection.
|
|
// if milvus is not working, delete connection.
|
|
- delete this.milvusClients[milvusAddress];
|
|
|
|
- throw new Error('Connect milvus failed, check your milvus address.');
|
|
|
|
|
|
+ cache.del(milvusAddress);
|
|
|
|
+ throw HttpErrors(
|
|
|
|
+ HTTP_STATUS_CODE.BAD_REQUEST,
|
|
|
|
+ 'Connect milvus failed, check your milvus address.'
|
|
|
|
+ );
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- async checkConnect(address: string) {
|
|
|
|
- const milvusAddress = this.formatAddress(address);
|
|
|
|
- if (!Object.keys(this.milvusClients).includes(milvusAddress)) {
|
|
|
|
|
|
+ async checkConnect(address: string, cache: LruCache<any, any>) {
|
|
|
|
+ const milvusAddress = MilvusService.formatAddress(address);
|
|
|
|
+ if (!cache.has(milvusAddress)) {
|
|
return { connected: false };
|
|
return { connected: false };
|
|
}
|
|
}
|
|
- const res = await this.connectMilvus(address);
|
|
|
|
|
|
+ const res = await this.connectMilvus(address, cache);
|
|
return {
|
|
return {
|
|
connected: res.address ? true : false,
|
|
connected: res.address ? true : false,
|
|
};
|
|
};
|
|
}
|
|
}
|
|
|
|
|
|
async flush(data: FlushReq) {
|
|
async flush(data: FlushReq) {
|
|
- const res = await this.activeMilvusClient.dataManager.flush(data);
|
|
|
|
|
|
+ const res = await MilvusService.activeMilvusClient.dataManager.flush(data);
|
|
return res;
|
|
return res;
|
|
}
|
|
}
|
|
|
|
|
|
async getMetrics(): Promise<GetMetricsResponse> {
|
|
async getMetrics(): Promise<GetMetricsResponse> {
|
|
- const res = await this.activeMilvusClient.dataManager.getMetric({
|
|
|
|
|
|
+ const res = await MilvusService.activeMilvusClient.dataManager.getMetric({
|
|
request: { metric_type: 'system_info' },
|
|
request: { metric_type: 'system_info' },
|
|
});
|
|
});
|
|
return res;
|
|
return res;
|