Browse Source

refine code

czhen 3 years ago
parent
commit
7a05d54661

+ 49 - 50
express/src/app.ts

@@ -1,7 +1,13 @@
 import fs from "fs";
 import path from "path";
 import glob from "glob";
-import express, { application } from "express";
+import express, {
+  application,
+  Request,
+  Response,
+  NextFunction,
+  Errback,
+} from "express";
 import cors from "cors";
 import helmet from "helmet";
 import * as http from "http";
@@ -12,6 +18,13 @@ import { router as partitionsRouter } from "./partitions";
 import { router as schemaRouter } from "./schema";
 import { router as cronsRouter } from "./crons";
 import { pubSub } from "./events";
+import {
+  TransformResInterceptor,
+  LoggingInterceptor,
+  ErrorInterceptor,
+} from "./interceptors";
+
+const PLUGIN_DEV = process.env?.PLUGIN_DEV;
 
 const app = express();
 const PORT = 3000;
@@ -35,70 +48,60 @@ app.use(
 );
 app.use(express.json({ limit: "150MB" }));
 
-const router = express.Router();
-
-const getDirectories = (
-  src: string,
-  callback: (err: Error, res: [string]) => void
-) => {
-  glob(src + "/**/*", callback);
-};
-
-const cfgs: any = [];
+// TransformResInterceptor
+app.use(TransformResInterceptor);
+// LoggingInterceptor
+app.use(LoggingInterceptor);
 
-// export const pubSub = new PubSub();
+const router = express.Router();
 
+// Init WebSocket server event listener
 io.on("connection", (socket: Socket) => {
   console.log("socket.io connected");
-  socket.emit("greeting-from-server", {
-    greeting: "Hello Client",
-  });
-  socket.on("greeting-from-client", (message) => {
-    console.log(message);
-  });
+  // socket.emit("greeting-from-server", {
+  //   greeting: "Hello Client",
+  // });
+  // socket.on("greeting-from-client", (message) => {
+  //   console.log(message);
+  // });
   socket.on("COLLECTION", (message: any) => {
-    console.log("received COLLECTION: %s", message);
+    // console.log("received COLLECTION: %s", message);
     socket.emit("COLLECTION", { data: message });
   });
   socket.on("events", (message: any) => {
-    console.log("received events: %s", message);
+    // console.log("received events: %s", message);
     const response = [1, 2, 3];
     response.map((item) => {
       setImmediate(() => socket.emit("events", { data: item }));
     });
   });
   socket.on("identity", (message: any) => {
-    console.log("received identity: %s", message);
+    // console.log("received identity: %s", message);
     socket.emit("identity", `identity data: ${message}`);
   });
   pubSub.on("ws_pubsub", (msg) => {
     const { event, data } = msg;
-    console.log(`pubsub: ${event}`);
+    // console.log(`pubsub: ${event}`);
     socket.emit(event, data);
   });
 });
 
-// const customInterceptor = (request: any, response: any, next: any) => {
-//   const oldSend = response.send;
-//   response.send = (data: any) => {
-//     // arguments[0] (or `data`) contains the response body
-//     // arguments[0] = "modified : " + arguments[0];
-//     console.log(data);
-//     const newData = {...data, test: "customInterceptor"}
-//     console.log(newData);
-//     oldSend.call(response, newData);
-//   };
-//   next();
-// };
-// app.use(customInterceptor);
+// Utils: read files under specified directories
+const getDirectories = (
+  src: string,
+  callback: (err: Error, res: [string]) => void
+) => {
+  glob(src + "/**/*", callback);
+};
 
