Browse Source

ensure server api called with dedicated milvus clientId (#369)

* support shared session

Signed-off-by: ryjiang <jiangruiyi@gmail.com>

* remove unused code

Signed-off-by: ryjiang <jiangruiyi@gmail.com>

* update code

Signed-off-by: ryjiang <jiangruiyi@gmail.com>

---------

Signed-off-by: ryjiang <jiangruiyi@gmail.com>
ryjiang 1 year ago
parent
commit
1652bba97c

+ 13 - 41
server/src/app.ts

@@ -2,7 +2,6 @@ import express from 'express';
 import cors from 'cors';
 import helmet from 'helmet';
 import * as http from 'http';
-import { Server, Socket } from 'socket.io';
 import { LRUCache } from 'lru-cache';
 import * as path from 'path';
 import chalk from 'chalk';
@@ -14,30 +13,33 @@ import { router as schemaRouter } from './schema';
 import { router as cronsRouter } from './crons';
 import { router as userRouter } from './users';
 import { router as prometheusRouter } from './prometheus';
-import { pubSub } from './events';
 import {
   TransformResMiddleware,
   LoggingMiddleware,
   ErrorMiddleware,
   ReqHeaderMiddleware,
 } from './middleware';
-import { CLIENT_TTL, INDEX_TTL } from './utils';
+import { CLIENT_TTL } from './utils';
 import { getIp } from './utils/Network';
 import { DescribeIndexResponse, MilvusClient } from './types';
+import { initWebSocket } from './socket';
+
 // initialize express app
 export const app = express();
 
 // initialize cache store
-export const clientCache = new LRUCache<string, MilvusClient>({
+export const clientCache = new LRUCache<
+  string,
+  {
+    milvusClient: MilvusClient;
+    address: string;
+    indexCache: LRUCache<string, DescribeIndexResponse>;
+  }
+>({
   ttl: CLIENT_TTL,
   ttlAutopurge: true,
 });
 
-export const indexCache = new LRUCache<string, DescribeIndexResponse>({
-  ttl: INDEX_TTL,
-  ttlAutopurge: true,
-});
-
 // initialize express router
 const router = express.Router();
 // define routers
@@ -88,41 +90,11 @@ app.get('*', (request, response) => {
 });
 // ErrorInterceptor
 app.use(ErrorMiddleware);
+// init websocket server
+initWebSocket(server);
 
 // start server
 server.listen(PORT, () => {
-  // initialize the WebSocket server instance
-  const io = new Server(server, {
-    cors: {
-      origin: '*',
-      methods: ['GET', 'POST'],
-    },
-  });
-
-  // Init WebSocket server event listener
-  io.on('connection', (socket: Socket) => {
-    console.info(
-      chalk.green(`ws client connected ${socket.client.conn.remoteAddress}`)
-    );
-    socket.on('COLLECTION', (message: any) => {
-      socket.emit('COLLECTION', { data: message });
-    });
-    pubSub.on('ws_pubsub', (msg: any) => {
-      socket.emit(msg.event, msg.data);
-    });
-    socket.on('disconnect', () => {
-      console.info(
-        chalk.green(
-          `ws client disconnected ${socket.client.conn.remoteAddress}`
-        )
-      );
-    });
-  });
-
-  server.on('disconnect', (socket: Socket) => {
-    io.removeAllListeners();
-  });
-
   const ips = getIp();
   ips.forEach(ip => {
     console.info(chalk.cyanBright(`Attu server started: http://${ip}:${PORT}`));

+ 99 - 55
server/src/collections/collections.controller.ts

@@ -1,6 +1,7 @@
 import { NextFunction, Request, Response, Router } from 'express';
 import { dtoValidationMiddleware } from '../middleware/validation';
 import { CollectionsService } from './collections.service';
+import { LoadCollectionReq } from '@zilliz/milvus2-sdk-node';
 import {
   CreateAliasDto,
   CreateCollectionDto,
@@ -11,7 +12,6 @@ import {
   RenameCollectionDto,
   DuplicateCollectionDto,
 } from './dto';
-import { LoadCollectionReq } from '@zilliz/milvus2-sdk-node';
 
 export class CollectionController {
   private collectionsService: CollectionsService;
@@ -97,7 +97,7 @@ export class CollectionController {
     // segments
     this.router.get('/:name/psegments', this.getPSegment.bind(this));
     this.router.get('/:name/qsegments', this.getQSegment.bind(this));
-
+    // compact
     this.router.put('/:name/compact', this.compact.bind(this));
     return this.router;
   }
@@ -107,8 +107,8 @@ export class CollectionController {
     try {
       const result =
         type === 1
-          ? await this.collectionsService.getLoadedCollections()
-          : await this.collectionsService.getAllCollections();
+          ? await this.collectionsService.getLoadedCollections(req.clientId)
+          : await this.collectionsService.getAllCollections(req.clientId);
       res.send(result);
     } catch (error) {
       next(error);
@@ -117,7 +117,7 @@ export class CollectionController {
 
   async getStatistics(req: Request, res: Response, next: NextFunction) {
     try {
-      const result = await this.collectionsService.getStatistics();
+      const result = await this.collectionsService.getStatistics(req.clientId);
       res.send(result);
     } catch (error) {
       next(error);
@@ -128,6 +128,7 @@ export class CollectionController {
     const createCollectionData = req.body;
     try {
       const result = await this.collectionsService.createCollection(
+        req.clientId,
         createCollectionData
       );
       res.send(result);
@@ -140,10 +141,13 @@ export class CollectionController {
     const name = req.params?.name;
     const data = req.body;
     try {
-      const result = await this.collectionsService.renameCollection({
-        collection_name: name,
-        ...data,
-      });
+      const result = await this.collectionsService.renameCollection(
+        req.clientId,
+        {
+          collection_name: name,
+          ...data,
+        }
+      );
       res.send(result);
     } catch (error) {
       next(error);
@@ -154,10 +158,13 @@ export class CollectionController {
     const name = req.params?.name;
     const data = req.body;
     try {
-      const result = await this.collectionsService.duplicateCollection({
-        collection_name: name,
-        ...data,
-      });
+      const result = await this.collectionsService.duplicateCollection(
+        req.clientId,
+        {
+          collection_name: name,
+          ...data,
+        }
+      );
       res.send(result);
     } catch (error) {
       next(error);
@@ -167,9 +174,12 @@ export class CollectionController {
   async dropCollection(req: Request, res: Response, next: NextFunction) {
     const name = req.params?.name;
     try {
-      const result = await this.collectionsService.dropCollection({
-        collection_name: name,
-      });
+      const result = await this.collectionsService.dropCollection(
+        req.clientId,
+        {
+          collection_name: name,
+        }
+      );
       res.send(result);
     } catch (error) {
       next(error);
@@ -179,9 +189,12 @@ export class CollectionController {
   async describeCollection(req: Request, res: Response, next: NextFunction) {
     const name = req.params?.name;
     try {
-      const result = await this.collectionsService.getAllCollections({
-        data: [{ name }],
-      });
+      const result = await this.collectionsService.getAllCollections(
+        req.clientId,
+        {
+          data: [{ name }],
+        }
+      );
       res.send(result[0]);
     } catch (error) {
       next(error);
@@ -191,9 +204,12 @@ export class CollectionController {
   async getCollectionInfo(req: Request, res: Response, next: NextFunction) {
     const name = req.params?.name;
     try {
-      const result = await this.collectionsService.describeCollection({
-        collection_name: name,
-      });
+      const result = await this.collectionsService.describeCollection(
+        req.clientId,
+        {
+          collection_name: name,
+        }
+      );
       res.send(result);
     } catch (error) {
       next(error);
@@ -207,9 +223,12 @@ export class CollectionController {
   ) {
     const name = req.params?.name;
     try {
-      const result = await this.collectionsService.getCollectionStatistics({
-        collection_name: name,
-      });
+      const result = await this.collectionsService.getCollectionStatistics(
+        req.clientId,
+        {
+          collection_name: name,
+        }
+      );
       res.send(result);
     } catch (error) {
       next(error);
@@ -222,7 +241,9 @@ export class CollectionController {
     next: NextFunction
   ) {
     try {
-      const result = await this.collectionsService.getCollectionsIndexStatus();
+      const result = await this.collectionsService.getCollectionsIndexStatus(
+        req.clientId
+      );
       res.send(result);
     } catch (error) {
       next(error);
@@ -237,7 +258,10 @@ export class CollectionController {
       param.replica_number = Number(data.replica_number);
     }
     try {
-      const result = await this.collectionsService.loadCollection(param);
+      const result = await this.collectionsService.loadCollection(
+        req.clientId,
+        param
+      );
       res.send(result);
     } catch (error) {
       next(error);
@@ -247,9 +271,12 @@ export class CollectionController {
   async releaseCollection(req: Request, res: Response, next: NextFunction) {
     const name = req.params?.name;
     try {
-      const result = await this.collectionsService.releaseCollection({
-        collection_name: name,
-      });
+      const result = await this.collectionsService.releaseCollection(
+        req.clientId,
+        {
+          collection_name: name,
+        }
+      );
       res.send(result);
     } catch (error) {
       next(error);
@@ -260,7 +287,7 @@ export class CollectionController {
     const name = req.params?.name;
     const data = req.body;
     try {
-      const result = await this.collectionsService.insert({
+      const result = await this.collectionsService.insert(req.clientId, {
         collection_name: name,
         ...data,
       });
@@ -273,7 +300,7 @@ export class CollectionController {
   async importSample(req: Request, res: Response, next: NextFunction) {
     const data = req.body;
     try {
-      const result = await this.collectionsService.importSample({
+      const result = await this.collectionsService.importSample(req.clientId, {
         ...data,
       });
       res.send(result);
@@ -285,10 +312,13 @@ export class CollectionController {
     const name = req.params?.name;
     const data = req.body;
     try {
-      const result = await this.collectionsService.deleteEntities({
-        collection_name: name,
-        ...data,
-      });
+      const result = await this.collectionsService.deleteEntities(
+        req.clientId,
+        {
+          collection_name: name,
+          ...data,
+        }
+      );
       res.send(result);
     } catch (error) {
       next(error);
@@ -299,7 +329,7 @@ export class CollectionController {
     const name = req.params?.name;
     const data = req.body;
     try {
-      const result = await this.collectionsService.vectorSearch({
+      const result = await this.collectionsService.vectorSearch(req.clientId, {
         collection_name: name,
         ...data,
       });
@@ -320,7 +350,7 @@ export class CollectionController {
       const page = isNaN(resultPage) ? 0 : parseInt(resultPage, 10);
       // TODO: add page and limit to node SDK
       // Here may raise "Error: 8 RESOURCE_EXHAUSTED: Received message larger than max"
-      const result = await this.collectionsService.query({
+      const result = await this.collectionsService.query(req.clientId, {
         collection_name: name,
         ...data,
       });
@@ -341,7 +371,7 @@ export class CollectionController {
     const name = req.params?.name;
     const data = req.body;
     try {
-      const result = await this.collectionsService.createAlias({
+      const result = await this.collectionsService.createAlias(req.clientId, {
         collection_name: name,
         ...data,
       });
@@ -354,7 +384,9 @@ export class CollectionController {
   async dropAlias(req: Request, res: Response, next: NextFunction) {
     const alias = req.params?.alias;
     try {
-      const result = await this.collectionsService.dropAlias({ alias });
+      const result = await this.collectionsService.dropAlias(req.clientId, {
+        alias,
+      });
       res.send(result);
     } catch (error) {
       next(error);
@@ -364,7 +396,7 @@ export class CollectionController {
   async getReplicas(req: Request, res: Response, next: NextFunction) {
     const collectionID = req.params?.collectionID;
     try {
-      const result = await this.collectionsService.getReplicas({
+      const result = await this.collectionsService.getReplicas(req.clientId, {
         collectionID,
       });
       res.send(result);
@@ -376,9 +408,12 @@ export class CollectionController {
   async getPSegment(req: Request, res: Response, next: NextFunction) {
     const name = req.params?.name;
     try {
-      const result = await this.collectionsService.getPersistentSegmentInfo({
-        collectionName: name,
-      });
+      const result = await this.collectionsService.getPersistentSegmentInfo(
+        req.clientId,
+        {
+          collectionName: name,
+        }
+      );
       res.send(result);
     } catch (error) {
       next(error);
@@ -388,9 +423,12 @@ export class CollectionController {
   async getQSegment(req: Request, res: Response, next: NextFunction) {
     const name = req.params?.name;
     try {
-      const result = await this.collectionsService.getQuerySegmentInfo({
-        collectionName: name,
-      });
+      const result = await this.collectionsService.getQuerySegmentInfo(
+        req.clientId,
+        {
+          collectionName: name,
+        }
+      );
       res.send(result);
     } catch (error) {
       next(error);
@@ -400,7 +438,7 @@ export class CollectionController {
   async compact(req: Request, res: Response, next: NextFunction) {
     const name = req.params?.name;
     try {
-      const result = await this.collectionsService.compact({
+      const result = await this.collectionsService.compact(req.clientId, {
         collection_name: name,
       });
       res.send(result);
@@ -412,12 +450,15 @@ export class CollectionController {
   async count(req: Request, res: Response, next: NextFunction) {
     const name = req.params?.name;
     try {
-      const { value } = await this.collectionsService.hasCollection({
-        collection_name: name,
-      });
+      const { value } = await this.collectionsService.hasCollection(
+        req.clientId,
+        {
+          collection_name: name,
+        }
+      );
       let result: any = '';
       if (value) {
-        result = await this.collectionsService.count({
+        result = await this.collectionsService.count(req.clientId, {
           collection_name: name,
         });
       }
@@ -431,9 +472,12 @@ export class CollectionController {
   async empty(req: Request, res: Response, next: NextFunction) {
     const name = req.params?.name;
     try {
-      const result = await this.collectionsService.emptyCollection({
-        collection_name: name,
-      });
+      const result = await this.collectionsService.emptyCollection(
+        req.clientId,
+        {
+          collection_name: name,
+        }
+      );
 
       res.send(result);
     } catch (error) {

+ 124 - 88
server/src/collections/collections.service.ts

@@ -1,4 +1,3 @@
-import { MilvusService } from '../milvus/milvus.service';
 import {
   CreateCollectionReq,
   DescribeCollectionReq,
@@ -34,6 +33,7 @@ import {
 import { QueryDto, ImportSampleDto, GetReplicasDto } from './dto';
 import { CollectionData } from '../types';
 import { SchemaService } from '../schema/schema.service';
+import { clientCache } from '../app';
 
 export class CollectionsService {
   private schemaService: SchemaService;
@@ -42,83 +42,99 @@ export class CollectionsService {
     this.schemaService = new SchemaService();
   }
 
-  async getCollections(data?: ShowCollectionsReq) {
-    const res = await MilvusService.activeMilvusClient.showCollections(data);
+  async getCollections(clientId: string, data?: ShowCollectionsReq) {
+        const { milvusClient } = clientCache.get(clientId);
+    const res = await milvusClient.showCollections(data);
     throwErrorFromSDK(res.status);
     return res;
   }
 
-  async createCollection(data: CreateCollectionReq) {
-    const res = await MilvusService.activeMilvusClient.createCollection(data);
+  async createCollection(clientId: string, data: CreateCollectionReq) {
+        const { milvusClient } = clientCache.get(clientId);
+    const res = await milvusClient.createCollection(data);
     throwErrorFromSDK(res);
     return res;
   }
 
-  async describeCollection(data: DescribeCollectionReq) {
-    const res = await MilvusService.activeMilvusClient.describeCollection(data);
+  async describeCollection(clientId: string, data: DescribeCollectionReq) {
+        const { milvusClient } = clientCache.get(clientId);
+    const res = await milvusClient.describeCollection(data);
     throwErrorFromSDK(res.status);
     return res;
   }
 
-  async renameCollection(data: RenameCollectionReq) {
-    const res = await MilvusService.activeMilvusClient.renameCollection(data);
+  async renameCollection(clientId: string, data: RenameCollectionReq) {
+        const { milvusClient } = clientCache.get(clientId);
+    const res = await milvusClient.renameCollection(data);
     throwErrorFromSDK(res);
     return res;
   }
 
-  async dropCollection(data: DropCollectionReq) {
-    const res = await MilvusService.activeMilvusClient.dropCollection(data);
+  async dropCollection(clientId: string, data: DropCollectionReq) {
+        const { milvusClient } = clientCache.get(clientId);
+    const res = await milvusClient.dropCollection(data);
     throwErrorFromSDK(res);
     return res;
   }
 
-  async loadCollection(data: LoadCollectionReq) {
-    const res = await MilvusService.activeMilvusClient.loadCollection(data);
+  async loadCollection(clientId: string, data: LoadCollectionReq) {
+        const { milvusClient } = clientCache.get(clientId);
+    const res = await milvusClient.loadCollection(data);
     throwErrorFromSDK(res);
     return res;
   }
 
-  async releaseCollection(data: ReleaseLoadCollectionReq) {
-    const res = await MilvusService.activeMilvusClient.releaseCollection(data);
+  async releaseCollection(clientId: string, data: ReleaseLoadCollectionReq) {
+        const { milvusClient } = clientCache.get(clientId);
+    const res = await milvusClient.releaseCollection(data);
     throwErrorFromSDK(res);
     return res;
   }
 
-  async getCollectionStatistics(data: GetCollectionStatisticsReq) {
-    const res = await MilvusService.activeMilvusClient.getCollectionStatistics(
-      data
-    );
+  async getCollectionStatistics(
+    clientId: string,
+    data: GetCollectionStatisticsReq
+  ) {
+        const { milvusClient } = clientCache.get(clientId);
+    const res = await milvusClient.getCollectionStatistics(data);
     throwErrorFromSDK(res.status);
     return res;
   }
 
-  async count(data: CountReq) {
+  async count(clientId: string, data: CountReq) {
+        const { milvusClient } = clientCache.get(clientId);
     let count = 0;
     try {
-      const countRes = await MilvusService.activeMilvusClient.count(data);
+      const countRes = await milvusClient.count(data);
       count = countRes.data;
     } catch (error) {
-      const collectionStatisticsRes = await this.getCollectionStatistics(data);
+      const collectionStatisticsRes = await this.getCollectionStatistics(
+        clientId,
+        data
+      );
       count = collectionStatisticsRes.data.row_count;
     }
     return count;
   }
 
-  async insert(data: InsertReq) {
-    const res = await MilvusService.activeMilvusClient.insert(data);
+  async insert(clientId: string, data: InsertReq) {
+        const { milvusClient } = clientCache.get(clientId);
+    const res = await milvusClient.insert(data);
     throwErrorFromSDK(res.status);
     return res;
   }
 
-  async deleteEntities(data: DeleteEntitiesReq) {
-    const res = await MilvusService.activeMilvusClient.deleteEntities(data);
+  async deleteEntities(clientId: string, data: DeleteEntitiesReq) {
+        const { milvusClient } = clientCache.get(clientId);
+    const res = await milvusClient.deleteEntities(data);
     throwErrorFromSDK(res.status);
     return res;
   }
 
-  async vectorSearch(data: SearchReq) {
+  async vectorSearch(clientId: string, data: SearchReq) {
+        const { milvusClient } = clientCache.get(clientId);
     const now = Date.now();
-    const res = await MilvusService.activeMilvusClient.search(data);
+    const res = await milvusClient.search(data);
     const after = Date.now();
 
     throwErrorFromSDK(res.status);
@@ -126,36 +142,42 @@ export class CollectionsService {
     return res;
   }
 
-  async createAlias(data: CreateAliasReq) {
-    const res = await MilvusService.activeMilvusClient.createAlias(data);
+  async createAlias(clientId: string, data: CreateAliasReq) {
+        const { milvusClient } = clientCache.get(clientId);
+    const res = await milvusClient.createAlias(data);
     throwErrorFromSDK(res);
     return res;
   }
 
-  async alterAlias(data: AlterAliasReq) {
-    const res = await MilvusService.activeMilvusClient.alterAlias(data);
+  async alterAlias(clientId: string, data: AlterAliasReq) {
+        const { milvusClient } = clientCache.get(clientId);
+    const res = await milvusClient.alterAlias(data);
     throwErrorFromSDK(res);
     return res;
   }
 
-  async dropAlias(data: DropAliasReq) {
-    const res = await MilvusService.activeMilvusClient.dropAlias(data);
+  async dropAlias(clientId: string, data: DropAliasReq) {
+        const { milvusClient } = clientCache.get(clientId);
+    const res = await milvusClient.dropAlias(data);
     throwErrorFromSDK(res);
     return res;
   }
 
-  async getReplicas(data: GetReplicasDto) {
-    const res = await MilvusService.activeMilvusClient.getReplicas(data);
+  async getReplicas(clientId: string, data: GetReplicasDto) {
+        const { milvusClient } = clientCache.get(clientId);
+    const res = await milvusClient.getReplicas(data);
     return res;
   }
 
   async query(
+    clientId: string,
     data: {
       collection_name: string;
     } & QueryDto
   ) {
+        const { milvusClient } = clientCache.get(clientId);
     const now = Date.now();
-    const res = await MilvusService.activeMilvusClient.query(data);
+    const res = await milvusClient.query(data);
 
     const after = Date.now();
 
@@ -168,12 +190,15 @@ export class CollectionsService {
    * Get all collections meta data
    * @returns {id:string, collection_name:string, schema:Field[], autoID:boolean, rowCount: string, consistency_level:string}
    */
-  async getAllCollections(collections?: {
-    data: { name: string }[];
-  }): Promise<CollectionData[]> {
+  async getAllCollections(
+    clientId: string,
+    collections?: {
+      data: { name: string }[];
+    }
+  ): Promise<CollectionData[]> {
     const data: CollectionData[] = [];
-    const res = collections || (await this.getCollections());
-    const loadedCollections = await this.getCollections({
+    const res = collections || (await this.getCollections(clientId));
+    const loadedCollections = await this.getCollections(clientId, {
       type: ShowCollectionsType.Loaded,
     });
     if (res.data.length > 0) {
@@ -181,17 +206,20 @@ export class CollectionsService {
         const { name } = item;
 
         // get collection schema and properties
-        const collectionInfo = await this.describeCollection({
+        const collectionInfo = await this.describeCollection(clientId, {
           collection_name: name,
         });
 
         // get collection statistic data
-        const collectionStatisticsRes = await this.getCollectionStatistics({
-          collection_name: name,
-        });
+        const collectionStatisticsRes = await this.getCollectionStatistics(
+          clientId,
+          {
+            collection_name: name,
+          }
+        );
 
         // get index info for collections
-        const indexRes = await this.schemaService.describeIndex({
+        const indexRes = await this.schemaService.describeIndex(clientId, {
           collection_name: item.name,
         });
 
@@ -214,7 +242,7 @@ export class CollectionsService {
         let replicas;
         try {
           replicas = loadCollection
-            ? await this.getReplicas({
+            ? await this.getReplicas(clientId, {
                 collectionID: collectionInfo.collectionID,
               })
             : replicas;
@@ -243,16 +271,16 @@ export class CollectionsService {
     return data;
   }
 
-  async getLoadedCollections() {
+  async getLoadedCollections(clientId: string) {
     const data = [];
-    const res = await this.getCollections({
+    const res = await this.getCollections(clientId, {
       type: ShowCollectionsType.Loaded,
     });
     if (res.data.length > 0) {
       for (const item of res.data) {
         const { id, name } = item;
 
-        const count = this.count({ collection_name: name });
+        const count = this.count(clientId, { collection_name: name });
         data.push({
           id,
           collection_name: name,
@@ -268,18 +296,21 @@ export class CollectionsService {
    * Get collections statistics data
    * @returns {collectionCount:number, totalData:number}
    */
-  async getStatistics() {
+  async getStatistics(clientId: string) {
     const data = {
       collectionCount: 0,
       totalData: 0,
     };
-    const res = await this.getCollections();
+    const res = await this.getCollections(clientId);
     data.collectionCount = res.data.length;
     if (res.data.length > 0) {
       for (const item of res.data) {
-        const collectionStatistics = await this.getCollectionStatistics({
-          collection_name: item.name,
-        });
+        const collectionStatistics = await this.getCollectionStatistics(
+          clientId,
+          {
+            collection_name: item.name,
+          }
+        );
         const rowCount = findKeyValue(collectionStatistics.stats, ROW_COUNT);
         data.totalData += isNaN(Number(rowCount)) ? 0 : Number(rowCount);
       }
@@ -291,12 +322,12 @@ export class CollectionsService {
    * Get all collection index status
    * @returns {collection_name:string, index_descriptions: index_descriptions}[]
    */
-  async getCollectionsIndexStatus() {
+  async getCollectionsIndexStatus(clientId: string) {
     const data = [];
-    const res = await this.getCollections();
+    const res = await this.getCollections(clientId);
     if (res.data.length > 0) {
       for (const item of res.data) {
-        const indexRes = await this.schemaService.describeIndex({
+        const indexRes = await this.schemaService.describeIndex(clientId, {
           collection_name: item.name,
         });
         data.push({
@@ -311,13 +342,13 @@ export class CollectionsService {
   /**
    * Load sample data into collection
    */
-  async importSample({
-    collection_name,
-    size,
-    download,
-    format,
-  }: ImportSampleDto) {
-    const collectionInfo = await this.describeCollection({ collection_name });
+  async importSample(
+    clientId: string,
+    { collection_name, size, download, format }: ImportSampleDto
+  ) {
+    const collectionInfo = await this.describeCollection(clientId, {
+      collection_name,
+    });
     const fields_data = genRows(
       collectionInfo.schema.fields,
       parseInt(size, 10),
@@ -334,46 +365,50 @@ export class CollectionsService {
       return { sampleFile };
     } else {
       // Otherwise, insert the data into the collection
-      return await this.insert({ collection_name, fields_data });
+      return await this.insert(clientId, { collection_name, fields_data });
     }
   }
 
-  async getCompactionState(data: GetCompactionStateReq) {
-    const res = await MilvusService.activeMilvusClient.getCompactionState(data);
+  async getCompactionState(clientId: string, data: GetCompactionStateReq) {
+        const { milvusClient } = clientCache.get(clientId);
+    const res = await milvusClient.getCompactionState(data);
     throwErrorFromSDK(res.status);
     return res;
   }
 
-  async getQuerySegmentInfo(data: GetQuerySegmentInfoReq) {
-    const res = await MilvusService.activeMilvusClient.getQuerySegmentInfo(
-      data
-    );
+  async getQuerySegmentInfo(clientId: string, data: GetQuerySegmentInfoReq) {
+        const { milvusClient } = clientCache.get(clientId);
+    const res = await milvusClient.getQuerySegmentInfo(data);
     throwErrorFromSDK(res.status);
     return res;
   }
 
-  async getPersistentSegmentInfo(data: GePersistentSegmentInfoReq) {
-    const res = await MilvusService.activeMilvusClient.getPersistentSegmentInfo(
-      data
-    );
+  async getPersistentSegmentInfo(
+    clientId: string,
+    data: GePersistentSegmentInfoReq
+  ) {
+        const { milvusClient } = clientCache.get(clientId);
+    const res = await milvusClient.getPersistentSegmentInfo(data);
     throwErrorFromSDK(res.status);
     return res;
   }
 
-  async compact(data: CompactReq) {
-    const res = await MilvusService.activeMilvusClient.compact(data);
+  async compact(clientId: string, data: CompactReq) {
+        const { milvusClient } = clientCache.get(clientId);
+    const res = await milvusClient.compact(data);
     throwErrorFromSDK(res.status);
     return res;
   }
 
-  async hasCollection(data: HasCollectionReq) {
-    const res = await MilvusService.activeMilvusClient.hasCollection(data);
+  async hasCollection(clientId: string, data: HasCollectionReq) {
+        const { milvusClient } = clientCache.get(clientId);
+    const res = await milvusClient.hasCollection(data);
     throwErrorFromSDK(res.status);
     return res;
   }
 
-  async duplicateCollection(data: RenameCollectionReq) {
-    const collection: any = await this.describeCollection({
+  async duplicateCollection(clientId: string, data: RenameCollectionReq) {
+    const collection: any = await this.describeCollection(clientId, {
       collection_name: data.collection_name,
     });
 
@@ -391,14 +426,15 @@ export class CollectionsService {
       createCollectionParams.num_partitions = Number(collection.num_partitions);
     }
 
-    return await this.createCollection(createCollectionParams);
+    return await this.createCollection(clientId, createCollectionParams);
   }
 
-  async emptyCollection(data: HasCollectionReq) {
-    const pkField = await MilvusService.activeMilvusClient.getPkFieldName(data);
-    const pkType = await MilvusService.activeMilvusClient.getPkFieldType(data);
+  async emptyCollection(clientId: string, data: HasCollectionReq) {
+        const { milvusClient } = clientCache.get(clientId);
+    const pkField = await milvusClient.getPkFieldName(data);
+    const pkType = await milvusClient.getPkFieldType(data);
 
-    const res = await MilvusService.activeMilvusClient.deleteEntities({
+    const res = await milvusClient.deleteEntities({
       collection_name: data.collection_name,
       filter: pkType === 'Int64' ? `${pkField} >= 0` : `${pkField} != ''`,
     });

+ 1 - 4
server/src/crons/crons.controller.ts

@@ -3,7 +3,6 @@ import { dtoValidationMiddleware } from '../middleware/validation';
 import { CronsService, SchedulerRegistry } from './crons.service';
 import { collectionsService } from '../collections';
 import { ToggleCronJobByNameDto } from './dto';
-import { MILVUS_CLIENT_ID } from '../utils';
 
 export class CronsController {
   private router: Router;
@@ -31,12 +30,10 @@ export class CronsController {
 
   async toggleCronJobByName(req: Request, res: Response, next: NextFunction) {
     const cronData = req.body;
-    const milvusAddress = (req.headers[MILVUS_CLIENT_ID] as string) || '';
     // console.log(cronData, milvusAddress);
     try {
-      const result = await this.cronsService.toggleCronJobByName({
+      const result = await this.cronsService.toggleCronJobByName(req.clientId, {
         ...cronData,
-        address: milvusAddress,
       });
       res.send(result);
     } catch (error) {

+ 29 - 26
server/src/crons/crons.service.ts

@@ -1,7 +1,7 @@
 import { schedule, ScheduledTask } from 'node-cron';
 import { CollectionsService } from '../collections/collections.service';
 import { WS_EVENTS, WS_EVENTS_TYPE } from '../utils';
-import { pubSub } from '../events';
+import { serverEvent } from '../events';
 
 export class CronsService {
   constructor(
@@ -9,18 +9,20 @@ export class CronsService {
     private schedulerRegistry: SchedulerRegistry
   ) {}
 
-  async toggleCronJobByName(data: {
-    name: string;
-    type: WS_EVENTS_TYPE;
-    milvusClientId: string;
-  }) {
-    const { name, type, milvusClientId } = data;
+  async toggleCronJobByName(
+    clientId: string,
+    data: {
+      name: string;
+      type: WS_EVENTS_TYPE;
+    }
+  ) {
+    const { name, type } = data;
 
     switch (name) {
       case WS_EVENTS.COLLECTION:
-        const cronJobEntity = this.schedulerRegistry.getCronJob(name, milvusClientId);
+        const cronJobEntity = this.schedulerRegistry.getCronJob(clientId, name);
         if (!cronJobEntity && Number(type) === WS_EVENTS_TYPE.START) {
-          return this.getCollections(WS_EVENTS.COLLECTION, milvusClientId);
+          return this.getCollections(clientId, WS_EVENTS.COLLECTION);
         }
         if (!cronJobEntity) {
           return;
@@ -33,20 +35,20 @@ export class CronsService {
     }
   }
 
-  async getCollections(name: string, milvusClientId: string) {
+  async getCollections(clientId: string, name: string) {
     const task = async () => {
       try {
-        const res = await this.collectionService.getAllCollections();
+        const res = await this.collectionService.getAllCollections(clientId);
         // TODO
         // this.eventService.server.emit("COLLECTION", res);
-        pubSub.emit('ws_pubsub', {
+        serverEvent.emit(WS_EVENTS.TO_CLIENT, {
           event: WS_EVENTS.COLLECTION + '',
           data: res,
         });
         return res;
       } catch (error) {
         // When user not connect milvus, stop cron
-        const cronJobEntity = this.schedulerRegistry.getCronJob(name, milvusClientId);
+        const cronJobEntity = this.schedulerRegistry.getCronJob(clientId, name);
         if (cronJobEntity) {
           cronJobEntity.stop();
         }
@@ -54,23 +56,23 @@ export class CronsService {
         throw new Error(error);
       }
     };
-    this.schedulerRegistry.setCronJobEveryFiveSecond(name, task, milvusClientId);
+    this.schedulerRegistry.setCronJobEveryFiveSecond(clientId, name, task);
   }
 }
 
 export class SchedulerRegistry {
   constructor(private cronJobList: CronJob[]) {}
 
-  getCronJob(name: string, milvusClientId: string) {
+  getCronJob(clientId: string, name: string) {
     const target = this.cronJobList.find(
-      item => item.name === name && item.milvusClientId === milvusClientId
+      item => item.name === name && item.clientId === clientId
     );
     return target?.entity;
   }
 
-  setCronJobEveryFiveSecond(name: string, func: () => {}, milvusClientId: string) {
+  setCronJobEveryFiveSecond(clientId: string, name: string, func: () => {}) {
     // The cron job will run every second
-    this.setCronJob(name, '*/5 * * * * *', func, milvusClientId);
+    this.setCronJob(clientId, name, '*/5 * * * * *', func);
   }
 
   // ┌────────────── second (optional)
@@ -83,9 +85,14 @@ export class SchedulerRegistry {
   // │ │ │ │ │ │
   // * * * * * *
   // https://www.npmjs.com/package/node-cron
-  setCronJob(name: string, scheduler: string, func: () => {}, milvusClientId: string) {
+  setCronJob(
+    clientId: string,
+    name: string,
+    scheduler: string,
+    func: () => {}
+  ) {
     const target = this.cronJobList.find(
-      item => item.name === name && item.milvusClientId === milvusClientId
+      item => item.name === name && item.clientId === clientId
     );
     if (target) {
       target?.entity?.stop();
@@ -94,17 +101,13 @@ export class SchedulerRegistry {
         console.log(`[Scheduler:${scheduler}] ${name}: running a task.`);
         func();
       });
-      this.cronJobList.push({
-        name,
-        entity: task,
-        milvusClientId,
-      });
+      this.cronJobList.push({ clientId, name, entity: task });
     }
   }
 }
 
 interface CronJob {
+  clientId: string; // milvus milvusClientId
   name: string;
   entity: ScheduledTask;
-  milvusClientId: string; // milvus milvusClientId
 }

+ 6 - 3
server/src/database/databases.controller.ts

@@ -34,6 +34,7 @@ export class DatabasesController {
     const createDatabaseData = req.body;
     try {
       const result = await this.databasesService.createDatabase(
+        req.clientId,
         createDatabaseData
       );
       res.send(result);
@@ -44,7 +45,7 @@ export class DatabasesController {
 
   async listDatabases(req: Request, res: Response, next: NextFunction) {
     try {
-      const result = await this.databasesService.listDatabase();
+      const result = await this.databasesService.listDatabase(req.clientId);
       result.db_names = result.db_names.sort((a: string, b: string) => {
         if (a === 'default') {
           return -1; // 'default' comes before other strings
@@ -53,7 +54,7 @@ export class DatabasesController {
         } else {
           return a.localeCompare(b); // sort other strings alphabetically
         }
-      })
+      });
       res.send(result);
     } catch (error) {
       next(error);
@@ -63,7 +64,9 @@ export class DatabasesController {
   async dropDatabase(req: Request, res: Response, next: NextFunction) {
     const db_name = req.params?.name;
     try {
-      const result = await this.databasesService.dropDatabase({ db_name });
+      const result = await this.databasesService.dropDatabase(req.clientId, {
+        db_name,
+      });
       res.send(result);
     } catch (error) {
       next(error);

+ 19 - 10
server/src/database/databases.service.ts

@@ -5,32 +5,41 @@ import {
   DropDatabasesRequest,
 } from '@zilliz/milvus2-sdk-node';
 import { throwErrorFromSDK } from '../utils/Error';
+import { clientCache } from '../app';
 
 export class DatabasesService {
-  async createDatabase(data: CreateDatabaseRequest) {
-    const res = await MilvusService.activeMilvusClient.createDatabase(data);
+  async createDatabase(clientId: string, data: CreateDatabaseRequest) {
+        const { milvusClient } = clientCache.get(clientId);
+
+    const res = await milvusClient.createDatabase(data);
     throwErrorFromSDK(res);
     return res;
   }
 
-  async listDatabase(data?: ListDatabasesRequest) {
-    const res = await MilvusService.activeMilvusClient.listDatabases(data);
+  async listDatabase(clientId: string, data?: ListDatabasesRequest) {
+        const { milvusClient } = clientCache.get(clientId);
+
+    const res = await milvusClient.listDatabases(data);
     throwErrorFromSDK(res.status);
     return res;
   }
 
-  async dropDatabase(data: DropDatabasesRequest) {
-    const res = await MilvusService.activeMilvusClient.dropDatabase(data);
+  async dropDatabase(clientId: string, data: DropDatabasesRequest) {
+        const { milvusClient } = clientCache.get(clientId);
+
+    const res = await milvusClient.dropDatabase(data);
     throwErrorFromSDK(res);
     return res;
   }
 
-  async use(db_name: string) {
-    return await await MilvusService.activeMilvusClient.use({ db_name });
+  async use(clientId: string, db_name: string) {
+        const { milvusClient } = clientCache.get(clientId);
+
+    return await await milvusClient.use({ db_name });
   }
 
-  async hasDatabase(data: string) {
-    const { db_names } = await this.listDatabase();
+  async hasDatabase(clientId: string, data: string) {
+    const { db_names } = await this.listDatabase(clientId);
     return db_names.indexOf(data) !== -1;
   }
 }

+ 3 - 3
server/src/events/index.ts

@@ -22,7 +22,7 @@ export class PubSub {
       console.warn(`eventType: ${eventType} missing`);
       return;
     }
-    this.handlers[eventType].forEach((handler) => {
+    this.handlers[eventType].forEach(handler => {
       handler(...handlerArgs);
     });
     return this;
@@ -36,7 +36,7 @@ export class PubSub {
     }
     // delete handler
     this.handlers[eventType] = this.handlers[eventType].filter(
-      (item) => item !== handler
+      item => item !== handler
     );
     return this;
   }
@@ -44,4 +44,4 @@ export class PubSub {
 
 type HandlerFunction = (...args: any) => void;
 
-export const pubSub = new PubSub();
+export const serverEvent = new PubSub();

+ 10 - 12
server/src/middleware/index.ts

@@ -1,12 +1,19 @@
 import { Request, Response, NextFunction } from 'express';
 import morgan from 'morgan';
 import chalk from 'chalk';
-import { MilvusService } from '../milvus/milvus.service';
 import { MILVUS_CLIENT_ID, HTTP_STATUS_CODE } from '../utils';
 import { HttpError } from 'http-errors';
 import HttpErrors from 'http-errors';
 import { clientCache } from '../app';
 
+declare global {
+  namespace Express {
+    interface Request {
+      clientId?: string;
+    }
+  }
+}
+
 export const ReqHeaderMiddleware = (
   req: Request,
   res: Response,
@@ -15,23 +22,14 @@ export const ReqHeaderMiddleware = (
   // all ape requests need set milvus address in header.
   // server will set active address in milvus service.
   const milvusClientId = (req.headers[MILVUS_CLIENT_ID] as string) || '';
-
-  // console.log('------ Request headers -------', req.headers);
-  //  only api request has MILVUS_CLIENT_ID.
-  //  When client run in express, we dont need static files like: xx.js run this logic.
-  //  Otherwise will cause 401 error.
-  if (milvusClientId && clientCache.has(milvusClientId)) {
-    MilvusService.activeAddress = milvusClientId;
-    // insight cache will update expire time when use insightCache.get
-    MilvusService.activeMilvusClient = clientCache.get(milvusClientId);
-  }
+  req.clientId = req.headers[MILVUS_CLIENT_ID] as string;
 
   const CONNECT_URL = `/api/v1/milvus/connect`;
 
   if (
     req.url !== CONNECT_URL &&
     milvusClientId &&
-    !MilvusService.activeMilvusClient
+    !clientCache.get(milvusClientId)
   ) {
     throw HttpErrors(
       HTTP_STATUS_CODE.FORBIDDEN,

+ 14 - 5
server/src/milvus/milvus.controller.ts

@@ -62,7 +62,10 @@ export class MilvusController {
     const address = '' + req.query?.address;
 
     try {
-      const result = await this.milvusService.checkConnect(address);
+      const result = await this.milvusService.checkConnect(
+        req.clientId,
+        address
+      );
       res.send(result);
     } catch (error) {
       next(error);
@@ -72,7 +75,10 @@ export class MilvusController {
   async flush(req: Request, res: Response, next: NextFunction) {
     const collectionNames = req.body;
     try {
-      const result = await this.milvusService.flush(collectionNames);
+      const result = await this.milvusService.flush(
+        req.clientId,
+        collectionNames
+      );
       res.send(result);
     } catch (error) {
       next(error);
@@ -81,7 +87,7 @@ export class MilvusController {
 
   async getMetrics(req: Request, res: Response, next: NextFunction) {
     try {
-      const result = await this.milvusService.getMetrics();
+      const result = await this.milvusService.getMetrics(req.clientId);
       res.send(result);
     } catch (error) {
       next(error);
@@ -100,7 +106,10 @@ export class MilvusController {
     const { database } = req.body;
 
     try {
-      const result = await this.milvusService.useDatabase(database);
+      const result = await this.milvusService.useDatabase(
+        req.clientId,
+        database
+      );
       res.send(result);
     } catch (error) {
       next(error);
@@ -108,7 +117,7 @@ export class MilvusController {
   }
 
   closeConnection(req: Request, res: Response, next: NextFunction) {
-    const result = this.milvusService.closeConnection();
+    const result = this.milvusService.closeConnection(req.clientId);
     res.send({ result });
   }
 }

+ 34 - 32
server/src/milvus/milvus.service.ts

@@ -3,19 +3,16 @@ import {
   FlushReq,
   GetMetricsResponse,
   ClientConfig,
+  DescribeIndexResponse,
 } from '@zilliz/milvus2-sdk-node';
-import HttpErrors from 'http-errors';
-import { HTTP_STATUS_CODE } from '../utils/Const';
-import { DEFAULT_MILVUS_PORT } from '../utils';
+import { LRUCache } from 'lru-cache';
+import { DEFAULT_MILVUS_PORT, INDEX_TTL } from '../utils';
 import { connectivityState } from '@grpc/grpc-js';
 import { DatabasesService } from '../database/databases.service';
 import { clientCache } from '../app';
 
 export class MilvusService {
   private databaseService: DatabasesService;
-  // Share with all instances, so activeAddress is static
-  static activeAddress: string;
-  static activeMilvusClient: MilvusClient;
 
   constructor() {
     this.databaseService = new DatabasesService();
@@ -31,15 +28,6 @@ export class MilvusService {
     return ip.includes(':') ? ip : `${ip}:${DEFAULT_MILVUS_PORT}`;
   }
 
-  checkMilvus() {
-    if (!MilvusService.activeMilvusClient) {
-      throw HttpErrors(
-        HTTP_STATUS_CODE.FORBIDDEN,
-        'Can not find your connection, please check your connection settings.'
-      );
-    }
-  }
-
   async connectMilvus(data: {
     address: string;
     username?: string;
@@ -80,15 +68,12 @@ export class MilvusService {
       // create the client
       const milvusClient: MilvusClient = new MilvusClient(clientOptions);
 
-      // Set the active Milvus client to the newly created client
-      MilvusService.activeMilvusClient = milvusClient;
-
       try {
         // Attempt to connect to the Milvus server
         await milvusClient.connectPromise;
       } catch (error) {
         // If the connection fails, clear the cache and throw an error
-        clientCache.dump();
+        clientCache.delete(milvusClient.clientId);
         throw new Error('Failed to connect to Milvus: ' + error);
       }
 
@@ -101,20 +86,29 @@ export class MilvusService {
       }
 
       // If the server is healthy, set the active address and add the client to the cache
-      MilvusService.activeAddress = address;
-      clientCache.set(milvusClient.clientId, milvusClient);
+      clientCache.set(milvusClient.clientId, {
+        milvusClient,
+        address,
+        indexCache: new LRUCache<string, DescribeIndexResponse>({
+          ttl: INDEX_TTL,
+          ttlAutopurge: true,
+        }),
+      });
 
       // Create a new database service and check if the specified database exists
       let hasDatabase = false;
       try {
-        hasDatabase = await this.databaseService.hasDatabase(database);
+        hasDatabase = await this.databaseService.hasDatabase(
+          milvusClient.clientId,
+          database
+        );
       } catch (_) {
         // ignore error
       }
 
       // if database exists, use this db
       if (hasDatabase) {
-        await this.databaseService.use(database);
+        await this.databaseService.use(milvusClient.clientId, database);
       }
 
       // Return the address and the database (if it exists, otherwise return 'default')
@@ -130,30 +124,38 @@ export class MilvusService {
     }
   }
 
-  async checkConnect(address: string) {
+  async checkConnect(clientId: string, address: string) {
     const milvusAddress = MilvusService.formatAddress(address);
     return { connected: clientCache.has(milvusAddress) };
   }
 
-  async flush(data: FlushReq) {
-    const res = await MilvusService.activeMilvusClient.flush(data);
+  async flush(clientId: string, data: FlushReq) {
+    const { milvusClient } = clientCache.get(clientId);
+
+    const res = await milvusClient.flush(data);
     return res;
   }
 
-  async getMetrics(): Promise<GetMetricsResponse> {
-    const res = await MilvusService.activeMilvusClient.getMetric({
+  async getMetrics(clientId: string): Promise<GetMetricsResponse> {
+    const { milvusClient } = clientCache.get(clientId);
+
+    const res = milvusClient.getMetric({
       request: { metric_type: 'system_info' },
     });
     return res;
   }
 
-  closeConnection(): connectivityState {
-    const res = MilvusService.activeMilvusClient.closeConnection();
+  closeConnection(clientId: string): connectivityState {
+    const { milvusClient } = clientCache.get(clientId);
+
+    const res = milvusClient.closeConnection();
     return res;
   }
 
-  async useDatabase(db: string) {
-    const res = await MilvusService.activeMilvusClient.use({
+  async useDatabase(clientId: string, db: string) {
+    const { milvusClient } = clientCache.get(clientId);
+
+    const res = milvusClient.use({
       db_name: db,
     });
     return res;

+ 16 - 7
server/src/partitions/partitions.controller.ts

@@ -44,9 +44,12 @@ export class PartitionController {
   async getPartitionsInfo(req: Request, res: Response, next: NextFunction) {
     const collectionName = '' + req.query?.collection_name;
     try {
-      const result = await this.partitionsService.getPartitionsInfo({
-        collection_name: collectionName,
-      });
+      const result = await this.partitionsService.getPartitionsInfo(
+        req.clientId,
+        {
+          collection_name: collectionName,
+        }
+      );
       res.send(result);
     } catch (error) {
       next(error);
@@ -58,8 +61,8 @@ export class PartitionController {
     try {
       const result =
         type.toLocaleLowerCase() === 'create'
-          ? await this.partitionsService.createPartition(params)
-          : await this.partitionsService.deletePartition(params);
+          ? await this.partitionsService.createPartition(req.clientId, params)
+          : await this.partitionsService.deletePartition(req.clientId, params);
       res.send(result);
     } catch (error) {
       next(error);
@@ -69,7 +72,10 @@ export class PartitionController {
   async loadPartition(req: Request, res: Response, next: NextFunction) {
     const data = req.body;
     try {
-      const result = await this.partitionsService.loadPartitions(data);
+      const result = await this.partitionsService.loadPartitions(
+        req.clientId,
+        data
+      );
       res.send(result);
     } catch (error) {
       next(error);
@@ -79,7 +85,10 @@ export class PartitionController {
   async releasePartition(req: Request, res: Response, next: NextFunction) {
     const data = req.body;
     try {
-      const result = await this.partitionsService.releasePartitions(data);
+      const result = await this.partitionsService.releasePartitions(
+        req.clientId,
+        data
+      );
       res.send(result);
     } catch (error) {
       next(error);

+ 31 - 18
server/src/partitions/partitions.service.ts

@@ -1,4 +1,3 @@
-import { MilvusService } from '../milvus/milvus.service';
 import {
   CreatePartitionReq,
   DropPartitionReq,
@@ -10,14 +9,15 @@ import {
 import { throwErrorFromSDK } from '../utils/Error';
 import { findKeyValue } from '../utils/Helper';
 import { ROW_COUNT } from '../utils';
+import { clientCache } from '../app';
 
 export class PartitionsService {
-  async getPartitionsInfo(data: ShowPartitionsReq) {
+  async getPartitionsInfo(clientId: string, data: ShowPartitionsReq) {
     const result = [];
-    const res = await this.getPartitions(data);
+    const res = await this.getPartitions(clientId, data);
     if (res.partition_names && res.partition_names.length) {
       for (const [index, name] of res.partition_names.entries()) {
-        const statistics = await this.getPartitionStatistics({
+        const statistics = await this.getPartitionStatistics(clientId, {
           ...data,
           partition_name: name,
         });
@@ -32,40 +32,53 @@ export class PartitionsService {
     return result;
   }
 
-  async getPartitions(data: ShowPartitionsReq) {
-    const res = await MilvusService.activeMilvusClient.showPartitions(data);
+  async getPartitions(clientId: string, data: ShowPartitionsReq) {
+        const { milvusClient } = clientCache.get(clientId);
+
+    const res = await milvusClient.showPartitions(data);
     throwErrorFromSDK(res.status);
     return res;
   }
 
-  async createPartition(data: CreatePartitionReq) {
-    const res = await MilvusService.activeMilvusClient.createPartition(data);
+  async createPartition(clientId: string, data: CreatePartitionReq) {
+        const { milvusClient } = clientCache.get(clientId);
+
+    const res = await milvusClient.createPartition(data);
     throwErrorFromSDK(res);
     return res;
   }
 
-  async deletePartition(data: DropPartitionReq) {
-    const res = await MilvusService.activeMilvusClient.dropPartition(data);
+  async deletePartition(clientId: string, data: DropPartitionReq) {
+        const { milvusClient } = clientCache.get(clientId);
+
+    const res = await milvusClient.dropPartition(data);
     throwErrorFromSDK(res);
     return res;
   }
 
-  async getPartitionStatistics(data: GetPartitionStatisticsReq) {
-    const res = await MilvusService.activeMilvusClient.getPartitionStatistics(
-      data
-    );
+  async getPartitionStatistics(
+    clientId: string,
+    data: GetPartitionStatisticsReq
+  ) {
+        const { milvusClient } = clientCache.get(clientId);
+
+    const res = await milvusClient.getPartitionStatistics(data);
     throwErrorFromSDK(res.status);
     return res;
   }
 
-  async loadPartitions(data: LoadPartitionsReq) {
-    const res = await MilvusService.activeMilvusClient.loadPartitions(data);
+  async loadPartitions(clientId: string, data: LoadPartitionsReq) {
+        const { milvusClient } = clientCache.get(clientId);
+
+    const res = await milvusClient.loadPartitions(data);
     throwErrorFromSDK(res);
     return res;
   }
 
-  async releasePartitions(data: ReleasePartitionsReq) {
-    const res = await MilvusService.activeMilvusClient.releasePartitions(data);
+  async releasePartitions(clientId: string, data: ReleasePartitionsReq) {
+        const { milvusClient } = clientCache.get(clientId);
+
+    const res = await milvusClient.releasePartitions(data);
     throwErrorFromSDK(res);
     return res;
   }

+ 4 - 4
server/src/schema/schema.controller.ts

@@ -31,13 +31,13 @@ export class SchemaController {
     try {
       const result =
         type.toLocaleLowerCase() === 'create'
-          ? await this.schemaService.createIndex({
+          ? await this.schemaService.createIndex(req.clientId, {
               collection_name,
               extra_params,
               field_name,
               index_name,
             })
-          : await this.schemaService.dropIndex({
+          : await this.schemaService.dropIndex(req.clientId, {
               collection_name,
               field_name,
               index_name,
@@ -51,7 +51,7 @@ export class SchemaController {
   async describeIndex(req: Request, res: Response, next: NextFunction) {
     const data = '' + req.query?.collection_name;
     try {
-      const result = await this.schemaService.describeIndex({
+      const result = await this.schemaService.describeIndex(req.clientId, {
         collection_name: data,
       });
       res.send(result);
@@ -62,7 +62,7 @@ export class SchemaController {
 
   async clearCache(req: Request, res: Response, next: NextFunction) {
     try {
-      const result = await this.schemaService.clearCache();
+      const result = await this.schemaService.clearCache(req.clientId);
       res.send(result);
     } catch (error) {
       next(error);

+ 14 - 9
server/src/schema/schema.service.ts

@@ -5,12 +5,12 @@ import {
   DescribeIndexResponse,
 } from '@zilliz/milvus2-sdk-node';
 import { throwErrorFromSDK } from '../utils/Error';
-import { MilvusService } from '../milvus/milvus.service';
-import { indexCache } from '../app';
+import { clientCache } from '../app';
 
 export class SchemaService {
-  async createIndex(data: CreateIndexReq) {
-    const res = await MilvusService.activeMilvusClient.createIndex(data);
+  async createIndex(clientId: string, data: CreateIndexReq) {
+    const { milvusClient, indexCache } = clientCache.get(clientId);
+    const res = await milvusClient.createIndex(data);
     const key = data.collection_name;
 
     // clear cache;
@@ -28,7 +28,9 @@ export class SchemaService {
    * @param data - The request data for describing an index. It contains the collection name.
    * @returns - The response from the Milvus SDK's describeIndex function or the cached index description.
    */
-  async describeIndex(data: DescribeIndexReq) {
+  async describeIndex(clientId: string, data: DescribeIndexReq) {
+    const { milvusClient, indexCache } = clientCache.get(clientId);
+
     // Get the collection name from the request data
     const key = data.collection_name;
 
@@ -40,7 +42,7 @@ export class SchemaService {
       return value;
     } else {
       // If the index description is not in the cache, call the Milvus SDK's describeIndex function
-      const res = await MilvusService.activeMilvusClient.describeIndex(data);
+      const res = await milvusClient.describeIndex(data);
 
       // If the index is finished building and there is at least one index description,
       // cache the index description for future use
@@ -59,8 +61,10 @@ export class SchemaService {
     }
   }
 
-  async dropIndex(data: DropIndexReq) {
-    const res = await MilvusService.activeMilvusClient.dropIndex(data);
+  async dropIndex(clientId: string, data: DropIndexReq) {
+    const { milvusClient, indexCache } = clientCache.get(clientId);
+
+    const res = await milvusClient.dropIndex(data);
     const key = data.collection_name;
 
     // clear cache;
@@ -69,7 +73,8 @@ export class SchemaService {
     return res;
   }
 
-  async clearCache() {
+  async clearCache(clientId: string) {
+    const { indexCache } = clientCache.get(clientId);
     return indexCache.clear();
   }
 }

+ 44 - 0
server/src/socket.ts

@@ -0,0 +1,44 @@
+// socket.ts
+import { Server, Socket } from 'socket.io';
+import * as http from 'http';
+import chalk from 'chalk';
+import { serverEvent } from './events';
+import { WS_EVENTS } from './utils';
+export let io: Server;
+
+export function initWebSocket(server: http.Server) {
+  io = new Server(server, {
+    cors: {
+      origin: '*',
+      methods: ['GET', 'POST'],
+    },
+  });
+
+  io.on('connection', (socket: Socket) => {
+    console.info(
+      chalk.green(`ws client connected ${socket.client.conn.remoteAddress}`)
+    );
+
+    socket.on(WS_EVENTS.COLLECTION, (message: any) => {
+      socket.emit(WS_EVENTS.COLLECTION, { data: message });
+    });
+
+    // frontend emit -> serverEvent.emit -> server event handler
+    socket.on(WS_EVENTS.TO_SERVER, (msg: any) => {
+      serverEvent.emit(msg.event, msg);
+    });
+
+    // server emit -> socket emit -> frontend event handler
+    serverEvent.on(WS_EVENTS.TO_CLIENT, (msg: any) => {
+      socket.emit(msg.event, msg.data);
+    });
+
+    socket.on('disconnect', () => {
+      console.info(
+        chalk.green(
+          `ws client disconnected ${socket.client.conn.remoteAddress}`
+        )
+      );
+    });
+  });
+}

+ 32 - 16
server/src/users/users.controller.ts

@@ -63,7 +63,7 @@ export class UserController {
 
   async getUsers(req: Request, res: Response, next: NextFunction) {
     try {
-      const result = await this.userService.getUsers();
+      const result = await this.userService.getUsers(req.clientId);
 
       res.send(result);
     } catch (error) {
@@ -74,7 +74,10 @@ export class UserController {
   async createUsers(req: Request, res: Response, next: NextFunction) {
     const { username, password } = req.body;
     try {
-      const result = await this.userService.createUser({ username, password });
+      const result = await this.userService.createUser(req.clientId, {
+        username,
+        password,
+      });
       res.send(result);
     } catch (error) {
       next(error);
@@ -84,7 +87,7 @@ export class UserController {
   async updateUsers(req: Request, res: Response, next: NextFunction) {
     const { username, oldPassword, newPassword } = req.body;
     try {
-      const result = await this.userService.updateUser({
+      const result = await this.userService.updateUser(req.clientId, {
         username,
         oldPassword,
         newPassword,
@@ -98,7 +101,9 @@ export class UserController {
   async deleteUser(req: Request, res: Response, next: NextFunction) {
     const { username } = req.params;
     try {
-      const result = await this.userService.deleteUser({ username });
+      const result = await this.userService.deleteUser(req.clientId, {
+        username,
+      });
       res.send(result);
     } catch (error) {
       next(error);
@@ -107,10 +112,10 @@ export class UserController {
 
   async getRoles(req: Request, res: Response, next: NextFunction) {
     try {
-      const result = (await this.userService.getRoles()) as any;
+      const result = (await this.userService.getRoles(req.clientId)) as any;
 
       for (let i = 0; i < result.results.length; i++) {
-        const { entities } = await this.userService.listGrants({
+        const { entities } = await this.userService.listGrants(req.clientId, {
           roleName: result.results[i].role.name,
         });
         result.results[i].entities = entities;
@@ -125,7 +130,9 @@ export class UserController {
   async createRole(req: Request, res: Response, next: NextFunction) {
     const { roleName } = req.body;
     try {
-      const result = await this.userService.createRole({ roleName });
+      const result = await this.userService.createRole(req.clientId, {
+        roleName,
+      });
       res.send(result);
     } catch (error) {
       next(error);
@@ -138,9 +145,13 @@ export class UserController {
 
     try {
       if (force) {
-        await this.userService.revokeAllRolePrivileges({ roleName });
+        await this.userService.revokeAllRolePrivileges(req.clientId, {
+          roleName,
+        });
       }
-      const result = await this.userService.deleteRole({ roleName });
+      const result = await this.userService.deleteRole(req.clientId, {
+        roleName,
+      });
       res.send(result);
     } catch (error) {
       next(error);
@@ -155,7 +166,7 @@ export class UserController {
 
     try {
       // get user existing roles
-      const selectUser = await this.userService.selectUser({
+      const selectUser = await this.userService.selectUser(req.clientId, {
         username,
         includeRoleInfo: false,
       });
@@ -164,7 +175,7 @@ export class UserController {
       // remove user existing roles
       for (let i = 0; i < existingRoles.length; i++) {
         if (existingRoles[i].name.length > 0) {
-          await this.userService.unassignUserRole({
+          await this.userService.unassignUserRole(req.clientId, {
             username,
             roleName: existingRoles[i].name,
           });
@@ -173,7 +184,7 @@ export class UserController {
 
       // assign new user roles
       for (let i = 0; i < roles.length; i++) {
-        const result = await this.userService.assignUserRole({
+        const result = await this.userService.assignUserRole(req.clientId, {
           username,
           roleName: roles[i],
         });
@@ -191,7 +202,7 @@ export class UserController {
     const { username } = req.params;
 
     try {
-      const result = await this.userService.unassignUserRole({
+      const result = await this.userService.unassignUserRole(req.clientId, {
         username,
         roleName,
       });
@@ -213,7 +224,7 @@ export class UserController {
   async listGrant(req: Request, res: Response, next: NextFunction) {
     const { roleName } = req.params;
     try {
-      const result = await this.userService.listGrants({
+      const result = await this.userService.listGrants(req.clientId, {
         roleName,
       });
       res.send(result);
@@ -230,11 +241,16 @@ export class UserController {
 
     try {
       // revoke all
-      await this.userService.revokeAllRolePrivileges({ roleName });
+      await this.userService.revokeAllRolePrivileges(req.clientId, {
+        roleName,
+      });
 
       // assign new user roles
       for (let i = 0; i < privileges.length; i++) {
-        const result = await this.userService.grantRolePrivilege(privileges[i]);
+        const result = await this.userService.grantRolePrivilege(
+          req.clientId,
+          privileges[i]
+        );
         results.push(result);
       }
 

+ 59 - 33
server/src/users/users.service.ts

@@ -19,76 +19,99 @@ import {
   OperateRolePrivilegeReq,
 } from '@zilliz/milvus2-sdk-node';
 import { throwErrorFromSDK } from '../utils/Error';
+import { clientCache } from '../app';
 
 export class UserService {
-  async getUsers() {
-    const res = await MilvusService.activeMilvusClient.listUsers();
+  async getUsers(clientId: string) {
+        const { milvusClient } = clientCache.get(clientId);
+
+    const res = await milvusClient.listUsers();
     throwErrorFromSDK(res.status);
 
     return res;
   }
 
-  async createUser(data: CreateUserReq) {
-    const res = await MilvusService.activeMilvusClient.createUser(data);
+  async createUser(clientId: string, data: CreateUserReq) {
+        const { milvusClient } = clientCache.get(clientId);
+
+    const res = await milvusClient.createUser(data);
     throwErrorFromSDK(res);
 
     return res;
   }
 
-  async updateUser(data: UpdateUserReq) {
-    const res = await MilvusService.activeMilvusClient.updateUser(data);
+  async updateUser(clientId: string, data: UpdateUserReq) {
+        const { milvusClient } = clientCache.get(clientId);
+
+    const res = await milvusClient.updateUser(data);
     throwErrorFromSDK(res);
 
     return res;
   }
 
-  async deleteUser(data: DeleteUserReq) {
-    const res = await MilvusService.activeMilvusClient.deleteUser(data);
+  async deleteUser(clientId: string, data: DeleteUserReq) {
+        const { milvusClient } = clientCache.get(clientId);
+
+    const res = await milvusClient.deleteUser(data);
     throwErrorFromSDK(res);
     return res;
   }
 
-  async getRoles(data?: listRoleReq) {
-    const res = await MilvusService.activeMilvusClient.listRoles(data);
+  async getRoles(clientId: string, data?: listRoleReq) {
+        const { milvusClient } = clientCache.get(clientId);
+
+    const res = await milvusClient.listRoles(data);
     throwErrorFromSDK(res.status);
 
     return res;
   }
 
-  async selectUser(data?: SelectUserReq) {
-    const res = await MilvusService.activeMilvusClient.selectUser(data);
+  async selectUser(clientId: string, data?: SelectUserReq) {
+        const { milvusClient } = clientCache.get(clientId);
+
+    const res = await milvusClient.selectUser(data);
     throwErrorFromSDK(res.status);
 
     return res;
   }
 
-  async createRole(data: CreateRoleReq) {
-    const res = await MilvusService.activeMilvusClient.createRole(data);
+  async createRole(clientId: string, data: CreateRoleReq) {
+        const { milvusClient } = clientCache.get(clientId);
+
+    const res = await milvusClient.createRole(data);
     throwErrorFromSDK(res);
 
     return res;
   }
 
-  async deleteRole(data: DropRoleReq) {
-    const res = await MilvusService.activeMilvusClient.dropRole(data);
+  async deleteRole(clientId: string, data: DropRoleReq) {
+        const { milvusClient } = clientCache.get(clientId);
+
+    const res = await milvusClient.dropRole(data);
     throwErrorFromSDK(res);
     return res;
   }
 
-  async assignUserRole(data: AddUserToRoleReq) {
-    const res = await MilvusService.activeMilvusClient.addUserToRole(data);
+  async assignUserRole(clientId: string, data: AddUserToRoleReq) {
+        const { milvusClient } = clientCache.get(clientId);
+
+    const res = await milvusClient.addUserToRole(data);
     throwErrorFromSDK(res);
     return res;
   }
 
-  async unassignUserRole(data: RemoveUserFromRoleReq) {
-    const res = await MilvusService.activeMilvusClient.removeUserFromRole(data);
+  async unassignUserRole(clientId: string, data: RemoveUserFromRoleReq) {
+        const { milvusClient } = clientCache.get(clientId);
+
+    const res = await milvusClient.removeUserFromRole(data);
     throwErrorFromSDK(res);
     return res;
   }
 
-  async hasRole(data: HasRoleReq) {
-    const res = await MilvusService.activeMilvusClient.hasRole(data);
+  async hasRole(clientId: string, data: HasRoleReq) {
+        const { milvusClient } = clientCache.get(clientId);
+
+    const res = await milvusClient.hasRole(data);
     throwErrorFromSDK(res.status);
     return res;
   }
@@ -103,36 +126,39 @@ export class UserService {
     };
   }
 
-  async listGrants(data: ListGrantsReq) {
-    const res = await MilvusService.activeMilvusClient.listGrants(data);
+  async listGrants(clientId: string, data: ListGrantsReq) {
+        const { milvusClient } = clientCache.get(clientId);
+    const res = await milvusClient.listGrants(data);
     throwErrorFromSDK(res.status);
     return res;
   }
 
-  async grantRolePrivilege(data: OperateRolePrivilegeReq) {
-    const res = await MilvusService.activeMilvusClient.grantRolePrivilege(data);
+  async grantRolePrivilege(clientId: string, data: OperateRolePrivilegeReq) {
+        const { milvusClient } = clientCache.get(clientId);
+
+    const res = await milvusClient.grantRolePrivilege(data);
     throwErrorFromSDK(res);
     return res;
   }
 
-  async revokeRolePrivilege(data: OperateRolePrivilegeReq) {
-    const res = await MilvusService.activeMilvusClient.revokeRolePrivilege(
-      data
-    );
+  async revokeRolePrivilege(clientId: string, data: OperateRolePrivilegeReq) {
+        const { milvusClient } = clientCache.get(clientId);
+
+    const res = await milvusClient.revokeRolePrivilege(data);
     throwErrorFromSDK(res);
     return res;
   }
 
-  async revokeAllRolePrivileges(data: { roleName: string }) {
+  async revokeAllRolePrivileges(clientId: string, data: { roleName: string }) {
     // get existing privileges
-    const existingPrivileges = await this.listGrants({
+    const existingPrivileges = await this.listGrants(clientId, {
       roleName: data.roleName,
     });
 
     // revoke all
     for (let i = 0; i < existingPrivileges.entities.length; i++) {
       const res = existingPrivileges.entities[i];
-      await this.revokeRolePrivilege({
+      await this.revokeRolePrivilege(clientId, {
         object: res.object.name,
         objectName: res.object_name,
         privilegeName: res.grantor.privilege.name,

+ 4 - 0
server/src/utils/Const.ts

@@ -17,12 +17,16 @@ export enum LOADING_STATE {
 }
 
 export enum WS_EVENTS {
+  TO_SERVER = 'TO_SERVER',
+  TO_CLIENT = 'TO_CLIENT',
   COLLECTION = 'COLLECTION',
 }
 
 export enum WS_EVENTS_TYPE {
   START,
+  DOING,
   STOP,
+  CANCEL,
 }
 
 export const DEFAULT_MILVUS_PORT = 19530;