Browse Source

Merge pull request #267 from nameczz/refine-milvus-client

Support multiple  milvus connections in different browsers
nameczz 3 years ago
parent
commit
44de6fe87a

+ 11 - 1
client/src/components/layout/GlobalEffect.tsx

@@ -2,12 +2,15 @@ import React, { useContext } from 'react';
 import axiosInstance from '../../http/Axios';
 import { rootContext } from '../../context/Root';
 import { CODE_STATUS } from '../../consts/Http';
+import { MILVUS_ADDRESS } from '../../consts/Localstorage';
+import { authContext } from '../../context/Auth';
 
 let axiosResInterceptor: number | null = null;
 // let timer: Record<string, ReturnType<typeof setTimeout> | number>[] = [];
 // we only take side effect here, nothing else
 const GlobalEffect = (props: { children: React.ReactNode }) => {
   const { openSnackBar } = useContext(rootContext);
+  const { setAddress } = useContext(authContext);
 
   // catch axios error here
   if (axiosResInterceptor === null) {
@@ -22,7 +25,14 @@ const GlobalEffect = (props: { children: React.ReactNode }) => {
       },
       function (error: any) {
         const { response = {} } = error;
-
+        switch (response.status) {
+          case CODE_STATUS.UNAUTHORIZED:
+            setAddress('');
+            window.localStorage.removeItem(MILVUS_ADDRESS);
+            break;
+          default:
+            break;
+        }
         if (response.data) {
           const { message: errMsg } = response.data;
 

+ 3 - 2
client/src/http/Axios.ts

@@ -1,4 +1,5 @@
 import axios from 'axios';
+import { MILVUS_ADDRESS } from '../consts/Localstorage';
 // import { SESSION } from '../consts/Localstorage';
 
 // console.log(process.env.NODE_ENV, 'api:', process.env.REACT_APP_BASE_URL);
@@ -16,9 +17,9 @@ const axiosInstance = axios.create({
 axiosInstance.interceptors.request.use(
   function (config) {
     // Do something before request is sent
-    // const session = window.localStorage.getItem(SESSION);
+    const address = window.localStorage.getItem(MILVUS_ADDRESS);
 
-    // session && (config.headers[SESSION] = session);
+    address && (config.headers[MILVUS_ADDRESS] = address);
 
     return config;
   },

+ 1 - 0
client/src/http/Milvus.ts

@@ -3,6 +3,7 @@ import BaseModel from './BaseModel';
 
 export class MilvusHttp extends BaseModel {
   static CONNECT_URL = '/milvus/connect';
+
   static CHECK_URL = '/milvus/check';
   static FLUSH_URL = '/milvus/flush';
   static METRICS_URL = '/milvus/metrics';

+ 5 - 2
express/.prettierrc

@@ -1,7 +1,10 @@
 {
+  "printWidth": 80,
   "tabWidth": 2,
+  "useTabs": false,
   "semi": true,
   "singleQuote": true,
   "trailingComma": "es5",
-  "bracketSpacing": true
-}
+  "bracketSpacing": true,
+  "arrowParens": "avoid"
+}

+ 4 - 0
express/package.json

@@ -14,6 +14,8 @@
     "express": "^4.17.1",
     "glob": "^7.2.0",
     "helmet": "^4.6.0",
+    "http-errors": "^1.8.1",
+    "lru-cache": "^6.0.0",
     "morgan": "^1.10.0",
     "node-cron": "^3.0.0",
     "rimraf": "^3.0.2",
@@ -42,7 +44,9 @@
     "@types/cors": "^2.8.12",
     "@types/express": "^4.17.13",
     "@types/glob": "^7.2.0",
+    "@types/http-errors": "^1.8.1",
     "@types/jest": "^27.0.2",
+    "@types/lru-cache": "^5.1.1",
     "@types/morgan": "^1.9.3",
     "@types/node": "^16.11.6",
     "@types/node-cron": "^3.0.0",

+ 8 - 0
express/src/__tests__/__mocks__/consts.ts

@@ -1,3 +1,11 @@
+import LRUCache from 'lru-cache';
+import { EXPIRED_TIME } from '../../utils/Const';
+
+export const insightCacheForTest = new LRUCache({
+  maxAge: EXPIRED_TIME,
+  updateAgeOnGet: true,
+});
+
 // mock data
 export const mockAddress = '127.0.0.1';
 export const mockCollectionNames = [{ name: 'c1' }, { name: 'c2' }];

+ 6 - 1
express/src/__tests__/collections/collections.service.test.ts

@@ -7,6 +7,7 @@ import {
   ERR_NO_PARAM,
 } from '../utils/constants';
 import {
+  insightCacheForTest,
   mockAddress,
   mockCollectionNames,
   mockCollections,
@@ -14,6 +15,7 @@ import {
   mockLoadedCollections,
   mockLoadedCollectionsData,
 } from '../__mocks__/consts';
+import { MilvusClient } from '@zilliz/milvus2-sdk-node/dist/milvus';
 
 // mock Milvus client
 jest.mock('@zilliz/milvus2-sdk-node', () => {
@@ -29,7 +31,10 @@ describe('Test collections service', () => {
   beforeAll(async () => {
     // setup Milvus service and connect to mock Milvus client
     milvusService = new MilvusService();
-    await milvusService.connectMilvus(mockAddress);
+    MilvusService.activeAddress = mockAddress;
+    MilvusService.activeMilvusClient = new MilvusClient(mockAddress);
+
+    await milvusService.connectMilvus(mockAddress, insightCacheForTest);
     service = new CollectionsService(milvusService);
   });
 

+ 7 - 2
express/src/__tests__/crons/crons.service.test.ts

@@ -4,7 +4,8 @@ import { CollectionsService } from '../../collections/collections.service';
 import { CronsService, SchedulerRegistry } from '../../crons/crons.service';
 import { MilvusService } from '../../milvus/milvus.service';
 import { WS_EVENTS, WS_EVENTS_TYPE } from '../../utils/Const';
-import { mockAddress } from '../__mocks__/consts';
+import { insightCacheForTest, mockAddress } from '../__mocks__/consts';
+import { MilvusClient } from '@zilliz/milvus2-sdk-node/dist/milvus';
 
 // mock Milvus client
 jest.mock('@zilliz/milvus2-sdk-node', () => {
@@ -41,7 +42,11 @@ describe('test crons service', () => {
 
   const setup = async () => {
     milvusService = new MilvusService();
-    await milvusService.connectMilvus(mockAddress);
+    MilvusService.activeAddress = mockAddress;
+    MilvusService.activeMilvusClient = new MilvusClient(mockAddress);
+
+    await milvusService.connectMilvus(mockAddress, insightCacheForTest);
+
     collectionService = new CollectionsService(milvusService);
 
     schedulerRegistry = new SchedulerRegistry([]);

+ 12 - 14
express/src/__tests__/milvus/milvus.service.test.ts

@@ -1,6 +1,7 @@
 import mockMilvusClient from '../__mocks__/milvus/milvusClient';
 import { MilvusService } from '../../milvus/milvus.service';
-import { mockAddress } from '../__mocks__/consts';
+import { insightCacheForTest, mockAddress } from '../__mocks__/consts';
+import { MilvusClient } from '@zilliz/milvus2-sdk-node/dist/milvus';
 
 // mock Milvus client
 jest.mock('@zilliz/milvus2-sdk-node', () => {
@@ -15,6 +16,8 @@ describe('Test Milvus service', () => {
   // init service
   beforeEach(() => {
     service = new MilvusService();
+    MilvusService.activeAddress = mockAddress;
+    MilvusService.activeMilvusClient = new MilvusClient(mockAddress);
   });
 
   afterEach(() => {
@@ -22,18 +25,13 @@ describe('Test Milvus service', () => {
   });
 
   test('test connectMilvus method', async () => {
-    expect(service.milvusClientGetter).toBeUndefined();
-    expect(service.milvusAddressGetter).toBe('');
-
-    const res = await service.connectMilvus(mockAddress);
+    const res = await service.connectMilvus(mockAddress, insightCacheForTest);
     expect(res.address).toBe(mockAddress);
-    expect(service.milvusAddressGetter).toBe(mockAddress);
-    expect(service.milvusClientGetter).toBeDefined();
   });
 
   test('test connectMilvus method error', async () => {
     try {
-      await service.connectMilvus('');
+      await service.connectMilvus('', insightCacheForTest);
     } catch (err) {
       expect(err.message).toBe(
         'Connect milvus failed, check your milvus address.'
@@ -51,16 +49,16 @@ describe('Test Milvus service', () => {
 
   test('test checkConnect method', async () => {
     // mock connect first
-    await service.connectMilvus(mockAddress);
+    await service.connectMilvus(mockAddress, insightCacheForTest);
     // different address
-    const errorRes = await service.checkConnect('123');
+    const errorRes = await service.checkConnect('123', insightCacheForTest);
     expect(errorRes.connected).toBeFalsy();
-    const res = await service.checkConnect(mockAddress);
+    const res = await service.checkConnect(mockAddress, insightCacheForTest);
     expect(res.connected).toBeTruthy();
   });
 
   test('test managers after connected', async () => {
-    await service.connectMilvus(mockAddress);
+    await service.connectMilvus(mockAddress, insightCacheForTest);
     expect(service.collectionManager).toBeDefined();
     expect(service.partitionManager).toBeDefined();
     expect(service.indexManager).toBeDefined();
@@ -68,14 +66,14 @@ describe('Test Milvus service', () => {
   });
 
   test('test flush method', async () => {
-    await service.connectMilvus(mockAddress);
+    await service.connectMilvus(mockAddress, insightCacheForTest);
     const res = await service.flush({ collection_names: ['c1', 'c2'] });
     const data = res.data.collection_names;
     expect(data.length).toBe(2);
   });
 
   test('test getMetrics method', async () => {
-    await service.connectMilvus(mockAddress);
+    await service.connectMilvus(mockAddress, insightCacheForTest);
     const res = await service.getMetrics();
     expect(res.type).toBe('system_info');
   });

+ 6 - 1
express/src/__tests__/partitions/partitions.service.test.ts

@@ -3,10 +3,12 @@ import { MilvusService } from '../../milvus/milvus.service';
 import { ERR_NO_COLLECTION } from '../utils/constants';
 import { PartitionsService } from '../../partitions/partitions.service';
 import {
+  insightCacheForTest,
   mockAddress,
   mockGetPartitionsInfoData,
   mockPartition,
 } from '../__mocks__/consts';
+import { MilvusClient } from '@zilliz/milvus2-sdk-node/dist/milvus';
 
 // mock Milvus client
 jest.mock('@zilliz/milvus2-sdk-node', () => {
@@ -22,7 +24,10 @@ describe('Test partitions service', () => {
   beforeAll(async () => {
     // setup Milvus service and connect to mock Milvus client
     milvusService = new MilvusService();
-    await milvusService.connectMilvus(mockAddress);
+    MilvusService.activeAddress = mockAddress;
+    MilvusService.activeMilvusClient = new MilvusClient(mockAddress);
+
+    await milvusService.connectMilvus(mockAddress, insightCacheForTest);
     service = new PartitionsService(milvusService);
   });
 

+ 6 - 2
express/src/__tests__/schema/schema.service.test.ts

@@ -2,7 +2,8 @@ import mockMilvusClient from '../__mocks__/milvus/milvusClient';
 import { MilvusService } from '../../milvus/milvus.service';
 import { CodeEnum, ERR_NO_COLLECTION } from '../utils/constants';
 import { SchemaService } from '../../schema/schema.service';
-import { mockAddress } from '../__mocks__/consts';
+import { insightCacheForTest, mockAddress } from '../__mocks__/consts';
+import { MilvusClient } from '@zilliz/milvus2-sdk-node/dist/milvus';
 
 // mock Milvus client
 jest.mock('@zilliz/milvus2-sdk-node', () => {
@@ -18,7 +19,10 @@ describe('Test schema service', () => {
   beforeAll(async () => {
     // setup Milvus service and connect to mock Milvus client
     milvusService = new MilvusService();
-    await milvusService.connectMilvus(mockAddress);
+    MilvusService.activeAddress = mockAddress;
+    MilvusService.activeMilvusClient = new MilvusClient(mockAddress);
+
+    await milvusService.connectMilvus(mockAddress, insightCacheForTest);
     service = new SchemaService(milvusService);
   });
 

+ 54 - 42
express/src/app.ts

@@ -1,29 +1,37 @@
-import express from "express";
-import cors from "cors";
-import helmet from "helmet";
-import * as http from "http";
-import { Server, Socket } from "socket.io";
-import { router as connectRouter } from "./milvus";
-import { router as collectionsRouter } from "./collections";
-import { router as partitionsRouter } from "./partitions";
-import { router as schemaRouter } from "./schema";
-import { router as cronsRouter } from "./crons";
-import { pubSub } from "./events";
+import express from 'express';
+import cors from 'cors';
+import helmet from 'helmet';
+import * as http from 'http';
+import { Server, Socket } from 'socket.io';
+import { router as connectRouter } from './milvus';
+import { router as collectionsRouter } from './collections';
+import { router as partitionsRouter } from './partitions';
+import { router as schemaRouter } from './schema';
+import { router as cronsRouter } from './crons';
+import { pubSub } from './events';
 import {
   TransformResMiddlerware,
   LoggingMiddleware,
   ErrorMiddleware,
-} from "./middlewares";
+  ReqHeaderMiddleware,
+} from './middlewares';
 
-import { getDirectories, getDirectoriesSync, generateCfgs } from "./utils";
-import * as path from "path";
-import chalk from "chalk";
-import { surveSwaggerSpecification } from "./swagger";
-import swaggerUi from "swagger-ui-express";
+import { getDirectories, getDirectoriesSync, generateCfgs } from './utils';
+import * as path from 'path';
+import chalk from 'chalk';
+import { surveSwaggerSpecification } from './swagger';
+import swaggerUi from 'swagger-ui-express';
+import LruCache from 'lru-cache';
+import { EXPIRED_TIME, INSIGHT_CACHE } from './utils/Const';
 
 const PLUGIN_DEV = process.env?.PLUGIN_DEV;
-const SRC_PLUGIN_DIR = "src/plugins";
-const DEV_PLUGIN_DIR = "../../src/*/server";
+const SRC_PLUGIN_DIR = 'src/plugins';
+const DEV_PLUGIN_DIR = '../../src/*/server';
+
+const insightCache = new LruCache({
+  maxAge: EXPIRED_TIME,
+  updateAgeOnGet: true,
+});
 
 export const app = express();
 const PORT = 3000;
@@ -32,11 +40,12 @@ const server = http.createServer(app);
 // initialize the WebSocket server instance
 const io = new Server(server, {
   cors: {
-    origin: "*",
-    methods: ["GET", "POST"],
+    origin: '*',
+    methods: ['GET', 'POST'],
   },
 });
 
+app.set(INSIGHT_CACHE, insightCache);
 // https://expressjs.com/en/resources/middleware/cors.html
 app.use(cors());
 // https://github.com/helmetjs/helmet
@@ -45,22 +54,25 @@ app.use(
     contentSecurityPolicy: false,
   })
 );
-app.use(express.json({ limit: "150MB" }));
+app.use(express.json({ limit: '150MB' }));
 // TransformResInterceptor
 app.use(TransformResMiddlerware);
 // LoggingInterceptor
 app.use(LoggingMiddleware);
 
+// All headers operations
+app.use(ReqHeaderMiddleware);
+
 const router = express.Router();
 const pluginsRouter = express.Router();
 
 // Init WebSocket server event listener
-io.on("connection", (socket: Socket) => {
-  console.log("socket.io connected");
-  socket.on("COLLECTION", (message: any) => {
-    socket.emit("COLLECTION", { data: message });
+io.on('connection', (socket: Socket) => {
+  console.log('socket.io connected');
+  socket.on('COLLECTION', (message: any) => {
+    socket.emit('COLLECTION', { data: message });
   });
-  pubSub.on("ws_pubsub", (msg) => {
+  pubSub.on('ws_pubsub', msg => {
     const { event, data } = msg;
     socket.emit(event, data);
   });
@@ -71,7 +83,7 @@ io.on("connection", (socket: Socket) => {
 getDirectories(SRC_PLUGIN_DIR, async (dirErr: Error, dirRes: string[]) => {
   const cfgs: any[] = [];
   if (dirErr) {
-    console.log("Reading plugin directory Error", dirErr);
+    console.log('Reading plugin directory Error', dirErr);
   } else {
     generateCfgs(cfgs, dirRes);
   }
@@ -81,14 +93,14 @@ getDirectories(SRC_PLUGIN_DIR, async (dirErr: Error, dirRes: string[]) => {
       DEV_PLUGIN_DIR,
       (devDirErr: Error, devDirRes: string[]) => {
         if (devDirErr) {
-          console.log("Reading dev plugin directory Error", devDirErr);
+          console.log('Reading dev plugin directory Error', devDirErr);
         } else {
           generateCfgs(cfgs, devDirRes, false);
         }
       }
     );
   }
-  console.log("======/api/plugins configs======", cfgs);
+  console.log('======/api/plugins configs======', cfgs);
   cfgs.forEach(async (cfg: any) => {
     const { api: pluginPath, componentPath } = cfg;
     if (!pluginPath) return;
@@ -98,30 +110,30 @@ getDirectories(SRC_PLUGIN_DIR, async (dirErr: Error, dirRes: string[]) => {
     pluginsRouter.use(`/${pluginPath}`, pluginRouter);
   });
 
-  router.use("/milvus", connectRouter);
-  router.use("/collections", collectionsRouter);
-  router.use("/partitions", partitionsRouter);
-  router.use("/schema", schemaRouter);
-  router.use("/crons", cronsRouter);
+  router.use('/milvus', connectRouter);
+  router.use('/collections', collectionsRouter);
+  router.use('/partitions', partitionsRouter);
+  router.use('/schema', schemaRouter);
+  router.use('/crons', cronsRouter);
 
-  router.get("/healthy", (req, res, next) => {
+  router.get('/healthy', (req, res, next) => {
     res.json({ status: 200 });
     next();
   });
 
-  app.use("/api/v1", router);
-  app.use("/api/plugins", pluginsRouter);
+  app.use('/api/v1', router);
+  app.use('/api/plugins', pluginsRouter);
 
   // Return client build files
-  app.use(express.static("build"));
+  app.use(express.static('build'));
 
   const data = surveSwaggerSpecification();
-  app.use("/api/v1/swagger", swaggerUi.serve, swaggerUi.setup(data));
+  app.use('/api/v1/swagger', swaggerUi.serve, swaggerUi.setup(data));
 
   // handle every other route with index.html, which will contain
   // a script tag to your application's JavaScript file(s).
-  app.get("*", (request, response) => {
-    response.sendFile(path.join(__dirname, "../build/index.html"));
+  app.get('*', (request, response) => {
+    response.sendFile(path.join(__dirname, '../build/index.html'));
   });
 
   // ErrorInterceptor

+ 41 - 16
express/src/middlewares/index.ts

@@ -1,6 +1,26 @@
-import { Request, Response, NextFunction, Errback } from "express";
-import morgan from "morgan";
-import chalk from "chalk";
+import { Request, Response, NextFunction, Errback } from 'express';
+import morgan from 'morgan';
+import chalk from 'chalk';
+import { MilvusService } from '../milvus/milvus.service';
+import { INSIGHT_CACHE, MILVUS_ADDRESS } from '../utils/Const';
+import { HttpError } from 'http-errors';
+
+export const ReqHeaderMiddleware = (
+  req: Request,
+  res: Response,
+  next: NextFunction
+) => {
+  const insightCache = req.app.get(INSIGHT_CACHE);
+  // all request need set milvus address in header.
+  // server will set activeaddress in milvus service.
+  const milvusAddress = (req.headers[MILVUS_ADDRESS] as string) || '';
+  MilvusService.activeAddress = milvusAddress;
+  // insight cache will update expire time when use insightCache.get
+  MilvusService.activeMilvusClient = insightCache.get(
+    MilvusService.formatAddress(milvusAddress)
+  );
+  next();
+};
 
 export const TransformResMiddlerware = (
   req: Request,
@@ -8,7 +28,7 @@ export const TransformResMiddlerware = (
   next: NextFunction
 ) => {
   const oldSend = res.json;
-  res.json = (data) => {
+  res.json = data => {
     // console.log(data); // do something with the data
     const statusCode = data?.statusCode;
     const message = data?.message;
@@ -27,12 +47,17 @@ export const TransformResMiddlerware = (
  * Normally depend on status which from milvus service.
  */
 export const ErrorMiddleware = (
-  err: Error,
+  err: HttpError,
   req: Request,
   res: Response,
   next: NextFunction
 ) => {
-  console.log(chalk.blue.bold(req.method, req.url), chalk.red.bold(err));
+  const statusCode = err.statusCode || 500;
+  console.log(
+    chalk.blue.bold(req.method, req.url),
+    chalk.magenta.bold(statusCode),
+    chalk.red.bold(err)
+  );
   // Boolean property that indicates if the app sent HTTP headers for the response.
   // Here to prevent sending response after header has been sent.
   if (res.headersSent) {
@@ -40,23 +65,23 @@ export const ErrorMiddleware = (
   }
   if (err) {
     res
-      .status(500)
-      .json({ message: `${err}`, error: "Bad Request", statusCode: 500 });
+      .status(statusCode)
+      .json({ message: `${err}`, error: 'Bad Request', statusCode });
   }
   next();
 };
 
 export const LoggingMiddleware = morgan((tokens, req, res) => {
   return [
-    "\n",
+    '\n',
     chalk.blue.bold(tokens.method(req, res)),
     chalk.magenta.bold(tokens.status(req, res)),
     chalk.green.bold(tokens.url(req, res)),
-    chalk.green.bold(tokens["response-time"](req, res) + " ms"),
-    chalk.green.bold("@ " + tokens.date(req, res)),
-    chalk.yellow(tokens["remote-addr"](req, res)),
-    chalk.hex("#fffa65").bold("from " + tokens.referrer(req, res)),
-    chalk.hex("#1e90ff")(tokens["user-agent"](req, res)),
-    "\n",
-  ].join(" ");
+    chalk.green.bold(tokens['response-time'](req, res) + ' ms'),
+    chalk.green.bold('@ ' + tokens.date(req, res)),
+    chalk.yellow(tokens['remote-addr'](req, res)),
+    chalk.hex('#fffa65').bold('from ' + tokens.referrer(req, res)),
+    chalk.hex('#1e90ff')(tokens['user-agent'](req, res)),
+    '\n',
+  ].join(' ');
 });

+ 21 - 11
express/src/milvus/milvus.controller.ts

@@ -1,7 +1,8 @@
-import { NextFunction, Request, Response, Router } from "express";
-import { dtoValidationMiddleware } from "../middlewares/validation";
-import { MilvusService } from "./milvus.service";
-import { ConnectMilvusDto, FlushDto } from "./dto";
+import { NextFunction, Request, Response, Router } from 'express';
+import { dtoValidationMiddleware } from '../middlewares/validation';
+import { MilvusService } from './milvus.service';
+import { ConnectMilvusDto, FlushDto } from './dto';
+import { INSIGHT_CACHE } from '../utils/Const';
 
 export class MilvusController {
   private router: Router;
@@ -18,28 +19,32 @@ export class MilvusController {
 
   generateRoutes() {
     this.router.post(
-      "/connect",
+      '/connect',
       dtoValidationMiddleware(ConnectMilvusDto),
       this.connectMilvus.bind(this)
     );
 
-    this.router.get("/check", this.checkConnect.bind(this));
+    this.router.get('/check', this.checkConnect.bind(this));
 
     this.router.put(
-      "/flush",
+      '/flush',
       dtoValidationMiddleware(FlushDto),
       this.flush.bind(this)
     );
 
-    this.router.get("/metrics", this.getMetrics.bind(this));
+    this.router.get('/metrics', this.getMetrics.bind(this));
 
     return this.router;
   }
 
   async connectMilvus(req: Request, res: Response, next: NextFunction) {
     const address = req.body?.address;
+    const insightCache = req.app.get(INSIGHT_CACHE);
     try {
-      const result = await this.milvusService.connectMilvus(address);
+      const result = await this.milvusService.connectMilvus(
+        address,
+        insightCache
+      );
 
       res.send(result);
     } catch (error) {
@@ -48,9 +53,14 @@ export class MilvusController {
   }
 
   async checkConnect(req: Request, res: Response, next: NextFunction) {
-    const address = "" + req.query?.address;
+    const address = '' + req.query?.address;
+    const insightCache = req.app.get(INSIGHT_CACHE);
+
     try {
-      const result = await this.milvusService.checkConnect(address);
+      const result = await this.milvusService.checkConnect(
+        address,
+        insightCache
+      );
       res.send(result);
     } catch (error) {
       next(error);

+ 40 - 34
express/src/milvus/milvus.service.ts

@@ -3,81 +3,87 @@ import {
   FlushReq,
   GetMetricsResponse,
 } from '@zilliz/milvus2-sdk-node/dist/milvus/types';
-
+import HttpErrors from 'http-errors';
+import LruCache from 'lru-cache';
+import { HTTP_STATUS_CODE } from '../utils/Error';
 export class MilvusService {
-  private milvusAddress: string;
-  private milvusClient: MilvusClient;
-
-  constructor() {
-    this.milvusAddress = '';
-  }
-
-  get milvusAddressGetter() {
-    return this.milvusAddress;
-  }
-
-  get milvusClientGetter() {
-    return this.milvusClient;
-  }
+  // Share with all instances, so activeAddress is static
+  static activeAddress: string;
+  static activeMilvusClient: MilvusClient;
 
   get collectionManager() {
     this.checkMilvus();
-    return this.milvusClient.collectionManager;
+    return MilvusService.activeMilvusClient.collectionManager;
   }
 
   get partitionManager() {
     this.checkMilvus();
-    return this.milvusClient.partitionManager;
+    return MilvusService.activeMilvusClient.partitionManager;
   }
 
   get indexManager() {
     this.checkMilvus();
-    return this.milvusClient.indexManager;
+    return MilvusService.activeMilvusClient.indexManager;
   }
 
   get dataManager() {
     this.checkMilvus();
-    return this.milvusClient.dataManager;
+    return MilvusService.activeMilvusClient.dataManager;
+  }
+
+  static formatAddress(address: string) {
+    return address.replace(/(http|https):\/\//, '');
   }
 
-  private checkMilvus() {
-    if (!this.milvusClient) {
-      throw new Error('Please connect milvus first');
+  checkMilvus() {
+    if (!MilvusService.activeMilvusClient) {
+      throw HttpErrors(
+        HTTP_STATUS_CODE.UNAUTHORIZED,
+        'Please connect milvus first'
+      );
+      // throw new Error('Please connect milvus first');
     }
   }
 
-  async connectMilvus(address: string) {
+  async connectMilvus(address: string, cache: LruCache<any, any>) {
     // grpc only need address without http
-    const milvusAddress = address.replace(/(http|https):\/\//, '');
+    const milvusAddress = MilvusService.formatAddress(address);
     try {
-      this.milvusClient = new MilvusClient(milvusAddress);
-      await this.milvusClient.collectionManager.hasCollection({
+      const milvusClient = new MilvusClient(milvusAddress);
+      await milvusClient.collectionManager.hasCollection({
         collection_name: 'not_exist',
       });
-      this.milvusAddress = address;
-      return { address: this.milvusAddress };
+      MilvusService.activeAddress = address;
+      cache.set(milvusAddress, milvusClient);
+      return { address };
     } catch (error) {
-      throw new Error('Connect milvus failed, check your milvus address.');
+      // if milvus is not working, delete connection.
+      cache.del(milvusAddress);
+      throw HttpErrors(
+        HTTP_STATUS_CODE.BAD_REQUEST,
+        'Connect milvus failed, check your milvus address.'
+      );
     }
   }
 
-  async checkConnect(address: string) {
-    if (address !== this.milvusAddress) {
+  async checkConnect(address: string, cache: LruCache<any, any>) {
+    const milvusAddress = MilvusService.formatAddress(address);
+    if (!cache.has(milvusAddress)) {
       return { connected: false };
     }
-    const res = await this.connectMilvus(address);
+    const res = await this.connectMilvus(address, cache);
     return {
       connected: res.address ? true : false,
     };
   }
 
   async flush(data: FlushReq) {
-    const res = await this.milvusClient.dataManager.flush(data);
+    const res = await MilvusService.activeMilvusClient.dataManager.flush(data);
     return res;
   }
 
   async getMetrics(): Promise<GetMetricsResponse> {
-    const res = await this.milvusClient.dataManager.getMetric({
+    const res = await MilvusService.activeMilvusClient.dataManager.getMetric({
       request: { metric_type: 'system_info' },
     });
     return res;

+ 7 - 0
express/src/utils/Const.ts

@@ -1,5 +1,12 @@
 export const ROW_COUNT = 'row_count';
 
+// use in req header
+export const MILVUS_ADDRESS = 'milvus_address';
+
+// for lru cache
+export const INSIGHT_CACHE = 'insight_cache';
+export const EXPIRED_TIME = 1000 * 60 * 60 * 24;
+
 export enum LOADING_STATE {
   LOADED,
   LOADING,

+ 50 - 0
express/src/utils/Error.ts

@@ -8,3 +8,53 @@ export const throwErrorFromSDK = (res: ResStatus) => {
     throw res.reason;
   }
 };
+
+export enum HTTP_STATUS_CODE {
+  CONTINUE = 100,
+  SWITCHING_PROTOCOLS = 101,
+  PROCESSING = 102,
+  EARLYHINTS = 103,
+  OK = 200,
+  CREATED = 201,
+  ACCEPTED = 202,
+  NON_AUTHORITATIVE_INFORMATION = 203,
+  NO_CONTENT = 204,
+  RESET_CONTENT = 205,
+  PARTIAL_CONTENT = 206,
+  AMBIGUOUS = 300,
+  MOVED_PERMANENTLY = 301,
+  FOUND = 302,
+  SEE_OTHER = 303,
+  NOT_MODIFIED = 304,
+  TEMPORARY_REDIRECT = 307,
+  PERMANENT_REDIRECT = 308,
+  BAD_REQUEST = 400,
+  UNAUTHORIZED = 401,
+  PAYMENT_REQUIRED = 402,
+  FORBIDDEN = 403,
+  NOT_FOUND = 404,
+  METHOD_NOT_ALLOWED = 405,
+  NOT_ACCEPTABLE = 406,
+  PROXY_AUTHENTICATION_REQUIRED = 407,
+  REQUEST_TIMEOUT = 408,
+  CONFLICT = 409,
+  GONE = 410,
+  LENGTH_REQUIRED = 411,
+  PRECONDITION_FAILED = 412,
+  PAYLOAD_TOO_LARGE = 413,
+  URI_TOO_LONG = 414,
+  UNSUPPORTED_MEDIA_TYPE = 415,
+  REQUESTED_RANGE_NOT_SATISFIABLE = 416,
+  EXPECTATION_FAILED = 417,
+  I_AM_A_TEAPOT = 418,
+  MISDIRECTED = 421,
+  UNPROCESSABLE_ENTITY = 422,
+  FAILED_DEPENDENCY = 424,
+  TOO_MANY_REQUESTS = 429,
+  INTERNAL_SERVER_ERROR = 500,
+  NOT_IMPLEMENTED = 501,
+  BAD_GATEWAY = 502,
+  SERVICE_UNAVAILABLE = 503,
+  GATEWAY_TIMEOUT = 504,
+  HTTP_VERSION_NOT_SUPPORTED = 505,
+}

+ 31 - 0
express/yarn.lock

@@ -877,6 +877,11 @@
   dependencies:
     "@types/node" "*"
 
+"@types/http-errors@^1.8.1":
+  version "1.8.1"
+  resolved "https://registry.yarnpkg.com/@types/http-errors/-/http-errors-1.8.1.tgz#e81ad28a60bee0328c6d2384e029aec626f1ae67"
+  integrity sha512-e+2rjEwK6KDaNOm5Aa9wNGgyS9oSZU/4pfSMMPYNOfjvFI0WVXm29+ITRFr6aKDvvKo7uU1jV68MW4ScsfDi7Q==
+
 "@types/istanbul-lib-coverage@*", "@types/istanbul-lib-coverage@^2.0.0", "@types/istanbul-lib-coverage@^2.0.1":
   version "2.0.3"
   resolved "https://registry.yarnpkg.com/@types/istanbul-lib-coverage/-/istanbul-lib-coverage-2.0.3.tgz#4ba8ddb720221f432e443bd5f9117fd22cfd4762"
@@ -914,6 +919,11 @@
   resolved "https://registry.yarnpkg.com/@types/long/-/long-4.0.1.tgz#459c65fa1867dafe6a8f322c4c51695663cc55e9"
   integrity sha512-5tXH6Bx/kNGd3MgffdmP4dy2Z+G4eaXw0SE81Tq3BNadtnMR5/ySMzX4SLEzHJzSmPNn4HIdpQsBvXMUykr58w==
 
+"@types/lru-cache@^5.1.1":
+  version "5.1.1"
+  resolved "https://registry.yarnpkg.com/@types/lru-cache/-/lru-cache-5.1.1.tgz#c48c2e27b65d2a153b19bfc1a317e30872e01eef"
+  integrity sha512-ssE3Vlrys7sdIzs5LOxCzTVMsU7i9oa/IaW92wF32JFb3CVczqOkru2xspuKczHEbG3nvmPY7IFqVmGGHdNbYw==
+
 "@types/mime@^1":
   version "1.3.2"
   resolved "https://registry.yarnpkg.com/@types/mime/-/mime-1.3.2.tgz#93e25bf9ee75fe0fd80b594bc4feb0e862111b5a"
@@ -2238,6 +2248,17 @@ http-errors@1.7.2:
     statuses ">= 1.5.0 < 2"
     toidentifier "1.0.0"
 
+http-errors@^1.8.1:
+  version "1.8.1"
+  resolved "https://registry.yarnpkg.com/http-errors/-/http-errors-1.8.1.tgz#7c3f28577cbc8a207388455dbd62295ed07bd68c"
+  integrity sha512-Kpk9Sm7NmI+RHhnj6OIWDI1d6fIoFAtFt9RLaTMRlg/8w49juAStsrBgp0Dp4OdxdVbRIeKhtCUvoi/RuAhO4g==
+  dependencies:
+    depd "~1.1.2"
+    inherits "2.0.4"
+    setprototypeof "1.2.0"
+    statuses ">= 1.5.0 < 2"
+    toidentifier "1.0.1"
+
 http-errors@~1.7.2:
   version "1.7.3"
   resolved "https://registry.yarnpkg.com/http-errors/-/http-errors-1.7.3.tgz#6c619e4f9c60308c38519498c14fbb10aacebb06"
@@ -3748,6 +3769,11 @@ setprototypeof@1.1.1:
   resolved "https://registry.yarnpkg.com/setprototypeof/-/setprototypeof-1.1.1.tgz#7e95acb24aa92f5885e0abef5ba131330d4ae683"
   integrity sha512-JvdAWfbXeIGaZ9cILp38HntZSFSo3mWg6xGcJJsd+d4aRMOqauag1C63dJfDw7OaMYwEbHMOxEZ1lqVRYP2OAw==
 
+setprototypeof@1.2.0:
+  version "1.2.0"
+  resolved "https://registry.yarnpkg.com/setprototypeof/-/setprototypeof-1.2.0.tgz#66c9a24a73f9fc28cbe66b09fed3d33dcaf1b424"
+  integrity sha512-E5LDX7Wrp85Kil5bhZv46j8jOeboKq5JMmYM3gVGdGH8xFpPWXUMsNrlODCrkoxMEeNi/XZIwuRvY4XNwYMJpw==
+
 shebang-command@^2.0.0:
   version "2.0.0"
   resolved "https://registry.yarnpkg.com/shebang-command/-/shebang-command-2.0.0.tgz#ccd0af4f8835fbdc265b82461aaf0c36663f34ea"
@@ -4050,6 +4076,11 @@ toidentifier@1.0.0:
   resolved "https://registry.yarnpkg.com/toidentifier/-/toidentifier-1.0.0.tgz#7e1be3470f1e77948bc43d94a3c8f4d7752ba553"
   integrity sha512-yaOH/Pk/VEhBWWTlhI+qXxDFXlejDGcQipMlyxda9nthulaxLZUNcUqFxokp0vcYnvteJln5FNQDRrxj3YcbVw==
 
+toidentifier@1.0.1:
+  version "1.0.1"
+  resolved "https://registry.yarnpkg.com/toidentifier/-/toidentifier-1.0.1.tgz#3be34321a88a820ed1bd80dfaa33e479fbb8dd35"
+  integrity sha512-o5sSPKEkg/DIQNmH43V0/uerLrpzVedkUh8tGNvaeXpfpuwjKenlSox/2O/BTlZUtEe+JG7s5YhEz608PlAHRA==
+
 touch@^3.1.0:
   version "3.1.0"
   resolved "https://registry.yarnpkg.com/touch/-/touch-3.1.0.tgz#fe365f5f75ec9ed4e56825e0bb76d24ab74af83b"