-getDirectories("../../src", (err: Error, res: [string]) => {
-  if (err) {
-    console.log("Error", err);
+// Read plugin files and start express server
+getDirectories("../../src", (dirErr: Error, dirRes: [string]) => {
+  const cfgs: any = [];
+  if (dirErr) {
+    console.log("Reading plugin directory Error", dirErr);
   } else {
-    res.forEach((item: string) => {
+    dirRes.forEach((item: string) => {
       console.log(item);
-      // if (path.extname(item) === ".json") {
       if (item.endsWith("/config.json")) {
         const fileData = fs.readFileSync(item);
         const json = JSON.parse(fileData.toString());
@@ -120,13 +123,12 @@ getDirectories("../../src", (err: Error, res: [string]) => {
       data: { api: pluginPath },
     } = cfg;
     if (!pluginPath) return;
-    // const pluginRouter = require(`${dir}/server/app`);
     const {
       default: { router: pluginRouter },
     } = await import(`../${dir}/server/app`);
     console.log(pluginPath);
     console.log(pluginRouter);
-    app.use(`/${pluginPath}`, pluginRouter);
+    router.use(`/${pluginPath}`, pluginRouter);
   });
 
   router.use("/milvus", connectRouter);
@@ -135,17 +137,14 @@ getDirectories("../../src", (err: Error, res: [string]) => {
   router.use("/schema", schemaRouter);
   router.use("/crons", cronsRouter);
 
-  router.get("/healthy", (request, response) => {
-    response.json({ status: 200 });
+  router.get("/healthy", (req, res, next) => {
+    res.json({ status: 200 });
+    next();
   });
 
   app.use("/api/v1", router);
-  app.all("/socket.io/", (request, response) => {
-    response.send("ok");
-  });
-  // app.listen(PORT, () => {
-  //   console.log(`Example app listening at http://localhost:${PORT}`);
-  // });
+  // ErrorInterceptor
+  app.use(ErrorInterceptor);
   // start server
   server.listen(PORT, () => {
     console.log(`Server started on port ${PORT} :)`);

+ 33 - 33
express/src/collections/index.ts

@@ -6,101 +6,101 @@ const router = express.Router();
 
 export const collectionsService = new CollectionsService(milvusService);
 
-router.get("/", async (req, res) => {
+router.get("/", async (req, res, next) => {
   const type = parseInt("" + req.query?.type, 10);
   try {
     const result =
       type === 1
         ? await collectionsService.getLoadedColletions()
         : await collectionsService.getAllCollections();
-    res.send({ data: result, statusCode: 200 });
+    res.send(result);
   } catch (error) {
-    res.status(500).send({ error });
+    next(error);
   }
 });
-router.get("/statistics", async (req, res) => {
+router.get("/statistics", async (req, res, next) => {
   try {
     const result = await collectionsService.getStatistics();
-    res.send({ data: result, statusCode: 200 });
+    res.send(result);
   } catch (error) {
-    res.status(500).send({ error });
+    next(error);
   }
 });
-router.post("/", async (req, res) => {
+router.post("/", async (req, res, next) => {
   const createCollectionData = req.body;
   try {
     const result = await collectionsService.createCollection(
       createCollectionData
     );
-    res.send({ data: result, statusCode: 200 });
+    res.send(result);
   } catch (error) {
-    res.status(500).send({ error });
+    next(error);
   }
 });
-router.delete("/:name", async (req, res) => {
+router.delete("/:name", async (req, res, next) => {
   const name = req.params?.name;
   try {
     const result = await collectionsService.dropCollection({
       collection_name: name,
     });
-    res.send({ data: result, statusCode: 200 });
+    res.send(result);
   } catch (error) {
-    res.status(500).send({ error });
+    next(error);
   }
 });
-router.get("/:name", async (req, res) => {
+router.get("/:name", async (req, res, next) => {
   const name = req.params?.name;
   try {
     const result = await collectionsService.describeCollection({
       collection_name: name,
     });
-    res.send({ data: result, statusCode: 200 });
+    res.send(result);
   } catch (error) {
-    res.status(500).send({ error });
+    next(error);
   }
 });
-router.get("/:name/statistics", async (req, res) => {
+router.get("/:name/statistics", async (req, res, next) => {
   const name = req.params?.name;
   try {
     const result = await collectionsService.getCollectionStatistics({
       collection_name: name,
     });
-    res.send({ data: result, statusCode: 200 });
+    res.send(result);
   } catch (error) {
-    res.status(500).send({ error });
+    next(error);
   }
 });
-router.get("/indexes/status", async (req, res) => {
+router.get("/indexes/status", async (req, res, next) => {
   try {
     const result = await collectionsService.getCollectionsIndexStatus();
-    res.send({ data: result, statusCode: 200 });
+    res.send(result);
   } catch (error) {
-    res.status(500).send({ error });
+    next(error);
   }
 });
-router.put("/:name/load", async (req, res) => {
+router.put("/:name/load", async (req, res, next) => {
   const name = req.params?.name;
   try {
     const result = await collectionsService.loadCollection({
       collection_name: name,
     });
-    res.send({ data: result, statusCode: 200 });
+    res.send(result);
   } catch (error) {
-    res.status(500).send({ error });
+    next(error);
   }
 });
-router.put("/:name/release", async (req, res) => {
+router.put("/:name/release", async (req, res, next) => {
   const name = req.params?.name;
   try {
     const result = await collectionsService.releaseCollection({
       collection_name: name,
     });
-    res.send({ data: result, statusCode: 200 });
+    res.send(result);
   } catch (error) {
-    res.status(500).send({ error });
+    next(error);
   }
 });
-router.post("/:name/insert", async (req, res) => {
+router.post("/:name/insert", async (req, res, next) => {
   const name = req.params?.name;
   const data = req.body;
   try {
@@ -108,12 +108,12 @@ router.post("/:name/insert", async (req, res) => {
       collection_name: name,
       ...data,
     });
-    res.send({ data: result, statusCode: 200 });
+    res.send(result);
   } catch (error) {
-    res.status(500).send({ error });
+    next(error);
   }
 });
-router.post("/:name/search", async (req, res) => {
+router.post("/:name/search", async (req, res, next) => {
   const name = req.params?.name;
   const data = req.body;
   try {
@@ -121,9 +121,9 @@ router.post("/:name/search", async (req, res) => {
       collection_name: name,
       ...data,
     });
-    res.send({ data: result, statusCode: 200 });
+    res.send(result);
   } catch (error) {
-    res.status(500).send({ error });
+    next(error);
   }
 });
 

+ 16 - 12
express/src/connect/index.ts

@@ -5,36 +5,40 @@ const router = express.Router();
 
 const milvusService = new MilvusService();
 
-router.post("/connect", async (req, res) => {
+router.post("/connect", async (req, res, next) => {
   const address = req.body?.address;
   try {
     const result = await milvusService.connectMilvus(address);
-    res.send({ data: result, statusCode: 200 });
+    res.send(result);
   } catch (error) {
-    res.status(500).send({ error });
+    next(error);
   }
 });
-router.get("/check", async (req, res) => {
+router.get("/check", async (req, res, next) => {
   const address = "" + req.query?.address;
   try {
     const result = await milvusService.checkConnect(address);
-    res.send({ data: result, statusCode: 200 });
+    res.send(result);
   } catch (error) {
-    res.status(500).send({ error });
+    next(error);
   }
 });
-router.put("/flush", async (req, res) => {
+router.put("/flush", async (req, res, next) => {
   const collectionNames = req.body;
   try {
     const result = await milvusService.flush(collectionNames);
-    res.send({ data: result, statusCode: 200 });
+    res.send(result);
   } catch (error) {
-    res.status(500).send({ error });
+    next(error);
   }
 });
-router.get("/metrics", async (req, res) => {
-  const result = await milvusService.getMetrics();
-  res.send({ data: result, statusCode: 200 });
+router.get("/metrics", async (req, res, next) => {
+  try {
+    const result = await milvusService.getMetrics();
+    res.send(result);
+  } catch (error) {
+    next(error);
+  }
 });
 
 export { router, milvusService };

+ 8 - 7
express/src/crons/index.ts

@@ -6,15 +6,16 @@ const router = express.Router();
 
 const schedulerRegistry = new SchedulerRegistry([]);
 
-const cronsService = new CronsService(
-  collectionsService,
-  schedulerRegistry
-);
+const cronsService = new CronsService(collectionsService, schedulerRegistry);
 
-router.put("/", async (req, res) => {
+router.put("/", async (req, res, next) => {
   const cronData = req.body;
-  const result = await cronsService.toggleCronJobByName(cronData)
-  res.send(result);
+  try {
+    const result = await cronsService.toggleCronJobByName(cronData);
+    res.send(result);
+  } catch (error) {
+    next(error);
+  }
 });
 
 export { router };

+ 77 - 0
express/src/interceptors/index.ts

@@ -0,0 +1,77 @@
+import { Request, Response, NextFunction, Errback } from "express";
+
+// TransformResInterceptor
+export const TransformResInterceptor = (
+  req: Request,
+  res: Response,
+  next: NextFunction
+) => {
+  const oldSend = res.send;
+  res.send = (data) => {
+    // console.log(data); // do something with the data
+    res.send = oldSend; // set function back to avoid the 'double-send'
+    return res.send({ data });
+    // return res.send({ data, statusCode: 200 }); // just call as normal with data
+  };
+  next();
+};
+
+const getDurationInMilliseconds = (start: any) => {
+  const NS_PER_SEC = 1e9;
+  const NS_TO_MS = 1e6;
+  const diff = process.hrtime(start);
+
+  return (diff[0] * NS_PER_SEC + diff[1]) / NS_TO_MS;
+};
+
+/**
+ * Add spent time looger when accessing milvus.
+ */
+export const LoggingInterceptor = (
+  req: Request,
+  res: Response,
+  next: NextFunction
+) => {
+  console.log(`${req.method} ${req.originalUrl} [STARTED]`);
+  const start = process.hrtime();
+  const { ip = "", method = "", originalUrl = "", headers = {} } = req;
+  const ua = headers["user-agent"] || "";
+
+  res.on("finish", () => {
+    const durationInMilliseconds = getDurationInMilliseconds(start);
+    console.log(
+      `${req.method} ${
+        req.originalUrl
+      } [FINISHED] ${durationInMilliseconds.toLocaleString()} ms`
+    );
+  });
+
+  res.on("close", () => {
+    const durationInMilliseconds = getDurationInMilliseconds(start);
+    const { statusCode = "" } = res;
+    console.log(
+      `${req.method} ${
+        req.originalUrl
+      } [CLOSED] ${durationInMilliseconds.toLocaleString()} ms ip:${ip} ua:${ua} status:${statusCode}`
+    );
+  });
+
+  next();
+};
+
+/**
+ * Handle error in here.
+ * Normally depend on status which from milvus service.
+ */
+export const ErrorInterceptor = (
+  err: Error,
+  req: Request,
+  res: Response,
+  next: NextFunction
+) => {
+  console.log("---error interceptor---\n%s", err);
+  if (res.headersSent) {
+    return next(err);
+  }
+  res.status(500).json({ error: err });
+};

+ 12 - 12
express/src/partitions/index.ts

@@ -6,45 +6,45 @@ const router = express.Router();
 
 const partitionsService = new PartitionsService(milvusService);
 
-router.get("/", async (req, res) => {
+router.get("/", async (req, res, next) => {
   const collectionName = "" + req.query?.collection_name;
   try {
     const result = await partitionsService.getPatitionsInfo({
       collection_name: collectionName,
     });
-    res.send({ data: result, statusCode: 200 });
+    res.send(result);
   } catch (error) {
-    res.status(500).send({ error });
+    next(error);
   }
 });
-router.post("/", async (req, res) => {
+router.post("/", async (req, res, next) => {
   const { type, ...params } = req.body;
   try {
     const result =
       type.toLocaleLowerCase() === "create"
         ? await partitionsService.createParition(params)
         : await partitionsService.deleteParition(params);
-    res.send({ data: result, statusCode: 200 });
+    res.send(result);
   } catch (error) {
-    res.status(500).send({ error });
+    next(error);
   }
 });
-router.put("/load", async (req, res) => {
+router.put("/load", async (req, res, next) => {
   const loadData = req.body;
   try {
     const result = await partitionsService.loadPartitions(loadData);
-    res.send({ data: result, statusCode: 200 });
+    res.send(result);
   } catch (error) {
-    res.status(500).send({ error });
+    next(error);
   }
 });
-router.put("/release", async (req, res) => {
+router.put("/release", async (req, res, next) => {
   const loadData = req.body;
   try {
     const result = await partitionsService.releasePartitions(loadData);
-    res.send({ data: result, statusCode: 200 });
+    res.send(result);
   } catch (error) {
-    res.status(500).send({ error });
+    next(error);
   }
 });
 

+ 12 - 12
express/src/schema/index.ts

@@ -6,7 +6,7 @@ const router = express.Router();
 
 const schemaService = new SchemaService(milvusService);
 
-router.post("/index", async (req, res) => {
+router.post("/index", async (req, res, next) => {
   const { type, collection_name, extra_params, field_name } = req.body;
   try {
     const result =
@@ -17,38 +17,38 @@ router.post("/index", async (req, res) => {
             field_name,
           })
         : await schemaService.dropIndex({ collection_name, field_name });
-    res.send({ data: result, statusCode: 200 });
+    res.send(result);
   } catch (error) {
-    res.status(500).send({ error });
+    next(error);
   }
 });
-router.get("/index", async (req, res) => {
+router.get("/index", async (req, res, next) => {
   const data = "" + req.query?.collection_name;
   try {
     const result = await schemaService.describeIndex({ collection_name: data });
-    res.send({ data: result, statusCode: 200 });
+    res.send(result);
   } catch (error) {
-    res.status(500).send({ error });
+    next(error);
   }
 });
-router.get("/index/progress", async (req, res) => {
+router.get("/index/progress", async (req, res, next) => {
   const data = "" + req.query?.collection_name;
   try {
     const result = await schemaService.getIndexBuildProgress({
       collection_name: data,
     });
-    res.send({ data: result, statusCode: 200 });
+    res.send(result);
   } catch (error) {
-    res.status(500).send({ error });
+    next(error);
   }
 });
-router.get("/index/state", async (req, res) => {
+router.get("/index/state", async (req, res, next) => {
   const data = "" + req.query?.collection_name;
   try {
     const result = await schemaService.getIndexState({ collection_name: data });
-    res.send({ data: result, statusCode: 200 });
+    res.send(result);
   } catch (error) {
-    res.status(500).send({ error });
+    next(error);
   }
 });