Browse Source

support multiple clients for websocket server (#371)

* support register client in websocket sever

Signed-off-by: ryjiang <jiangruiyi@gmail.com>

* recover effect

Signed-off-by: ryjiang <jiangruiyi@gmail.com>

---------

Signed-off-by: ryjiang <jiangruiyi@gmail.com>
ryjiang 1 year ago
parent
commit
291650a436

+ 13 - 4
client/src/context/Auth.tsx

@@ -4,6 +4,7 @@ import { AuthContextType } from './Types';
 
 export const authContext = createContext<AuthContextType>({
   isAuth: false,
+  clientId: '',
   address: '',
   username: '',
   isManaged: false,
@@ -11,6 +12,7 @@ export const authContext = createContext<AuthContextType>({
   setAddress: () => {},
   setUsername: () => {},
   setIsAuth: () => {},
+  setClientId: () => {},
 });
 
 const { Provider } = authContext;
@@ -26,6 +28,15 @@ export const AuthProvider = (props: { children: React.ReactNode }) => {
   const [isAuth, setIsAuth] = useState<boolean>(address !== '');
   // const isAuth = useMemo(() => !!address, [address]);
 
+  // get milvus address from local storage
+  const [clientId, setClientId] = useState<string>(
+    window.localStorage.getItem(MILVUS_CLIENT_ID) || ''
+  );
+
+  useEffect(() => {
+    document.title = address ? `${address} - Attu` : 'Attu';
+  }, [address, username]);
+
   const logout = () => {
     // remove user data from local storage
     window.localStorage.removeItem(MILVUS_CLIENT_ID);
@@ -37,19 +48,17 @@ export const AuthProvider = (props: { children: React.ReactNode }) => {
     setIsAuth(false);
   };
 
-  useEffect(() => {
-    document.title = address ? `${address} - Attu` : 'Attu';
-  }, [address, username]);
-
   return (
     <Provider
       value={{
         isAuth,
+        clientId,
         address,
         username,
         setAddress,
         setUsername,
         setIsAuth,
+        setClientId,
         logout,
         isManaged: address.includes('vectordb.zillizcloud.com'),
       }}

+ 2 - 0
client/src/context/Types.ts

@@ -56,6 +56,7 @@ export type OpenSnackBarType = (
 
 export type AuthContextType = {
   isAuth: boolean;
+  clientId: string;
   address: string;
   username: string;
   isManaged: boolean;
@@ -63,6 +64,7 @@ export type AuthContextType = {
   setAddress: Dispatch<SetStateAction<string>>;
   setUsername: Dispatch<SetStateAction<string>>;
   setIsAuth: Dispatch<SetStateAction<boolean>>;
+  setClientId: Dispatch<SetStateAction<string>>;
 };
 
 export type DataContextType = {

+ 5 - 2
client/src/context/WebSocket.tsx

@@ -15,15 +15,18 @@ const { Provider } = webSocketContext;
 
 export const WebSocketProvider = (props: { children: React.ReactNode }) => {
   const [collections, setCollections] = useState<Collection[]>([]);
-  const { isAuth } = useContext(authContext);
+  const { isAuth, clientId } = useContext(authContext);
   const socket = useRef<Socket | null>(null);
 
   useEffect(() => {
     if (isAuth) {
+      // connect to socket server
       socket.current = io(url as string);
+      // register client
+      socket.current.emit(WS_EVENTS.REGISTER, clientId);
 
       socket.current.on('connect', function () {
-        console.log('--- ws connected ---');
+        console.log('--- ws connected ---', clientId);
       });
 
       /**

+ 3 - 1
client/src/pages/connect/AuthForm.tsx

@@ -64,7 +64,8 @@ export const AuthForm = (props: any) => {
   const classes = useStyles();
 
   const { openSnackBar } = useContext(rootContext);
-  const { setAddress, setUsername, setIsAuth } = useContext(authContext);
+  const { setAddress, setUsername, setIsAuth, setClientId } =
+    useContext(authContext);
   const { setDatabase } = useContext(dataContext);
 
   const Logo = icons.zilliz;
@@ -206,6 +207,7 @@ export const AuthForm = (props: any) => {
     const result = await MilvusService.connect(form);
 
     setIsAuth(true);
+    setClientId(result.clientId);
     setAddress(form.address);
     setUsername(form.username);
     setDatabase(result.database);

+ 5 - 7
server/src/crons/crons.service.ts

@@ -1,7 +1,7 @@
 import { schedule, ScheduledTask } from 'node-cron';
 import { CollectionsService } from '../collections/collections.service';
 import { WS_EVENTS, WS_EVENTS_TYPE } from '../utils';
-import { serverEvent } from '../events';
+import { clients } from '../socket';
 
 export class CronsService {
   constructor(
@@ -39,12 +39,10 @@ export class CronsService {
     const task = async () => {
       try {
         const res = await this.collectionService.getAllCollections(clientId);
-        // TODO
-        // this.eventService.server.emit("COLLECTION", res);
-        serverEvent.emit(WS_EVENTS.TO_CLIENT, {
-          event: WS_EVENTS.COLLECTION + '',
-          data: res,
-        });
+        // get current socket
+        const socketClient = clients.get(clientId);
+        // emit event to current client
+        socketClient.emit(WS_EVENTS.COLLECTION, res);
         return res;
       } catch (error) {
         // When user not connect milvus, stop cron

+ 10 - 24
server/src/socket.ts

@@ -2,9 +2,10 @@
 import { Server, Socket } from 'socket.io';
 import * as http from 'http';
 import chalk from 'chalk';
-import { serverEvent } from './events';
 import { WS_EVENTS } from './utils';
+
 export let io: Server;
+export let clients = new Map<string, Socket>();
 
 export function initWebSocket(server: http.Server) {
   io = new Server(server, {
@@ -15,30 +16,15 @@ export function initWebSocket(server: http.Server) {
   });
 
   io.on('connection', (socket: Socket) => {
-    console.info(
-      chalk.green(`ws client connected ${socket.client.conn.remoteAddress}`)
-    );
-
-    socket.on(WS_EVENTS.COLLECTION, (message: any) => {
-      socket.emit(WS_EVENTS.COLLECTION, { data: message });
-    });
-
-    // frontend emit -> serverEvent.emit -> server event handler
-    socket.on(WS_EVENTS.TO_SERVER, (msg: any) => {
-      serverEvent.emit(msg.event, msg);
-    });
-
-    // server emit -> socket emit -> frontend event handler
-    serverEvent.on(WS_EVENTS.TO_CLIENT, (msg: any) => {
-      socket.emit(msg.event, msg.data);
-    });
+    // register client
+    socket.on(WS_EVENTS.REGISTER, (clientId: string) => {
+      clients.set(clientId, socket);
+      console.info(chalk.green(`ws client connected ${clientId}`));
 
-    socket.on('disconnect', () => {
-      console.info(
-        chalk.green(
-          `ws client disconnected ${socket.client.conn.remoteAddress}`
-        )
-      );
+      socket.on('disconnect', () => {
+        console.info(chalk.green(`ws client disconnected ${clientId}`));
+        clients.delete(clientId);
+      });
     });
   });
 }

+ 1 - 2
server/src/utils/Const.ts

@@ -17,8 +17,7 @@ export enum LOADING_STATE {
 }
 
 export enum WS_EVENTS {
-  TO_SERVER = 'TO_SERVER',
-  TO_CLIENT = 'TO_CLIENT',
+  REGISTER = 'REGISTER',
   COLLECTION = 'COLLECTION',
 }