Browse Source

fix: server crash and memory leak after switching between databases (#652)

* fix: server crash and memory leak after switching between databases

Signed-off-by: ryjiang <jiangruiyi@gmail.com>

* WIP

Signed-off-by: ryjiang <jiangruiyi@gmail.com>

* make sure all requests with database

Signed-off-by: shanghaikid <jiangruiyi@gmail.com>

* clean console

Signed-off-by: shanghaikid <jiangruiyi@gmail.com>

* clean console

Signed-off-by: shanghaikid <jiangruiyi@gmail.com>

---------

Signed-off-by: ryjiang <jiangruiyi@gmail.com>
Signed-off-by: shanghaikid <jiangruiyi@gmail.com>
ryjiang 9 months ago
parent
commit
b22cdc485b

+ 63 - 39
client/src/components/layout/GlobalEffect.tsx

@@ -1,54 +1,78 @@
-import React, { useContext } from 'react';
+import React, { useContext, useEffect } from 'react';
 import axiosInstance from '@/http/Axios';
 import axiosInstance from '@/http/Axios';
-import { rootContext, authContext } from '@/context';
+import { rootContext, authContext, dataContext } from '@/context';
 import { HTTP_STATUS_CODE } from '@server/utils/Const';
 import { HTTP_STATUS_CODE } from '@server/utils/Const';
 
 
 let axiosResInterceptor: number | null = null;
 let axiosResInterceptor: number | null = null;
-// let timer: Record<string, ReturnType<typeof setTimeout> | number>[] = [];
-// we only take side effect here, nothing else
-const GlobalEffect = (props: { children: React.ReactNode }) => {
+
+const GlobalEffect = ({ children }: { children: React.ReactNode }) => {
   const { openSnackBar } = useContext(rootContext);
   const { openSnackBar } = useContext(rootContext);
   const { logout } = useContext(authContext);
   const { logout } = useContext(authContext);
+  const { database } = useContext(dataContext);
 
 
-  // catch axios error here
-  if (axiosResInterceptor === null) {
-    axiosResInterceptor = axiosInstance.interceptors.response.use(
-      function (res: any) {
-        if (res.statusCode && res.statusCode !== HTTP_STATUS_CODE.OK) {
-          openSnackBar(res.data.message, 'warning');
-          return Promise.reject(res.data);
-        }
-
-        return res;
+  useEffect(() => {
+    // Add database header to all axios requests
+    const requestInterceptor = axiosInstance.interceptors.request.use(
+      config => {
+        config.headers['x-attu-database'] = database;
+        return config;
       },
       },
-      function (error: any) {
-        const { response = {} } = error;
-
-        switch (response.status) {
-          case HTTP_STATUS_CODE.UNAUTHORIZED:
-          case HTTP_STATUS_CODE.FORBIDDEN:
-            setTimeout(logout, 1000);
-            break;
-          default:
-            break;
-        }
-        if (response.data) {
-          const { message: errMsg } = response.data;
-          // We need check status 401 in login page
-          // So server will return 500 when change the user password.
-          errMsg && openSnackBar(errMsg, 'error');
-          return Promise.reject(error);
-        }
-        if (error.message) {
+      error => Promise.reject(error)
+    );
+
+    // Clean up interceptor on unmount
+    return () => {
+      axiosInstance.interceptors.request.eject(requestInterceptor);
+    };
+  }, [database]);
+
+  useEffect(() => {
+    if (axiosResInterceptor === null) {
+      axiosResInterceptor = axiosInstance.interceptors.response.use(
+        (response: any) => {
+          if (
+            response.statusCode &&
+            response.statusCode !== HTTP_STATUS_CODE.OK
+          ) {
+            openSnackBar(response.data.message, 'warning');
+            return Promise.reject(response.data);
+          }
+          return response;
+        },
+        error => {
+          const { response } = error;
+          if (response) {
+            switch (response.status) {
+              case HTTP_STATUS_CODE.UNAUTHORIZED:
+              case HTTP_STATUS_CODE.FORBIDDEN:
+                setTimeout(() => logout(true), 1000);
+                break;
+              default:
+                break;
+            }
+            const errorMessage = response.data?.message;
+            if (errorMessage) {
+              openSnackBar(errorMessage, 'error');
+              return Promise.reject(error);
+            }
+          }
+          // Handle other error cases
           openSnackBar(error.message, 'error');
           openSnackBar(error.message, 'error');
+          return Promise.reject(error);
         }
         }
-        return Promise.reject(error);
+      );
+    }
+
+    // Clean up response interceptor on unmount
+    return () => {
+      if (axiosResInterceptor !== null) {
+        axiosInstance.interceptors.response.eject(axiosResInterceptor);
+        axiosResInterceptor = null;
       }
       }
-    );
-  }
-  // get global data
+    };
+  }, [logout, openSnackBar]);
 
 
-  return <>{props.children}</>;
+  return <>{children}</>;
 };
 };
 
 
 export default GlobalEffect;
 export default GlobalEffect;

+ 1 - 1
client/src/components/layout/Header.tsx

@@ -106,7 +106,7 @@ const Header: FC = () => {
   };
   };
 
 
   const handleLogout = async () => {
   const handleLogout = async () => {
-    logout();
+    logout(false);
   };
   };
 
 
   const useDatabase = async (database: string) => {
   const useDatabase = async (database: string) => {

+ 5 - 1
client/src/context/Auth.tsx

@@ -69,7 +69,11 @@ export const AuthProvider = (props: { children: React.ReactNode }) => {
     return res;
     return res;
   };
   };
   // logout API
   // logout API
-  const logout = () => {
+  const logout = async (pass?: boolean) => {
+    if (!pass) {
+      // close connetion
+      await MilvusService.closeConnection();
+    }
     // clear client id
     // clear client id
     setClientId('');
     setClientId('');
     // remove client id from local storage
     // remove client id from local storage

+ 20 - 11
client/src/context/Data.tsx

@@ -136,14 +136,23 @@ export const DataProvider = (props: { children: React.ReactNode }) => {
 
 
   // Websocket Callback: update single collection
   // Websocket Callback: update single collection
   const updateCollections = useCallback(
   const updateCollections = useCallback(
-    (updateCollections: CollectionFullObject[]) => {
+    (props: { collections: CollectionFullObject[]; database?: string }) => {
+      const { collections, database: remote } = props;
+      if (
+        remote !== database &&
+        database !== undefined &&
+        remote !== undefined
+      ) {
+        // console.log('database not matched', remote, database);
+        return;
+      }
       // check state to see if it is loading or building index, if so, start server cron job
       // check state to see if it is loading or building index, if so, start server cron job
-      detectLoadingIndexing(updateCollections);
+      detectLoadingIndexing(collections);
       // update single collection
       // update single collection
       setCollections(prev => {
       setCollections(prev => {
         // update exist collection
         // update exist collection
         const newCollections = prev.map(v => {
         const newCollections = prev.map(v => {
-          const collectionToUpdate = updateCollections.find(c => c.id === v.id);
+          const collectionToUpdate = collections.find(c => c.id === v.id);
 
 
           if (collectionToUpdate) {
           if (collectionToUpdate) {
             return collectionToUpdate;
             return collectionToUpdate;
@@ -215,7 +224,7 @@ export const DataProvider = (props: { children: React.ReactNode }) => {
     const res = await CollectionService.getCollection(name);
     const res = await CollectionService.getCollection(name);
 
 
     // update collection
     // update collection
-    updateCollections([res]);
+    updateCollections({ collections: [res] });
 
 
     return res;
     return res;
   };
   };
@@ -260,7 +269,7 @@ export const DataProvider = (props: { children: React.ReactNode }) => {
     }
     }
 
 
     // update collection, and trigger cron job
     // update collection, and trigger cron job
-    updateCollections([collection]);
+    updateCollections({ collections: [collection] });
   };
   };
 
 
   // API: release collection
   // API: release collection
@@ -275,7 +284,7 @@ export const DataProvider = (props: { children: React.ReactNode }) => {
     const newCollection = await CollectionService.renameCollection(name, {
     const newCollection = await CollectionService.renameCollection(name, {
       new_collection_name: newName,
       new_collection_name: newName,
     });
     });
-    updateCollections([newCollection]);
+    updateCollections({ collections: [newCollection] });
 
 
     return newCollection;
     return newCollection;
   };
   };
@@ -307,7 +316,7 @@ export const DataProvider = (props: { children: React.ReactNode }) => {
     // create index
     // create index
     const newCollection = await CollectionService.createIndex(param);
     const newCollection = await CollectionService.createIndex(param);
     // update collection
     // update collection
-    updateCollections([newCollection]);
+    updateCollections({ collections: [newCollection] });
 
 
     return newCollection;
     return newCollection;
   };
   };
@@ -317,7 +326,7 @@ export const DataProvider = (props: { children: React.ReactNode }) => {
     // drop index
     // drop index
     const { data } = await CollectionService.dropIndex(params);
     const { data } = await CollectionService.dropIndex(params);
     // update collection
     // update collection
-    updateCollections([data]);
+    updateCollections({ collections: [data] });
 
 
     return data;
     return data;
   };
   };
@@ -329,7 +338,7 @@ export const DataProvider = (props: { children: React.ReactNode }) => {
       alias,
       alias,
     });
     });
     // update collection
     // update collection
-    updateCollections([newCollection]);
+    updateCollections({ collections: [newCollection] });
 
 
     return newCollection;
     return newCollection;
   };
   };
@@ -342,7 +351,7 @@ export const DataProvider = (props: { children: React.ReactNode }) => {
     });
     });
 
 
     // update collection
     // update collection
-    updateCollections([data]);
+    updateCollections({ collections: [data] });
 
 
     return data;
     return data;
   };
   };
@@ -359,7 +368,7 @@ export const DataProvider = (props: { children: React.ReactNode }) => {
     });
     });
 
 
     // update existing collection
     // update existing collection
