Browse Source

support mutiple client in different browsers

Signed-off-by: nameczz <zizhao.chen@zilliz.com>
nameczz 3 years ago
parent
commit
d5aebcb100

+ 3 - 2
client/src/http/Axios.ts

@@ -1,4 +1,5 @@
 import axios from 'axios';
 import axios from 'axios';
+import { MILVUS_ADDRESS } from '../consts/Localstorage';
 // import { SESSION } from '../consts/Localstorage';
 // import { SESSION } from '../consts/Localstorage';
 
 
 // console.log(process.env.NODE_ENV, 'api:', process.env.REACT_APP_BASE_URL);
 // console.log(process.env.NODE_ENV, 'api:', process.env.REACT_APP_BASE_URL);
@@ -16,9 +17,9 @@ const axiosInstance = axios.create({
 axiosInstance.interceptors.request.use(
 axiosInstance.interceptors.request.use(
   function (config) {
   function (config) {
     // Do something before request is sent
     // Do something before request is sent
-    // const session = window.localStorage.getItem(SESSION);
+    const address = window.localStorage.getItem(MILVUS_ADDRESS);
 
 
-    // session && (config.headers[SESSION] = session);
+    address && (config.headers[MILVUS_ADDRESS] = address);
 
 
     return config;
     return config;
   },
   },

+ 1 - 0
client/src/http/Milvus.ts

@@ -3,6 +3,7 @@ import BaseModel from './BaseModel';
 
 
 export class MilvusHttp extends BaseModel {
 export class MilvusHttp extends BaseModel {
   static CONNECT_URL = '/milvus/connect';
   static CONNECT_URL = '/milvus/connect';
+
   static CHECK_URL = '/milvus/check';
   static CHECK_URL = '/milvus/check';
   static FLUSH_URL = '/milvus/flush';
   static FLUSH_URL = '/milvus/flush';
   static METRICS_URL = '/milvus/metrics';
   static METRICS_URL = '/milvus/metrics';

+ 0 - 5
express/src/__tests__/milvus/milvus.service.test.ts

@@ -22,13 +22,8 @@ describe('Test Milvus service', () => {
   });
   });
 
 
   test('test connectMilvus method', async () => {
   test('test connectMilvus method', async () => {
-    expect(service.milvusClientGetter).toBeUndefined();
-    expect(service.milvusAddressGetter).toBe('');
-
     const res = await service.connectMilvus(mockAddress);
     const res = await service.connectMilvus(mockAddress);
     expect(res.address).toBe(mockAddress);
     expect(res.address).toBe(mockAddress);
-    expect(service.milvusAddressGetter).toBe(mockAddress);
-    expect(service.milvusClientGetter).toBeDefined();
   });
   });
 
 
   test('test connectMilvus method error', async () => {
   test('test connectMilvus method error', async () => {

+ 46 - 42
express/src/app.ts

@@ -1,29 +1,30 @@
-import express from "express";
-import cors from "cors";
-import helmet from "helmet";
-import * as http from "http";
-import { Server, Socket } from "socket.io";
-import { router as connectRouter } from "./milvus";
-import { router as collectionsRouter } from "./collections";
-import { router as partitionsRouter } from "./partitions";
-import { router as schemaRouter } from "./schema";
-import { router as cronsRouter } from "./crons";
-import { pubSub } from "./events";
+import express from 'express';
+import cors from 'cors';
+import helmet from 'helmet';
+import * as http from 'http';
+import { Server, Socket } from 'socket.io';
+import { router as connectRouter } from './milvus';
+import { router as collectionsRouter } from './collections';
+import { router as partitionsRouter } from './partitions';
+import { router as schemaRouter } from './schema';
+import { router as cronsRouter } from './crons';
+import { pubSub } from './events';
 import {
 import {
   TransformResMiddlerware,
   TransformResMiddlerware,
   LoggingMiddleware,
   LoggingMiddleware,
   ErrorMiddleware,
   ErrorMiddleware,
-} from "./middlewares";
+  ReqHeaderMiddleware,
+} from './middlewares';
 
 
-import { getDirectories, getDirectoriesSync, generateCfgs } from "./utils";
-import * as path from "path";
-import chalk from "chalk";
-import { surveSwaggerSpecification } from "./swagger";
-import swaggerUi from "swagger-ui-express";
+import { getDirectories, getDirectoriesSync, generateCfgs } from './utils';
+import * as path from 'path';
+import chalk from 'chalk';
+import { surveSwaggerSpecification } from './swagger';
+import swaggerUi from 'swagger-ui-express';
 
 
 const PLUGIN_DEV = process.env?.PLUGIN_DEV;
 const PLUGIN_DEV = process.env?.PLUGIN_DEV;
-const SRC_PLUGIN_DIR = "src/plugins";
-const DEV_PLUGIN_DIR = "../../src/*/server";
+const SRC_PLUGIN_DIR = 'src/plugins';
+const DEV_PLUGIN_DIR = '../../src/*/server';
 
 
 export const app = express();
 export const app = express();
 const PORT = 3000;
 const PORT = 3000;
@@ -32,8 +33,8 @@ const server = http.createServer(app);
 // initialize the WebSocket server instance
 // initialize the WebSocket server instance
 const io = new Server(server, {
 const io = new Server(server, {
   cors: {
   cors: {
-    origin: "*",
-    methods: ["GET", "POST"],
+    origin: '*',
+    methods: ['GET', 'POST'],
   },
   },
 });
 });
 
 
@@ -45,22 +46,25 @@ app.use(
     contentSecurityPolicy: false,
     contentSecurityPolicy: false,
   })
   })
 );
 );
-app.use(express.json({ limit: "150MB" }));
+app.use(express.json({ limit: '150MB' }));
 // TransformResInterceptor
 // TransformResInterceptor
 app.use(TransformResMiddlerware);
 app.use(TransformResMiddlerware);
 // LoggingInterceptor
 // LoggingInterceptor
 app.use(LoggingMiddleware);
 app.use(LoggingMiddleware);
 
 
+// All headers operations
+app.use(ReqHeaderMiddleware);
+
 const router = express.Router();
 const router = express.Router();
 const pluginsRouter = express.Router();
 const pluginsRouter = express.Router();
 
 
 // Init WebSocket server event listener
 // Init WebSocket server event listener
-io.on("connection", (socket: Socket) => {
-  console.log("socket.io connected");
-  socket.on("COLLECTION", (message: any) => {
-    socket.emit("COLLECTION", { data: message });
+io.on('connection', (socket: Socket) => {
+  console.log('socket.io connected');
+  socket.on('COLLECTION', (message: any) => {
+    socket.emit('COLLECTION', { data: message });
   });
   });
-  pubSub.on("ws_pubsub", (msg) => {
+  pubSub.on('ws_pubsub', (msg) => {
     const { event, data } = msg;
     const { event, data } = msg;
     socket.emit(event, data);
     socket.emit(event, data);
   });
   });
@@ -71,7 +75,7 @@ io.on("connection", (socket: Socket) => {
 getDirectories(SRC_PLUGIN_DIR, async (dirErr: Error, dirRes: string[]) => {
 getDirectories(SRC_PLUGIN_DIR, async (dirErr: Error, dirRes: string[]) => {
   const cfgs: any[] = [];
   const cfgs: any[] = [];
   if (dirErr) {
   if (dirErr) {
-    console.log("Reading plugin directory Error", dirErr);
+    console.log('Reading plugin directory Error', dirErr);
   } else {
   } else {
     generateCfgs(cfgs, dirRes);
     generateCfgs(cfgs, dirRes);
   }
   }
@@ -81,14 +85,14 @@ getDirectories(SRC_PLUGIN_DIR, async (dirErr: Error, dirRes: string[]) => {
       DEV_PLUGIN_DIR,
       DEV_PLUGIN_DIR,
       (devDirErr: Error, devDirRes: string[]) => {
       (devDirErr: Error, devDirRes: string[]) => {
         if (devDirErr) {
         if (devDirErr) {
-          console.log("Reading dev plugin directory Error", devDirErr);
+          console.log('Reading dev plugin directory Error', devDirErr);
         } else {
         } else {
           generateCfgs(cfgs, devDirRes, false);
           generateCfgs(cfgs, devDirRes, false);
         }
         }
       }
       }
     );
     );
   }
   }
-  console.log("======/api/plugins configs======", cfgs);
+  console.log('======/api/plugins configs======', cfgs);
   cfgs.forEach(async (cfg: any) => {
   cfgs.forEach(async (cfg: any) => {
     const { api: pluginPath, componentPath } = cfg;
     const { api: pluginPath, componentPath } = cfg;
     if (!pluginPath) return;
     if (!pluginPath) return;
@@ -98,30 +102,30 @@ getDirectories(SRC_PLUGIN_DIR, async (dirErr: Error, dirRes: string[]) => {
     pluginsRouter.use(`/${pluginPath}`, pluginRouter);
     pluginsRouter.use(`/${pluginPath}`, pluginRouter);
   });
   });
 
 
-  router.use("/milvus", connectRouter);
-  router.use("/collections", collectionsRouter);
-  router.use("/partitions", partitionsRouter);
-  router.use("/schema", schemaRouter);
-  router.use("/crons", cronsRouter);
+  router.use('/milvus', connectRouter);
+  router.use('/collections', collectionsRouter);
+  router.use('/partitions', partitionsRouter);
+  router.use('/schema', schemaRouter);
+  router.use('/crons', cronsRouter);
 
 
-  router.get("/healthy", (req, res, next) => {
+  router.get('/healthy', (req, res, next) => {
     res.json({ status: 200 });
     res.json({ status: 200 });
     next();
     next();
   });
   });
 
 
-  app.use("/api/v1", router);
-  app.use("/api/plugins", pluginsRouter);
+  app.use('/api/v1', router);
+  app.use('/api/plugins', pluginsRouter);
 
 
   // Return client build files
   // Return client build files
-  app.use(express.static("build"));
+  app.use(express.static('build'));
 
 
   const data = surveSwaggerSpecification();
   const data = surveSwaggerSpecification();
-  app.use("/api/v1/swagger", swaggerUi.serve, swaggerUi.setup(data));
+  app.use('/api/v1/swagger', swaggerUi.serve, swaggerUi.setup(data));
 
 
   // handle every other route with index.html, which will contain
   // handle every other route with index.html, which will contain
   // a script tag to your application's JavaScript file(s).
   // a script tag to your application's JavaScript file(s).
-  app.get("*", (request, response) => {
-    response.sendFile(path.join(__dirname, "../build/index.html"));
+  app.get('*', (request, response) => {
+    response.sendFile(path.join(__dirname, '../build/index.html'));
   });
   });
 
 
   // ErrorInterceptor
   // ErrorInterceptor

+ 27 - 12
express/src/middlewares/index.ts

@@ -1,6 +1,21 @@
-import { Request, Response, NextFunction, Errback } from "express";
-import morgan from "morgan";
-import chalk from "chalk";
+import { Request, Response, NextFunction, Errback } from 'express';
+import morgan from 'morgan';
+import chalk from 'chalk';
+import { MilvusService } from '../milvus/milvus.service';
+
+const MILVUS_ADDRESS = 'milvus_address';
+
+export const ReqHeaderMiddleware = (
+  req: Request,
+  res: Response,
+  next: NextFunction
+) => {
+  // all request need set milvus address in header.
+  // server will set activeaddress in milvus service.
+  const milvusAddress = req.headers[MILVUS_ADDRESS] || '';
+  MilvusService.activeAddress = milvusAddress as string;
+  next();
+};
 
 
 export const TransformResMiddlerware = (
 export const TransformResMiddlerware = (
   req: Request,
   req: Request,
@@ -41,22 +56,22 @@ export const ErrorMiddleware = (
   if (err) {
   if (err) {
     res
     res
       .status(500)
       .status(500)
-      .json({ message: `${err}`, error: "Bad Request", statusCode: 500 });
+      .json({ message: `${err}`, error: 'Bad Request', statusCode: 500 });
   }
   }
   next();
   next();
 };
 };
 
 
 export const LoggingMiddleware = morgan((tokens, req, res) => {
 export const LoggingMiddleware = morgan((tokens, req, res) => {
   return [
   return [
-    "\n",
+    '\n',
     chalk.blue.bold(tokens.method(req, res)),
     chalk.blue.bold(tokens.method(req, res)),
     chalk.magenta.bold(tokens.status(req, res)),
     chalk.magenta.bold(tokens.status(req, res)),
     chalk.green.bold(tokens.url(req, res)),
     chalk.green.bold(tokens.url(req, res)),
-    chalk.green.bold(tokens["response-time"](req, res) + " ms"),
-    chalk.green.bold("@ " + tokens.date(req, res)),
-    chalk.yellow(tokens["remote-addr"](req, res)),
-    chalk.hex("#fffa65").bold("from " + tokens.referrer(req, res)),
-    chalk.hex("#1e90ff")(tokens["user-agent"](req, res)),
-    "\n",
-  ].join(" ");
+    chalk.green.bold(tokens['response-time'](req, res) + ' ms'),
+    chalk.green.bold('@ ' + tokens.date(req, res)),
+    chalk.yellow(tokens['remote-addr'](req, res)),
+    chalk.hex('#fffa65').bold('from ' + tokens.referrer(req, res)),
+    chalk.hex('#1e90ff')(tokens['user-agent'](req, res)),
+    '\n',
+  ].join(' ');
 });
 });

+ 26 - 36
express/src/milvus/milvus.service.ts

@@ -5,80 +5,70 @@ import {
 } from '@zilliz/milvus2-sdk-node/dist/milvus/types';
 } from '@zilliz/milvus2-sdk-node/dist/milvus/types';
 
 
 export class MilvusService {
 export class MilvusService {
-  private milvusAddress: string;
-  private milvusClient: MilvusClient;
+  // Share with all instances, so activeAddress is static
+  static activeAddress: string;
   private milvusClients: { [x: string]: MilvusClient };
   private milvusClients: { [x: string]: MilvusClient };
 
 
   constructor() {
   constructor() {
-    this.milvusAddress = '';
     this.milvusClients = {};
     this.milvusClients = {};
   }
   }
 
 
-  get milvusAddressGetter() {
-    return this.milvusAddress;
-  }
-
-  get milvusClientGetter() {
-    return this.milvusClient;
+  get activeMilvusClient() {
+    // undefined means not connect yet, will throw error to client.
+    return this.milvusClients[MilvusService.activeAddress];
   }
   }
 
 
   get collectionManager() {
   get collectionManager() {
     this.checkMilvus();
     this.checkMilvus();
-    return this.milvusClient.collectionManager;
+    return this.activeMilvusClient.collectionManager;
   }
   }
 
 
   get partitionManager() {
   get partitionManager() {
     this.checkMilvus();
     this.checkMilvus();
-    return this.milvusClient.partitionManager;
+    return this.activeMilvusClient.partitionManager;
   }
   }
 
 
   get indexManager() {
   get indexManager() {
     this.checkMilvus();
     this.checkMilvus();
-    return this.milvusClient.indexManager;
+    return this.activeMilvusClient.indexManager;
   }
   }
 
 
   get dataManager() {
   get dataManager() {
     this.checkMilvus();
     this.checkMilvus();
-    return this.milvusClient.dataManager;
+    return this.activeMilvusClient.dataManager;
   }
   }
 
 
   private checkMilvus() {
   private checkMilvus() {
-    if (!this.milvusClient) {
+    if (!this.activeMilvusClient) {
       throw new Error('Please connect milvus first');
       throw new Error('Please connect milvus first');
     }
     }
   }
   }
 
 
+  formatAddress(address: string) {
+    return address.replace(/(http|https):\/\//, '');
+  }
+
   async connectMilvus(address: string) {
   async connectMilvus(address: string) {
     // grpc only need address without http
     // grpc only need address without http
-    const milvusAddress = address.replace(/(http|https):\/\//, '');
+    const milvusAddress = this.formatAddress(address);
     try {
     try {
-      this.milvusClient = new MilvusClient(milvusAddress);
-      await this.milvusClient.collectionManager.hasCollection({
+      const milvusClient = new MilvusClient(milvusAddress);
+      await milvusClient.collectionManager.hasCollection({
         collection_name: 'not_exist',
         collection_name: 'not_exist',
       });
       });
-      this.milvusAddress = address;
-      this.milvusClients = {
-        address: this.milvusClient,
-      };
-      return { address: this.milvusAddress };
+      MilvusService.activeAddress = address;
+      this.milvusClients[milvusAddress] = milvusClient;
+      return { address };
     } catch (error) {
     } catch (error) {
+      // if milvus is not working, delete connection.
+      delete this.milvusClients[milvusAddress];
       throw new Error('Connect milvus failed, check your milvus address.');
       throw new Error('Connect milvus failed, check your milvus address.');
     }
     }
   }
   }
 
 
-  async logout(address: string) {
-    if (this.milvusClients[address]) {
-      delete this.milvusClients[address];
-    }
-    if (this.milvusAddress === address) {
-      this.milvusAddress = '';
-    }
-
-    return { logout: true };
-  }
-
   async checkConnect(address: string) {
   async checkConnect(address: string) {
-    if (address !== this.milvusAddress) {
+    const milvusAddress = this.formatAddress(address);
+    if (!Object.keys(this.milvusClients).includes(milvusAddress)) {
       return { connected: false };
       return { connected: false };
     }
     }
     const res = await this.connectMilvus(address);
     const res = await this.connectMilvus(address);
@@ -88,12 +78,12 @@ export class MilvusService {
   }
   }
 
 
   async flush(data: FlushReq) {
   async flush(data: FlushReq) {
-    const res = await this.milvusClient.dataManager.flush(data);
+    const res = await this.activeMilvusClient.dataManager.flush(data);
     return res;
     return res;
   }
   }
 
 
   async getMetrics(): Promise<GetMetricsResponse> {
   async getMetrics(): Promise<GetMetricsResponse> {
-    const res = await this.milvusClient.dataManager.getMetric({
+    const res = await this.activeMilvusClient.dataManager.getMetric({
       request: { metric_type: 'system_info' },
       request: { metric_type: 'system_info' },
     });
     });
     return res;
     return res;