-    updateCollections([newCollection]);
+    updateCollections({ collections: [newCollection] });
 
 
     return newCollection;
     return newCollection;
   };
   };

+ 1 - 1
client/src/context/Types.ts

@@ -70,7 +70,7 @@ export type AuthContextType = {
   clientId: string;
   clientId: string;
   isManaged: boolean;
   isManaged: boolean;
   isAuth: boolean;
   isAuth: boolean;
-  logout: () => void;
+  logout: (pass?: boolean) => void;
   login: (params: AuthReq) => Promise<AuthObject>;
   login: (params: AuthReq) => Promise<AuthObject>;
 };
 };
 
 

+ 1 - 1
server/package.json

@@ -13,7 +13,7 @@
   },
   },
   "dependencies": {
   "dependencies": {
     "@json2csv/plainjs": "^7.0.3",
     "@json2csv/plainjs": "^7.0.3",
-    "@zilliz/milvus2-sdk-node": "^2.4.8",
+    "@zilliz/milvus2-sdk-node": "^2.4.9",
     "axios": "^1.7.4",
     "axios": "^1.7.4",
     "chalk": "4.1.2",
     "chalk": "4.1.2",
     "class-sanitizer": "^1.0.1",
     "class-sanitizer": "^1.0.1",

+ 29 - 6
server/src/collections/collections.controller.ts

@@ -135,8 +135,15 @@ export class CollectionController {
     try {
     try {
       const result =
       const result =
         type === 1
         type === 1
-          ? await this.collectionsService.getLoadedCollections(req.clientId)
-          : await this.collectionsService.getAllCollections(req.clientId);
+          ? await this.collectionsService.getLoadedCollections(
+              req.clientId,
+              req.db_name
+            )
+          : await this.collectionsService.getAllCollections(
+              req.clientId,
+              [],
+              req.db_name
+            );
       res.send(result);
       res.send(result);
     } catch (error) {
     } catch (error) {
       next(error);
       next(error);
@@ -145,7 +152,10 @@ export class CollectionController {
 
 
   async getStatistics(req: Request, res: Response, next: NextFunction) {
   async getStatistics(req: Request, res: Response, next: NextFunction) {
     try {
     try {
-      const result = await this.collectionsService.getStatistics(req.clientId);
+      const result = await this.collectionsService.getStatistics(
+        req.clientId,
+        req.db_name
+      );
       res.send(result);
       res.send(result);
     } catch (error) {
     } catch (error) {
       next(error);
       next(error);
@@ -227,6 +237,7 @@ export class CollectionController {
         req.clientId,
         req.clientId,
         {
         {
           collection_name: name,
           collection_name: name,
+          db_name: req.db_name,
         }
         }
       );
       );
       res.send(result);
       res.send(result);
@@ -240,7 +251,8 @@ export class CollectionController {
     try {
     try {
       const result = await this.collectionsService.getAllCollections(
       const result = await this.collectionsService.getAllCollections(
         req.clientId,
         req.clientId,
-        [name]
+        [name],
+        req.db_name
       );
       );
       res.send(result[0]);
       res.send(result[0]);
     } catch (error) {
     } catch (error) {
@@ -259,6 +271,7 @@ export class CollectionController {
         req.clientId,
         req.clientId,
         {
         {
           collection_name: name,
           collection_name: name,
+          db_name: req.db_name,
         }
         }
       );
       );
       res.send(result);
       res.send(result);
@@ -270,12 +283,12 @@ export class CollectionController {
   async loadCollection(req: Request, res: Response, next: NextFunction) {
   async loadCollection(req: Request, res: Response, next: NextFunction) {
     const collection_name = req.params?.name;
     const collection_name = req.params?.name;
     const data = req.body;
     const data = req.body;
-    const param: LoadCollectionReq = { collection_name };
+    const param: LoadCollectionReq = { collection_name, db_name: req.db_name };
     if (data.replica_number) {
     if (data.replica_number) {
       param.replica_number = Number(data.replica_number);
       param.replica_number = Number(data.replica_number);
     }
     }
     try {
     try {
-      const result = await this.collectionsService.loadCollection(
+      const result = await this.collectionsService.loadCollectionAsync(
         req.clientId,
         req.clientId,
         param
         param
       );
       );
@@ -292,6 +305,7 @@ export class CollectionController {
         req.clientId,
         req.clientId,
         {
         {
           collection_name: name,
           collection_name: name,
+          db_name: req.db_name,
         }
         }
       );
       );
       res.send(result);
       res.send(result);
@@ -415,6 +429,7 @@ export class CollectionController {
         name,
         name,
         {
         {
           alias,
           alias,
+          db_name: req.db_name,
         }
         }
       );
       );
       res.send(result);
       res.send(result);
@@ -442,6 +457,7 @@ export class CollectionController {
         req.clientId,
         req.clientId,
         {
         {
           collectionName: name,
           collectionName: name,
+          dbName: req.db_name,
         }
         }
       );
       );
       res.send(result.infos);
       res.send(result.infos);
@@ -457,6 +473,7 @@ export class CollectionController {
         req.clientId,
         req.clientId,
         {
         {
           collectionName: name,
           collectionName: name,
+          dbName: req.db_name,
         }
         }
       );
       );
       res.send(result.infos);
       res.send(result.infos);
@@ -470,6 +487,7 @@ export class CollectionController {
     try {
     try {
       const result = await this.collectionsService.compact(req.clientId, {
       const result = await this.collectionsService.compact(req.clientId, {
         collection_name: name,
         collection_name: name,
+        db_name: req.db_name,
       });
       });
       res.send(result);
       res.send(result);
     } catch (error) {
     } catch (error) {
@@ -482,6 +500,7 @@ export class CollectionController {
     try {
     try {
       const result = await this.collectionsService.count(req.clientId, {
       const result = await this.collectionsService.count(req.clientId, {
         collection_name: name,
         collection_name: name,
+        db_name: req.db_name,
       });
       });
 
 
       res.send(result);
       res.send(result);
@@ -497,6 +516,7 @@ export class CollectionController {
         req.clientId,
         req.clientId,
         {
         {
           collection_name: name,
           collection_name: name,
+          db_name: req.db_name,
         }
         }
       );
       );
 
 
@@ -517,11 +537,13 @@ export class CollectionController {
               extra_params,
               extra_params,
               field_name,
               field_name,
               index_name,
               index_name,
+              db_name: req.db_name,
             })
             })
           : await this.collectionsService.dropIndex(req.clientId, {
           : await this.collectionsService.dropIndex(req.clientId, {
               collection_name,
               collection_name,
               field_name,
               field_name,
               index_name,
               index_name,
+              db_name: req.db_name,
             });
             });
       res.send(result);
       res.send(result);
     } catch (error) {
     } catch (error) {
@@ -534,6 +556,7 @@ export class CollectionController {
     try {
     try {
       const result = await this.collectionsService.describeIndex(req.clientId, {
       const result = await this.collectionsService.describeIndex(req.clientId, {
         collection_name: data,
         collection_name: data,
+        db_name: req.db_name,
       });
       });
 
 
       res.send(result);
       res.send(result);

+ 90 - 43
server/src/collections/collections.service.ts

@@ -71,9 +71,11 @@ export class CollectionsService {
   async createCollection(clientId: string, data: CreateCollectionReq) {
   async createCollection(clientId: string, data: CreateCollectionReq) {
     const { milvusClient } = clientCache.get(clientId);
     const { milvusClient } = clientCache.get(clientId);
     const res = await milvusClient.createCollection(data);
     const res = await milvusClient.createCollection(data);
-    const newCollection = (await this.getAllCollections(clientId, [
-      data.collection_name,
-    ])) as CollectionFullObject[];
+    const newCollection = (await this.getAllCollections(
+      clientId,
+      [data.collection_name],
+      data.db_name
+    )) as CollectionFullObject[];
 
 
     throwErrorFromSDK(res);
     throwErrorFromSDK(res);
     return newCollection[0];
     return newCollection[0];
@@ -159,9 +161,11 @@ export class CollectionsService {
     const res = await milvusClient.renameCollection(data);
     const res = await milvusClient.renameCollection(data);
     throwErrorFromSDK(res);
     throwErrorFromSDK(res);
 
 
-    const newCollection = (await this.getAllCollections(clientId, [
-      data.new_collection_name,
-    ])) as CollectionFullObject[];
+    const newCollection = (await this.getAllCollections(
+      clientId,
+      [data.new_collection_name],
+      data.db_name
+    )) as CollectionFullObject[];
 
 
     return newCollection[0];
     return newCollection[0];
   }
   }
@@ -171,9 +175,11 @@ export class CollectionsService {
     const res = await milvusClient.alterCollection(data);
     const res = await milvusClient.alterCollection(data);
     throwErrorFromSDK(res);
     throwErrorFromSDK(res);
 
 
-    const newCollection = (await this.getAllCollections(clientId, [
-      data.collection_name,
-    ])) as CollectionFullObject[];
+    const newCollection = (await this.getAllCollections(
+      clientId,
+      [data.collection_name],
+      data.db_name
+    )) as CollectionFullObject[];
 
 
     return newCollection[0];
     return newCollection[0];
   }
   }
@@ -193,13 +199,25 @@ export class CollectionsService {
     return data.collection_name;
     return data.collection_name;
   }
   }
 
 
+  async loadCollectionAsync(clientId: string, data: LoadCollectionReq) {
+    const { milvusClient } = clientCache.get(clientId);
+    const res = await milvusClient.loadCollectionAsync(data);
+    throwErrorFromSDK(res);
+
+    return data.collection_name;
+  }
+
   async releaseCollection(clientId: string, data: ReleaseLoadCollectionReq) {
   async releaseCollection(clientId: string, data: ReleaseLoadCollectionReq) {
     const { milvusClient } = clientCache.get(clientId);
     const { milvusClient } = clientCache.get(clientId);
     const res = await milvusClient.releaseCollection(data);
     const res = await milvusClient.releaseCollection(data);
     throwErrorFromSDK(res);
     throwErrorFromSDK(res);
 
 
     // emit update to client
     // emit update to client
-    this.updateCollectionsDetails(clientId, [data.collection_name]);
+    this.updateCollectionsDetails(
+      clientId,
+      [data.collection_name],
+      data.db_name
+    );
 
 
     return data.collection_name;
     return data.collection_name;
   }
   }
@@ -251,7 +269,6 @@ export class CollectionsService {
     return res;
     return res;
   }
   }
 
 
-
   async deleteEntities(clientId: string, data: DeleteEntitiesReq) {
   async deleteEntities(clientId: string, data: DeleteEntitiesReq) {
     const { milvusClient } = clientCache.get(clientId);
     const { milvusClient } = clientCache.get(clientId);
     const res = await milvusClient.deleteEntities(data);
     const res = await milvusClient.deleteEntities(data);
@@ -298,9 +315,11 @@ export class CollectionsService {
     const res = await milvusClient.createAlias(data);
     const res = await milvusClient.createAlias(data);
     throwErrorFromSDK(res);
     throwErrorFromSDK(res);
 
 
-    const newCollection = (await this.getAllCollections(clientId, [
-      data.collection_name,
-    ])) as CollectionFullObject[];
+    const newCollection = (await this.getAllCollections(
+      clientId,
+      [data.collection_name],
+      data.db_name
+    )) as CollectionFullObject[];
 
 
     return newCollection[0];
     return newCollection[0];
   }
   }
@@ -312,23 +331,21 @@ export class CollectionsService {
     return res;
     return res;
   }
   }
 
 
-  async dropAlias(
-    clientId: string,
-    collection_name: string,
-    data: DropAliasReq
-  ) {
+  async dropAlias(clientId: string, collection_name: string, data: any) {
     const { milvusClient } = clientCache.get(clientId);
     const { milvusClient } = clientCache.get(clientId);
     const res = await milvusClient.dropAlias(data);
     const res = await milvusClient.dropAlias(data);
     throwErrorFromSDK(res);
     throwErrorFromSDK(res);
 
 
-    const newCollection = (await this.getAllCollections(clientId, [
-      collection_name,
-    ])) as CollectionFullObject[];
+    const newCollection = (await this.getAllCollections(
+      clientId,
+      [collection_name],
+      data.db_name
+    )) as CollectionFullObject[];
 
 
     return newCollection[0];
     return newCollection[0];
   }
   }
 
 
-  async getReplicas(clientId: string, data: GetReplicasDto) {
+  async getReplicas(clientId: string, data: any) {
     const { milvusClient } = clientCache.get(clientId);
     const { milvusClient } = clientCache.get(clientId);
     const res = await milvusClient.getReplicas(data);
     const res = await milvusClient.getReplicas(data);
     return res;
     return res;
@@ -356,7 +373,8 @@ export class CollectionsService {
     clientId: string,
     clientId: string,
     collection: CollectionData,
     collection: CollectionData,
     loadCollection: CollectionData,
     loadCollection: CollectionData,
-    lazy: boolean = false
+    lazy: boolean = false,
+    database?: string
   ) {
   ) {
     const { collectionsQueue } = clientCache.get(clientId);
     const { collectionsQueue } = clientCache.get(clientId);
     if (lazy) {
     if (lazy) {
@@ -382,6 +400,7 @@ export class CollectionsService {
     // get collection schema and properties
     // get collection schema and properties
     const collectionInfo = await this.describeCollection(clientId, {
     const collectionInfo = await this.describeCollection(clientId, {
       collection_name: collection.name,
       collection_name: collection.name,
+      db_name: database,
     });
     });
 
 
     // get collection statistic data
     // get collection statistic data
@@ -390,6 +409,7 @@ export class CollectionsService {
     try {
     try {
       const res = await this.count(clientId, {
       const res = await this.count(clientId, {
         collection_name: collection.name,
         collection_name: collection.name,
+        db_name: database,
       });
       });
       count = res.rowCount;
       count = res.rowCount;
     } catch (e) {
     } catch (e) {
@@ -407,6 +427,7 @@ export class CollectionsService {
       replicas = loadCollection
       replicas = loadCollection
         ? await this.getReplicas(clientId, {
         ? await this.getReplicas(clientId, {
             collectionID: collectionInfo.collectionID,
             collectionID: collectionInfo.collectionID,
+            db_name: database,
           })
           })
         : replicas;
         : replicas;
     } catch (e) {
     } catch (e) {
@@ -446,7 +467,8 @@ export class CollectionsService {
   // get all collections details
   // get all collections details
   async getAllCollections(
   async getAllCollections(
     clientId: string,
     clientId: string,
-    collections: string[] = []
+    collections: string[] = [],
+    database?: string
   ): Promise<CollectionObject[]> {
   ): Promise<CollectionObject[]> {
     const currentClient = clientCache.get(clientId);
     const currentClient = clientCache.get(clientId);
 
 
@@ -457,10 +479,13 @@ export class CollectionsService {
     }
     }
 
 
     // get all collections(name, timestamp, id)
     // get all collections(name, timestamp, id)
-    const allCollections = await this.showCollections(clientId);
+    const allCollections = await this.showCollections(clientId, {
+      db_name: database,
+    });
     // get all loaded collection
     // get all loaded collection
     const loadedCollections = await this.showCollections(clientId, {
     const loadedCollections = await this.showCollections(clientId, {
       type: ShowCollectionsType.Loaded,
       type: ShowCollectionsType.Loaded,
+      db_name: database,
     });
     });
 
 
     // data container
     // data container
@@ -491,7 +516,8 @@ export class CollectionsService {
           clientId,
           clientId,
           collection,
           collection,
           loadedCollection,
           loadedCollection,
-          !notLazy
+          !notLazy,
+          database
         )
         )
       );
       );
     }
     }
@@ -504,7 +530,11 @@ export class CollectionsService {
           if (q.isObseleted) {
           if (q.isObseleted) {
             return;
             return;
           }
           }
-          await this.updateCollectionsDetails(clientId, collectionsToGet);
+          await this.updateCollectionsDetails(
+            clientId,
+            collectionsToGet,
+            database
+          );
         },
         },
         5
         5
       );
       );
@@ -516,32 +546,37 @@ export class CollectionsService {
 
 
   // update collections details
   // update collections details
   // send new info to the client
   // send new info to the client
-  async updateCollectionsDetails(clientId: string, collections: string[]) {
+  async updateCollectionsDetails(
+    clientId: string,
+    collections: string[],
+    database: string
+  ) {
     try {
     try {
       // get current socket
       // get current socket
       const socketClient = clients.get(clientId);
       const socketClient = clients.get(clientId);
       // get collections
       // get collections
-      const res = await this.getAllCollections(clientId, collections);
+      const res = await this.getAllCollections(clientId, collections, database);
 
 
       // emit event to current client
       // emit event to current client
       if (socketClient) {
       if (socketClient) {
-        socketClient.emit(WS_EVENTS.COLLECTION_UPDATE, res);
+        socketClient.emit(WS_EVENTS.COLLECTION_UPDATE, { collections: res });
       }
       }
     } catch (e) {
     } catch (e) {
       console.log('ignore queue error');
       console.log('ignore queue error');
     }
     }
   }
   }
 
 
-  async getLoadedCollections(clientId: string) {
+  async getLoadedCollections(clientId: string, db_name?: string) {
     const data = [];
     const data = [];
     const res = await this.showCollections(clientId, {
     const res = await this.showCollections(clientId, {
       type: ShowCollectionsType.Loaded,
       type: ShowCollectionsType.Loaded,
+      db_name,
     });
     });
     if (res.data.length > 0) {
     if (res.data.length > 0) {
       for (const item of res.data) {
       for (const item of res.data) {
         const { id, name } = item;
         const { id, name } = item;
 
 
-        const count = this.count(clientId, { collection_name: name });
+        const count = this.count(clientId, { collection_name: name, db_name });
         data.push({
         data.push({
           id,
           id,
           collection_name: name,
           collection_name: name,
@@ -557,12 +592,12 @@ export class CollectionsService {
    * Get collections statistics data
    * Get collections statistics data
    * @returns {collectionCount:number, totalData:number}
    * @returns {collectionCount:number, totalData:number}
    */
    */
-  async getStatistics(clientId: string) {
+  async getStatistics(clientId: string, db_name?: string) {
     const data = {
     const data = {
       collectionCount: 0,
       collectionCount: 0,
       totalData: 0,
       totalData: 0,
     } as StatisticsObject;
     } as StatisticsObject;
-    const res = await this.showCollections(clientId);
+    const res = await this.showCollections(clientId, { db_name });
     data.collectionCount = res.data.length;
     data.collectionCount = res.data.length;
     if (res.data.length > 0) {
     if (res.data.length > 0) {
       for (const item of res.data) {
       for (const item of res.data) {
@@ -570,6 +605,7 @@ export class CollectionsService {
           clientId,
           clientId,
           {
           {
             collection_name: item.name,
             collection_name: item.name,
+            db_name,
           }
           }
         );
         );
         const rowCount = findKeyValue(collectionStatistics.stats, ROW_COUNT);
         const rowCount = findKeyValue(collectionStatistics.stats, ROW_COUNT);
@@ -584,10 +620,11 @@ export class CollectionsService {
    */
    */
   async importSample(
   async importSample(
     clientId: string,
     clientId: string,
-    { collection_name, size, download, format }: ImportSampleDto
+    { collection_name, size, download, format, db_name }: ImportSampleDto
   ) {
   ) {
     const collectionInfo = await this.describeCollection(clientId, {
     const collectionInfo = await this.describeCollection(clientId, {
       collection_name,
       collection_name,
+      db_name,
     });
     });
     const fields_data = genRows(
     const fields_data = genRows(
       collectionInfo.schema.fields,
       collectionInfo.schema.fields,
@@ -605,7 +642,11 @@ export class CollectionsService {
       return { sampleFile };
       return { sampleFile };
     } else {
     } else {
       // Otherwise, insert the data into the collection
       // Otherwise, insert the data into the collection
-      return await this.insert(clientId, { collection_name, fields_data });
+      return await this.insert(clientId, {
+        collection_name,
+        fields_data,
+        db_name,
+      });
     }
     }
   }
   }
 
 
@@ -650,6 +691,7 @@ export class CollectionsService {
   async duplicateCollection(clientId: string, data: RenameCollectionReq) {
   async duplicateCollection(clientId: string, data: RenameCollectionReq) {
     const collection = await this.describeCollection(clientId, {
     const collection = await this.describeCollection(clientId, {
       collection_name: data.collection_name,
       collection_name: data.collection_name,
+      db_name: data.db_name,
     });
     });
 
 
     const createCollectionParams: CreateCollectionReq = {
     const createCollectionParams: CreateCollectionReq = {
@@ -678,6 +720,7 @@ export class CollectionsService {
       collection_name: data.collection_name,
       collection_name: data.collection_name,
       filter:
       filter:
         pkType === 'Int64' ? `${pkField} >= ${MIN_INT64}` : `${pkField} != ''`,
         pkType === 'Int64' ? `${pkField} >= ${MIN_INT64}` : `${pkField} != ''`,
+      db_name: data.db_name,
     });
     });
 
 
     return res;
     return res;
@@ -692,9 +735,11 @@ export class CollectionsService {
     indexCache.delete(key);
     indexCache.delete(key);
 
 
     // fetch new collections
     // fetch new collections
-    const newCollection = (await this.getAllCollections(clientId, [
-      data.collection_name,
-    ])) as CollectionFullObject[];
+    const newCollection = (await this.getAllCollections(
+      clientId,
+      [data.collection_name],
+      data.db_name
+    )) as CollectionFullObject[];
 
 
     throwErrorFromSDK(res);
     throwErrorFromSDK(res);
     return newCollection[0];
     return newCollection[0];
@@ -760,9 +805,11 @@ export class CollectionsService {
     // clear cache;
     // clear cache;
     indexCache.delete(key);
     indexCache.delete(key);
     // fetch new collections
     // fetch new collections
-    const newCollection = (await this.getAllCollections(clientId, [
-      data.collection_name,
-    ])) as CollectionFullObject[];
+    const newCollection = (await this.getAllCollections(
+      clientId,
+      [data.collection_name],
+      data.db_name
+    )) as CollectionFullObject[];
 
 
     return newCollection[0];
     return newCollection[0];
   }
   }

+ 4 - 0
server/src/collections/dto.ts

@@ -70,6 +70,10 @@ export class ImportSampleDto {
   @IsString()
   @IsString()
   @IsOptional()
   @IsOptional()
   readonly format?: string;
   readonly format?: string;
+
+  @IsString()
+  @IsOptional()
+  readonly db_name?: string;
 }
 }
 
 
 export class GetReplicasDto {
 export class GetReplicasDto {

+ 6 - 1
server/src/crons/crons.controller.ts

@@ -30,7 +30,7 @@ export class CronsController {
 
 
   async toggleCronJobByName(req: Request, res: Response, next: NextFunction) {
   async toggleCronJobByName(req: Request, res: Response, next: NextFunction) {
     const cronData = req.body;
     const cronData = req.body;
-    // console.log(cronData, milvusAddress);
+
     try {
     try {
       const result = await this.cronsService.toggleCronJobByName(req.clientId, {
       const result = await this.cronsService.toggleCronJobByName(req.clientId, {
         ...cronData,
         ...cronData,
@@ -40,4 +40,9 @@ export class CronsController {
       next(error);
       next(error);
     }
     }
   }
   }
+
+  deleteCronJob(clientId: string) {
+    console.info(`Deleting all cron jobs for client: ${clientId}`);
+    this.schedulerRegistry.deleteAllCronJobs(clientId);
+  }
 }
 }

+ 62 - 17
server/src/crons/crons.service.ts

@@ -14,6 +14,7 @@ interface CronJob {
   task: ScheduledTask;
   task: ScheduledTask;
   data: CronJobObject;
   data: CronJobObject;
 }
 }
+import { clientCache } from '../app';
 
 
 const getId = (clientId: string, data: CronJobObject) => {
 const getId = (clientId: string, data: CronJobObject) => {
   return `${clientId}/${data.name}/${
   return `${clientId}/${data.name}/${
@@ -53,26 +54,47 @@ export class CronsService {
   }
   }
 
 
   async execCollectionUpdateTask(clientId: string, data: CronJobObject) {
   async execCollectionUpdateTask(clientId: string, data: CronJobObject) {
+    console.log('execCollectionUpdateTask', clientId, data);
     const task = async () => {
     const task = async () => {
       const currentJob: CronJob = this.schedulerRegistry.getCronJob(
       const currentJob: CronJob = this.schedulerRegistry.getCronJob(
         clientId,
         clientId,
         data
         data
       );
       );
 
 
+      // if currentJob is not exist
       if (!currentJob) {
       if (!currentJob) {
+        // if client not connected, stop cron
+        this.schedulerRegistry.deleteCronJob(clientId, data);
         return;
         return;
       }
       }
+
       try {
       try {
+        // get client cache data
+        const { milvusClient } = clientCache.get(clientId);
+        const currentDatabase = (milvusClient as any).metadata.get('dbname');
+
+        // if database is not matched, return
+        if (currentDatabase !== data.payload.database) {
+          // if client not connected, stop cron
+          this.schedulerRegistry.deleteCronJob(clientId, data);
+          console.info('Database is not matched, stop cron.', clientId);
+          return;
+        }
+
         const collections = await this.collectionService.getAllCollections(
         const collections = await this.collectionService.getAllCollections(
           currentJob.clientId,
           currentJob.clientId,
-          currentJob.data.payload.collections
+          currentJob.data.payload.collections,
+          currentJob.data.payload.database
         );
         );
         // get current socket
         // get current socket
         const socketClient = clients.get(currentJob.clientId);
         const socketClient = clients.get(currentJob.clientId);
 
 
         if (socketClient) {
         if (socketClient) {
           // emit event to current client, loading and indexing events are indetified as collection update
           // emit event to current client, loading and indexing events are indetified as collection update
-          socketClient.emit(WS_EVENTS.COLLECTION_UPDATE, collections);
+          socketClient.emit(WS_EVENTS.COLLECTION_UPDATE, {
+            collections,
+            database: currentJob.data.payload.database,
+          });
 
 
           // if all collections are loaded, stop cron
           // if all collections are loaded, stop cron
           const LoadingOrBuildingCollections = collections.filter(v => {
           const LoadingOrBuildingCollections = collections.filter(v => {
@@ -87,6 +109,15 @@ export class CronsService {
           }
           }
         }
         }
       } catch (error) {
       } catch (error) {
+        if (error.message.includes('pool is draining')) {
+          // Handle the pool draining error, possibly by logging and avoiding retry
+          console.error(
+            'The pool is shutting down and cannot accept new work.'
+          );
+          this.schedulerRegistry.deleteCronJob(clientId, data);
+          return;
+        }
+
         // When user not connect milvus, stop cron
         // When user not connect milvus, stop cron
         this.schedulerRegistry.deleteCronJob(clientId, data);
         this.schedulerRegistry.deleteCronJob(clientId, data);
 
 
@@ -115,6 +146,17 @@ export class SchedulerRegistry {
       this.cronJobMap.get(targetId)?.task?.stop();
       this.cronJobMap.get(targetId)?.task?.stop();
       this.cronJobMap.delete(targetId);
       this.cronJobMap.delete(targetId);
     }
     }
+
+  }
+
+  deleteAllCronJobs(clientId: string) {
+    console.log('Deleting all cron jobs in service for client:', clientId);
+    this.cronJobMap.forEach((v, k) => {
+      if (v.clientId === clientId) {
+        v.task.stop();
+        this.cronJobMap.delete(k);
+      }
+    });
   }
   }
 
 
   // ┌────────────── second (optional)
   // ┌────────────── second (optional)
@@ -141,21 +183,24 @@ export class SchedulerRegistry {
       // create task id
       // create task id
       const id = getId(clientId, data);
       const id = getId(clientId, data);
 
 
-      // create task
-      const task = schedule(cronExpression, () => {
-        console.log(
-          `[cronExpression:${cronExpression}] ${data.name} ${id}: running a task.`
-        );
-        func();
-      });
-
-      // save task
-      this.cronJobMap.set(id, {
-        id,
-        clientId,
-        task,
-        data,
-      });
+      if (!this.cronJobMap.has(id)) {
+        console.log('create task:', id);
+        // create task
+        const task = schedule(cronExpression, () => {
+          console.log(
+            `[cronExpression:${cronExpression}] ${data.name} ${id}: running a task.`
+          );
+          func();
+        });
+
+        // save task
+        this.cronJobMap.set(id, {
+          id,
+          clientId,
+          task,
+          data,
+        });
+      }
     }
     }
   }
   }
 }
 }

+ 2 - 2
server/src/crons/index.ts

@@ -1,6 +1,6 @@
-import { CronsController } from "./crons.controller";
+import { CronsController } from './crons.controller';
 
 
 const cronsManager = new CronsController();
 const cronsManager = new CronsController();
 const router = cronsManager.generateRoutes();
 const router = cronsManager.generateRoutes();
 
 
-export { router };
+export { router, cronsManager };

+ 7 - 0
server/src/middleware/index.ts

@@ -10,6 +10,7 @@ declare global {
   namespace Express {
   namespace Express {
     interface Request {
     interface Request {
       clientId?: string;
       clientId?: string;
+      db_name?: string;
     }
     }
   }
   }
 }
 }
@@ -24,6 +25,12 @@ export const ReqHeaderMiddleware = (
   const milvusClientId = (req.headers[MILVUS_CLIENT_ID] as string) || '';
   const milvusClientId = (req.headers[MILVUS_CLIENT_ID] as string) || '';
   req.clientId = req.headers[MILVUS_CLIENT_ID] as string;
   req.clientId = req.headers[MILVUS_CLIENT_ID] as string;
 
 
+  // merge database from header
+  const database = req.headers['x-attu-database'] as string;
+  if (database) {
+    req.db_name = database;
+  }
+
   const bypassURLs = [`/api/v1/milvus/connect`, `/api/v1/milvus/version`];
   const bypassURLs = [`/api/v1/milvus/connect`, `/api/v1/milvus/version`];
 
 
   if (
   if (

+ 20 - 9
server/src/milvus/milvus.service.ts

@@ -9,6 +9,7 @@ import { LRUCache } from 'lru-cache';
 import { DEFAULT_MILVUS_PORT, INDEX_TTL, SimpleQueue } from '../utils';
 import { DEFAULT_MILVUS_PORT, INDEX_TTL, SimpleQueue } from '../utils';
 import { clientCache } from '../app';
 import { clientCache } from '../app';
 import { DescribeIndexRes, AuthReq, AuthObject } from '../types';
 import { DescribeIndexRes, AuthReq, AuthObject } from '../types';
+import { cronsManager } from '../crons';
 
 
 export class MilvusService {
 export class MilvusService {
   private DEFAULT_DATABASE = 'default';
   private DEFAULT_DATABASE = 'default';
@@ -74,6 +75,7 @@ export class MilvusService {
 
 
       // If the server is not healthy, throw an error
       // If the server is not healthy, throw an error
       if (!res.isHealthy) {
       if (!res.isHealthy) {
+        clientCache.delete(milvusClient.clientId);
         throw new Error('Milvus is not ready yet.');
         throw new Error('Milvus is not ready yet.');
       }
       }
 
 
@@ -110,8 +112,6 @@ export class MilvusService {
         database: db,
         database: db,
       };
       };
     } catch (error) {
     } catch (error) {
-      // If any error occurs, clear the cache and throw the error
-      clientCache.dump();
       throw error;
       throw error;
     }
     }
   }
   }
@@ -138,6 +138,8 @@ export class MilvusService {
     const res = await milvusClient.closeConnection();
     const res = await milvusClient.closeConnection();
     // clear cache on disconnect
     // clear cache on disconnect
     clientCache.delete(milvusClient.clientId);
     clientCache.delete(milvusClient.clientId);
+    // clear crons
+    cronsManager.deleteCronJob(clientId);
 
 
     return res;
     return res;
   }
   }
@@ -145,13 +147,22 @@ export class MilvusService {
   async useDatabase(clientId: string, db: string) {
   async useDatabase(clientId: string, db: string) {
     const { milvusClient } = clientCache.get(clientId);
     const { milvusClient } = clientCache.get(clientId);
 
 
-    const res = milvusClient.use({
-      db_name: db,
-    });
-
-    // update the database in the cache
+    // get the database from the cache
     const cache = clientCache.get(clientId);
     const cache = clientCache.get(clientId);
-    cache.database = db;
-    return res;
+    const currentDatabase = cache.database;
+
+    if (currentDatabase !== db) {
+      // use the database
+      const res = milvusClient.use({
+        db_name: db,
+      });
+
+      // clear crons
+      cronsManager.deleteCronJob(clientId);
+
+      // update the database in the cache
+      cache.database = db;
+      return res;
+    }
   }
   }
 }
 }

+ 1 - 0
server/src/partitions/partitions.controller.ts

@@ -43,6 +43,7 @@ export class PartitionController {
         req.clientId,
         req.clientId,
         {
         {
           collection_name: collectionName,
           collection_name: collectionName,
+          db_name: req.db_name,
         }
         }
       );
       );
       res.send(result);
       res.send(result);

+ 4 - 4
server/yarn.lock

@@ -1397,10 +1397,10 @@
   resolved "https://registry.yarnpkg.com/@xmldom/xmldom/-/xmldom-0.8.9.tgz#b6ef7457e826be8049667ae673eda7876eb049be"
   resolved "https://registry.yarnpkg.com/@xmldom/xmldom/-/xmldom-0.8.9.tgz#b6ef7457e826be8049667ae673eda7876eb049be"
   integrity sha512-4VSbbcMoxc4KLjb1gs96SRmi7w4h1SF+fCoiK0XaQX62buCc1G5d0DC5bJ9xJBNPDSVCmIrcl8BiYxzjrqaaJA==
   integrity sha512-4VSbbcMoxc4KLjb1gs96SRmi7w4h1SF+fCoiK0XaQX62buCc1G5d0DC5bJ9xJBNPDSVCmIrcl8BiYxzjrqaaJA==
 
 
-"@zilliz/milvus2-sdk-node@^2.4.8":
-  version "2.4.8"
-  resolved "https://registry.yarnpkg.com/@zilliz/milvus2-sdk-node/-/milvus2-sdk-node-2.4.8.tgz#806437620707e7ca5d17f4355b58a44090e76177"
-  integrity sha512-mLwe1Q8YX/gNHT2tK9v3GB9hPlcpdQwnabdsu/txSp+KI/wv5ZCP098NX0dGcR62TGuhx6htA4AVj6Y9OuMIoA==
+"@zilliz/milvus2-sdk-node@^2.4.9":
+  version "2.4.9"
+  resolved "https://registry.yarnpkg.com/@zilliz/milvus2-sdk-node/-/milvus2-sdk-node-2.4.9.tgz#760f78b44a720426aad775746254d23eb4e7dfc8"
+  integrity sha512-EuCwEE5jENkxk7JVCNxLjtw4BgxcucquzIOalqxhOBQ10P28wU+jFU1ETJpAen4g5FbNwlMCO9rwDaORKjlwSA==
   dependencies:
   dependencies:
     "@grpc/grpc-js" "^1.8.22"
     "@grpc/grpc-js" "^1.8.22"
     "@grpc/proto-loader" "^0.7.10"
     "@grpc/proto-loader" "^0.7.10"