Browse Source

Merge pull request #226 from nameczz/dev

Use websocket to update index and loading status.
ryjiang 3 years ago
parent
commit
93d9589fb1

+ 6 - 3
client/src/App.tsx

@@ -2,14 +2,17 @@ import Router from './router/Router';
 import { RootProvider } from './context/Root';
 import { RootProvider } from './context/Root';
 import { NavProvider } from './context/Navigation';
 import { NavProvider } from './context/Navigation';
 import { AuthProvider } from './context/Auth';
 import { AuthProvider } from './context/Auth';
+import { WebSocketProvider } from './context/WebSocket';
 
 
 function App() {
 function App() {
   return (
   return (
     <RootProvider>
     <RootProvider>
       <AuthProvider>
       <AuthProvider>
-        <NavProvider>
-          <Router></Router>
-        </NavProvider>
+        <WebSocketProvider>
+          <NavProvider>
+            <Router></Router>
+          </NavProvider>
+        </WebSocketProvider>
       </AuthProvider>
       </AuthProvider>
     </RootProvider>
     </RootProvider>
   );
   );

+ 9 - 0
client/src/consts/Http.ts

@@ -4,3 +4,12 @@ export enum CODE_STATUS {
 }
 }
 
 
 export const START_LOADING_TIME = 350;
 export const START_LOADING_TIME = 350;
+
+export enum WS_EVENTS {
+  COLLECTION = 'COLLECTION',
+}
+
+export enum WS_EVENTS_TYPE {
+  START,
+  STOP,
+}

+ 6 - 0
client/src/context/Types.ts

@@ -1,4 +1,5 @@
 import { Dispatch, ReactElement, SetStateAction } from 'react';
 import { Dispatch, ReactElement, SetStateAction } from 'react';
+import { CollectionView } from '../pages/collections/Types';
 import { NavInfo } from '../router/Types';
 import { NavInfo } from '../router/Types';
 
 
 export type RootContextType = {
 export type RootContextType = {
@@ -62,3 +63,8 @@ export type NavContextType = {
   navInfo: NavInfo;
   navInfo: NavInfo;
   setNavInfo: (param: NavInfo) => void;
   setNavInfo: (param: NavInfo) => void;
 };
 };
+
+export type WebSocketType = {
+  collections: CollectionView[];
+  setCollections: (data: CollectionView[]) => void;
+};

+ 65 - 0
client/src/context/WebSocket.tsx

@@ -0,0 +1,65 @@
+import { createContext, useEffect, useState } from 'react';
+import { io } from 'socket.io-client';
+import { WS_EVENTS, WS_EVENTS_TYPE } from '../consts/Http';
+import { url } from '../http/Axios';
+import { CollectionHttp } from '../http/Collection';
+import { MilvusHttp } from '../http/Milvus';
+import { CollectionView } from '../pages/collections/Types';
+import { checkIndexBuilding, checkLoading } from '../utils/Validation';
+import { WebSocketType } from './Types';
+
+export const webSokcetContext = createContext<WebSocketType>({
+  collections: [],
+  setCollections: data => {},
+});
+
+const { Provider } = webSokcetContext;
+
+export const WebSocketProvider = (props: { children: React.ReactNode }) => {
+  const [collections, setCollections] = useState<CollectionView[]>([]);
+
+  // test code for socket
+  useEffect(() => {
+    const socket = io(url);
+
+    socket.on('connect', function () {
+      console.log('--- ws connected ---');
+    });
+
+    /**
+     * Because of collections data may be big, so we still use ajax to fetch data.
+     * Only when collection list includes index building or loading collection,
+     * server will keep push collections data from milvus every seconds.
+     * After all collections are not loading or building index, tell server stop pulling data.
+     */
+    socket.on(WS_EVENTS.COLLECTION, (data: any) => {
+      const collections: CollectionHttp[] = data.map(
+        (v: any) => new CollectionHttp(v)
+      );
+
+      const hasLoadingOrBuildingCollection = collections.some(
+        v => checkLoading(v) || checkIndexBuilding(v)
+      );
+
+      setCollections(collections);
+      // If no collection is building index or loading collection
+      // stop server cron job
+      if (!hasLoadingOrBuildingCollection) {
+        MilvusHttp.triggerCron({
+          name: WS_EVENTS.COLLECTION,
+          type: WS_EVENTS_TYPE.STOP,
+        });
+      }
+    });
+  }, []);
+  return (
+    <Provider
+      value={{
+        collections,
+        setCollections,
+      }}
+    >
+      {props.children}
+    </Provider>
+  );
+};

+ 9 - 0
client/src/http/Milvus.ts

@@ -1,9 +1,11 @@
+import { WS_EVENTS, WS_EVENTS_TYPE } from '../consts/Http';
 import BaseModel from './BaseModel';
 import BaseModel from './BaseModel';
 
 
 export class MilvusHttp extends BaseModel {
 export class MilvusHttp extends BaseModel {
   static CONNECT_URL = '/milvus/connect';
   static CONNECT_URL = '/milvus/connect';
   static CHECK_URL = '/milvus/check';
   static CHECK_URL = '/milvus/check';
   static FLUSH_URL = '/milvus/flush';
   static FLUSH_URL = '/milvus/flush';
+  static TIGGER_CRON_URL = '/crons';
 
 
   constructor(props: {}) {
   constructor(props: {}) {
     super(props);
     super(props);
@@ -26,4 +28,11 @@ export class MilvusHttp extends BaseModel {
       },
       },
     });
     });
   }
   }
+
+  static triggerCron(data: { name: WS_EVENTS; type: WS_EVENTS_TYPE }) {
+    return super.update({
+      path: this.TIGGER_CRON_URL,
+      data,
+    });
+  }
 }
 }

+ 65 - 80
client/src/pages/collections/Collections.tsx

@@ -1,4 +1,4 @@
-import { useCallback, useContext, useEffect, useState } from 'react';
+import { useCallback, useContext, useEffect, useMemo, useState } from 'react';
 import { Link } from 'react-router-dom';
 import { Link } from 'react-router-dom';
 import { useNavigationHook } from '../../hooks/Navigation';
 import { useNavigationHook } from '../../hooks/Navigation';
 import { ALL_ROUTER_TYPES } from '../../router/Types';
 import { ALL_ROUTER_TYPES } from '../../router/Types';
@@ -33,6 +33,9 @@ import { parseLocationSearch } from '../../utils/Format';
 import InsertContainer from '../../components/insert/Container';
 import InsertContainer from '../../components/insert/Container';
 import { MilvusHttp } from '../../http/Milvus';
 import { MilvusHttp } from '../../http/Milvus';
 import { LOADING_STATE } from '../../consts/Milvus';
 import { LOADING_STATE } from '../../consts/Milvus';
+import { webSokcetContext } from '../../context/WebSocket';
+import { WS_EVENTS, WS_EVENTS_TYPE } from '../../consts/Http';
+import { checkIndexBuilding, checkLoading } from '../../utils/Validation';
 
 
 const useStyles = makeStyles((theme: Theme) => ({
 const useStyles = makeStyles((theme: Theme) => ({
   emptyWrapper: {
   emptyWrapper: {
@@ -59,88 +62,95 @@ const useStyles = makeStyles((theme: Theme) => ({
 
 
 let timer: NodeJS.Timeout | null = null;
 let timer: NodeJS.Timeout | null = null;
 // get init search value from url
 // get init search value from url
-const { search = '' } = parseLocationSearch(window.location.search);
+const { urlSearch = '' } = parseLocationSearch(window.location.search);
 
 
 const Collections = () => {
 const Collections = () => {
   useNavigationHook(ALL_ROUTER_TYPES.COLLECTIONS);
   useNavigationHook(ALL_ROUTER_TYPES.COLLECTIONS);
   const { handleAction } = useLoadAndReleaseDialogHook({ type: 'collection' });
   const { handleAction } = useLoadAndReleaseDialogHook({ type: 'collection' });
   const { handleInsertDialog } = useInsertDialogHook();
   const { handleInsertDialog } = useInsertDialogHook();
-  const [collections, setCollections] = useState<CollectionView[]>([]);
 
 
-  const [searchedCollections, setSearchedCollections] = useState<
-    CollectionView[]
-  >([]);
-  const {
-    pageSize,
-    handlePageSize,
-    currentPage,
-    handleCurrentPage,
-    total,
-    data: collectionList,
-    handleGridSort,
-    order,
-    orderBy,
-  } = usePaginationHook(searchedCollections);
-  const [loading, setLoading] = useState<boolean>(true);
+  const [search, setSearch] = useState<string>(urlSearch);
+  const [loading, setLoading] = useState<boolean>(false);
   const [selectedCollections, setSelectedCollections] = useState<
   const [selectedCollections, setSelectedCollections] = useState<
     CollectionView[]
     CollectionView[]
   >([]);
   >([]);
 
 
   const { setDialog, handleCloseDialog, openSnackBar } =
   const { setDialog, handleCloseDialog, openSnackBar } =
     useContext(rootContext);
     useContext(rootContext);
+  const { collections, setCollections } = useContext(webSokcetContext);
   const { t: collectionTrans } = useTranslation('collection');
   const { t: collectionTrans } = useTranslation('collection');
   const { t: btnTrans } = useTranslation('btn');
   const { t: btnTrans } = useTranslation('btn');
   const { t: dialogTrans } = useTranslation('dialog');
   const { t: dialogTrans } = useTranslation('dialog');
   const { t: successTrans } = useTranslation('success');
   const { t: successTrans } = useTranslation('success');
-
   const classes = useStyles();
   const classes = useStyles();
 
 
   const LoadIcon = icons.load;
   const LoadIcon = icons.load;
   const ReleaseIcon = icons.release;
   const ReleaseIcon = icons.release;
   const InfoIcon = icons.info;
   const InfoIcon = icons.info;
 
 
-  const fetchData = useCallback(async () => {
-    try {
-      const res = await CollectionHttp.getCollections();
-      const statusRes = await CollectionHttp.getCollectionsIndexState();
-      setLoading(false);
+  const searchedCollections = useMemo(
+    () => collections.filter(collection => collection._name.includes(search)),
+    [collections, search]
+  );
 
 
-      const collections = res.map(v => {
-        const indexStatus = statusRes.find(item => item._name === v._name);
-        Object.assign(v, {
-          nameElement: (
-            <Link to={`/collections/${v._name}`} className={classes.link}>
-              <Highlighter
-                textToHighlight={v._name}
-                searchWords={[search]}
-                highlightClassName={classes.highlight}
-              />
-            </Link>
-          ),
-          statusElement: (
-            <Status status={v._status} percentage={v._loadedPercentage} />
-          ),
-          indexCreatingElement: (
-            <StatusIcon
-              type={indexStatus?._indexState || ChildrenStatusType.FINISH}
+  const formatCollections = useMemo(() => {
+    const data = searchedCollections.map(v => {
+      // const indexStatus = statusRes.find(item => item._name === v._name);
+      Object.assign(v, {
+        nameElement: (
+          <Link to={`/collections/${v._name}`} className={classes.link}>
+            <Highlighter
+              textToHighlight={v._name}
+              searchWords={[search]}
+              highlightClassName={classes.highlight}
             />
             />
-          ),
-        });
-
-        return v;
+          </Link>
+        ),
+        statusElement: (
+          <Status status={v._status} percentage={v._loadedPercentage} />
+        ),
+        indexCreatingElement: (
+          <StatusIcon type={v._indexState || ChildrenStatusType.FINISH} />
+        ),
       });
       });
 
 
-      // filter collection if url contains search param
-      const filteredCollections = collections.filter(collection =>
-        collection._name.includes(search)
+      return v;
+    });
+    return data;
+  }, [classes.highlight, classes.link, search, searchedCollections]);
+
+  const {
+    pageSize,
+    handlePageSize,
+    currentPage,
+    handleCurrentPage,
+    total,
+    data: collectionList,
+    handleGridSort,
+    order,
+    orderBy,
+  } = usePaginationHook(formatCollections);
+
+  const fetchData = useCallback(async () => {
+    try {
+      setLoading(true);
+      const res = await CollectionHttp.getCollections();
+      const hasLoadingOrBuildingCollection = res.some(
+        v => checkLoading(v) || checkIndexBuilding(v)
       );
       );
+      // if some collection is building index or loading, start pulling data
+      if (hasLoadingOrBuildingCollection) {
+        MilvusHttp.triggerCron({
+          name: WS_EVENTS.COLLECTION,
+          type: WS_EVENTS_TYPE.START,
+        });
+      }
 
 
-      setCollections(collections);
-      setSearchedCollections(filteredCollections);
-    } catch (err) {
+      setCollections(res);
+    } finally {
       setLoading(false);
       setLoading(false);
     }
     }
-  }, [classes.link, classes.highlight]);
+  }, [setCollections]);
 
 
   useEffect(() => {
   useEffect(() => {
     fetchData();
     fetchData();
@@ -226,32 +236,7 @@ const Collections = () => {
     if (timer) {
     if (timer) {
       clearTimeout(timer);
       clearTimeout(timer);
     }
     }
-    // add loading manually
-    setLoading(true);
-    timer = setTimeout(() => {
-      const searchWords = [value];
-      const list = value
-        ? collections.filter(c => c._name.includes(value))
-        : collections;
-
-      const highlightList = list.map(c => {
-        Object.assign(c, {
-          nameElement: (
-            <Link to={`/collections/${c._name}`} className={classes.link}>
-              <Highlighter
-                textToHighlight={c._name}
-                searchWords={searchWords}
-                highlightClassName={classes.highlight}
-              />
-            </Link>
-          ),
-        });
-        return c;
-      });
-
-      setLoading(false);
-      setSearchedCollections(highlightList);
-    }, 300);
+    setSearch(value);
   };
   };
 
 
   const toolbarConfigs: ToolBarConfig[] = [
   const toolbarConfigs: ToolBarConfig[] = [
@@ -275,7 +260,7 @@ const Collections = () => {
       onClick: () => {
       onClick: () => {
         handleInsertDialog(
         handleInsertDialog(
           <InsertContainer
           <InsertContainer
-            collections={collections}
+            collections={formatCollections}
             defaultSelectedCollection={
             defaultSelectedCollection={
               selectedCollections.length === 1
               selectedCollections.length === 1
                 ? selectedCollections[0]._name
                 ? selectedCollections[0]._name

+ 0 - 28
client/src/pages/connect/Connect.tsx

@@ -14,7 +14,6 @@ import { MilvusHttp } from '../../http/Milvus';
 import { rootContext } from '../../context/Root';
 import { rootContext } from '../../context/Root';
 import { MILVUS_ADDRESS } from '../../consts/Localstorage';
 import { MILVUS_ADDRESS } from '../../consts/Localstorage';
 import { formatAddress } from '../../utils/Format';
 import { formatAddress } from '../../utils/Format';
-// import { io } from "socket.io-client";
 
 
 const useStyles = makeStyles((theme: Theme) => ({
 const useStyles = makeStyles((theme: Theme) => ({
   wrapper: {
   wrapper: {
@@ -100,33 +99,6 @@ const Connect = () => {
     defaultValue: form.address,
     defaultValue: form.address,
   };
   };
 
 
-  // test code for socket
-  // useEffect(() => {
-  //   const socket = io('http://localhost:3002');
-  //   socket.on('connect', function () {
-  //     console.log('Connected');
-
-  //     socket.emit('identity', 0, (res: any) =>
-  //       console.log(res));
-
-  //     socket.emit('events', { test: 'events' });
-
-  //     socket.emit('senddata', { test: 'senddata' });
-  //   });
-  //   socket.on('events', (data: any) => {
-  //     console.log('event', data);
-  //   });
-  //   socket.on('senddata', (data: any) => {
-  //     console.log('senddata', data);
-  //   });
-  //   socket.on('exception', (data: any) => {
-  //     console.log('event', data);
-  //   });
-  //   socket.on('disconnect', () => {
-  //     console.log('Disconnected');
-  //   });
-  // }, []);
-
   return (
   return (
     <ConnectContainer>
     <ConnectContainer>
       <section className={classes.wrapper}>
       <section className={classes.wrapper}>

+ 15 - 0
client/src/utils/Validation.ts

@@ -1,3 +1,4 @@
+import { ChildrenStatusType } from '../components/status/Types';
 import { MetricType, METRIC_TYPES_VALUES } from '../consts/Milvus';
 import { MetricType, METRIC_TYPES_VALUES } from '../consts/Milvus';
 
 
 export type ValidType =
 export type ValidType =
@@ -243,3 +244,17 @@ export const getCheckResult = (param: ICheckMapParam): boolean => {
 
 
   return checkMap[rule];
   return checkMap[rule];
 };
 };
+
+/**
+ * Check collection is loading or not
+ */
+export const checkLoading = (v: any): boolean =>
+  v._loadedPercentage !== '-1' && v._loadedPercentage !== '100';
+
+/**
+ * Check collection is index building or not.
+ * @param v
+ * @returns boolean
+ */
+export const checkIndexBuilding = (v: any): boolean =>
+  v._indexState === ChildrenStatusType.CREATING;

+ 1 - 1
server/generate-csv.ts

@@ -18,7 +18,7 @@ const generateVector = (dimension) => {
   return JSON.stringify(vectors);
   return JSON.stringify(vectors);
 };
 };
 
 
-while (records.length < 3000) {
+while (records.length < 50000) {
   const value = generateVector(4);
   const value = generateVector(4);
   records.push({ vector: value });
   records.push({ vector: value });
 }
 }

+ 1 - 0
server/package.json

@@ -27,6 +27,7 @@
     "@nestjs/passport": "^8.0.0",
     "@nestjs/passport": "^8.0.0",
     "@nestjs/platform-express": "^8.0.4",
     "@nestjs/platform-express": "^8.0.4",
     "@nestjs/platform-socket.io": "^8.0.4",
     "@nestjs/platform-socket.io": "^8.0.4",
+    "@nestjs/schedule": "^1.0.1",
     "@nestjs/serve-static": "^2.2.2",
     "@nestjs/serve-static": "^2.2.2",
     "@nestjs/swagger": "^5.0.8",
     "@nestjs/swagger": "^5.0.8",
     "@nestjs/websockets": "^8.0.4",
     "@nestjs/websockets": "^8.0.4",

+ 4 - 0
server/src/app.module.ts

@@ -11,6 +11,8 @@ import { PartitionsModule } from './partitions/partitions.module';
 import { SchemaModule } from './schema/schema.module';
 import { SchemaModule } from './schema/schema.module';
 import { EventsModule } from './events/events.module';
 import { EventsModule } from './events/events.module';
 import { LoggingInterceptor } from './interceptors/index';
 import { LoggingInterceptor } from './interceptors/index';
+import { CronsModule } from './crons/crons.module';
+import { ScheduleModule } from '@nestjs/schedule';
 
 
 @Module({
 @Module({
   imports: [
   imports: [
@@ -30,6 +32,8 @@ import { LoggingInterceptor } from './interceptors/index';
     SchemaModule,
     SchemaModule,
     // used for events communication
     // used for events communication
     EventsModule,
     EventsModule,
+    CronsModule,
+    ScheduleModule.forRoot(),
   ],
   ],
   controllers: [AppController],
   controllers: [AppController],
   providers: [
   providers: [

+ 3 - 3
server/src/collections/collections.controller.ts

@@ -11,8 +11,8 @@ import {
   ValidationPipe,
   ValidationPipe,
   // CACHE_MANAGER,
   // CACHE_MANAGER,
   // Inject,
   // Inject,
-  UseInterceptors,
-  CacheInterceptor,
+  // UseInterceptors,
+  // CacheInterceptor,
 } from '@nestjs/common';
 } from '@nestjs/common';
 // import { Cache } from 'cache-manager';
 // import { Cache } from 'cache-manager';
 import { ApiTags } from '@nestjs/swagger';
 import { ApiTags } from '@nestjs/swagger';
@@ -64,7 +64,7 @@ export class CollectionsController {
 
 
   // use interceptor to control cache automatically
   // use interceptor to control cache automatically
   @Get('statistics')
   @Get('statistics')
-  @UseInterceptors(CacheInterceptor)
+  // @UseInterceptors(CacheInterceptor)
   async getStatistics() {
   async getStatistics() {
     return await this.collectionsService.getStatistics();
     return await this.collectionsService.getStatistics();
   }
   }

+ 2 - 1
server/src/collections/collections.module.ts

@@ -13,5 +13,6 @@ import { ttl } from '../cache/config';
   ],
   ],
   providers: [CollectionsService],
   providers: [CollectionsService],
   controllers: [CollectionsController],
   controllers: [CollectionsController],
+  exports: [CollectionsService],
 })
 })
-export class CollectionsModule { }
+export class CollectionsModule {}

+ 12 - 2
server/src/collections/collections.service.ts

@@ -113,15 +113,23 @@ export class CollectionsService {
     if (res.data.length > 0) {
     if (res.data.length > 0) {
       for (const item of res.data) {
       for (const item of res.data) {
         const { name } = item;
         const { name } = item;
+
         const collectionInfo = await this.describeCollection({
         const collectionInfo = await this.describeCollection({
           collection_name: name,
           collection_name: name,
         });
         });
+
         const collectionStatistics = await this.getCollectionStatistics({
         const collectionStatistics = await this.getCollectionStatistics({
           collection_name: name,
           collection_name: name,
         });
         });
+
+        const indexRes = await this.getIndexStatus({
+          collection_name: item.name,
+        });
+
         const autoID = collectionInfo.schema.fields.find(
         const autoID = collectionInfo.schema.fields.find(
           (v) => v.is_primary_key === true,
           (v) => v.is_primary_key === true,
         )?.autoID;
         )?.autoID;
+
         const loadCollection = loadedCollections.data.find(
         const loadCollection = loadedCollections.data.find(
           (v) => v.name === name,
           (v) => v.name === name,
         );
         );
@@ -129,6 +137,7 @@ export class CollectionsService {
         const loadedPercentage = !loadCollection
         const loadedPercentage = !loadCollection
           ? '-1'
           ? '-1'
           : loadCollection.loadedPercentage;
           : loadCollection.loadedPercentage;
+
         data.push({
         data.push({
           collection_name: name,
           collection_name: name,
           schema: collectionInfo.schema,
           schema: collectionInfo.schema,
@@ -138,6 +147,7 @@ export class CollectionsService {
           id: collectionInfo.collectionID,
           id: collectionInfo.collectionID,
           loadedPercentage,
           loadedPercentage,
           createdTime: collectionInfo.created_utc_timestamp,
           createdTime: collectionInfo.created_utc_timestamp,
+          index_status: indexRes.state,
         });
         });
       }
       }
     }
     }
@@ -192,7 +202,7 @@ export class CollectionsService {
 
 
   /**
   /**
    * Get all collection index status
    * Get all collection index status
-   * @returns {collection_name:string, index_state: IndexState}[]
+   * @returns {collection_name:string, index_status: IndexState}[]
    */
    */
   async getCollectionsIndexStatus() {
   async getCollectionsIndexStatus() {
     const data = [];
     const data = [];
@@ -204,7 +214,7 @@ export class CollectionsService {
         });
         });
         data.push({
         data.push({
           collection_name: item.name,
           collection_name: item.name,
-          index_state: indexRes.state,
+          index_status: indexRes.state,
         });
         });
       }
       }
     }
     }

+ 18 - 0
server/src/crons/crons.controller.spec.ts

@@ -0,0 +1,18 @@
+import { Test, TestingModule } from '@nestjs/testing';
+import { CronsController } from './crons.controller';
+
+describe('CronsController', () => {
+  let controller: CronsController;
+
+  beforeEach(async () => {
+    const module: TestingModule = await Test.createTestingModule({
+      controllers: [CronsController],
+    }).compile();
+
+    controller = module.get<CronsController>(CronsController);
+  });
+
+  it('should be defined', () => {
+    expect(controller).toBeDefined();
+  });
+});

+ 13 - 0
server/src/crons/crons.controller.ts

@@ -0,0 +1,13 @@
+import { Body, Controller, Put } from '@nestjs/common';
+import { CronsService } from './crons.service';
+import { ToggleCron } from './dto';
+
+@Controller('crons')
+export class CronsController {
+  constructor(private cronsService: CronsService) {}
+
+  @Put()
+  async toggleCron(@Body() data: ToggleCron) {
+    return await this.cronsService.toggleCronJobByName(data);
+  }
+}

+ 12 - 0
server/src/crons/crons.module.ts

@@ -0,0 +1,12 @@
+import { Module } from '@nestjs/common';
+import { CollectionsModule } from 'src/collections/collections.module';
+import { EventsModule } from 'src/events/events.module';
+import { CronsService } from './crons.service';
+import { CronsController } from './crons.controller';
+
+@Module({
+  imports: [EventsModule, CollectionsModule],
+  providers: [CronsService],
+  controllers: [CronsController],
+})
+export class CronsModule {}

+ 18 - 0
server/src/crons/crons.service.spec.ts

@@ -0,0 +1,18 @@
+import { Test, TestingModule } from '@nestjs/testing';
+import { CronsService } from './crons.service';
+
+describe('CronsService', () => {
+  let service: CronsService;
+
+  beforeEach(async () => {
+    const module: TestingModule = await Test.createTestingModule({
+      providers: [CronsService],
+    }).compile();
+
+    service = module.get<CronsService>(CronsService);
+  });
+
+  it('should be defined', () => {
+    expect(service).toBeDefined();
+  });
+});

+ 28 - 0
server/src/crons/crons.service.ts

@@ -0,0 +1,28 @@
+import { Injectable } from '@nestjs/common';
+import { EventsGateway } from '../events/events.gateway';
+import { Cron, CronExpression, SchedulerRegistry } from '@nestjs/schedule';
+import { CollectionsService } from '../collections/collections.service';
+import { WS_EVENTS, WS_EVENTS_TYPE } from 'src/utils/Const';
+@Injectable()
+export class CronsService {
+  constructor(
+    private eventService: EventsGateway,
+    private collectionService: CollectionsService,
+    private schedulerRegistry: SchedulerRegistry,
+  ) {}
+
+  async toggleCronJobByName(data: { name: string; type: WS_EVENTS_TYPE }) {
+    const { name, type } = data;
+    const cronJob = this.schedulerRegistry.getCronJob(name);
+    return Number(type) === WS_EVENTS_TYPE.STOP
+      ? cronJob.stop()
+      : cronJob.start();
+  }
+
+  @Cron(CronExpression.EVERY_SECOND, { name: WS_EVENTS.COLLECTION })
+  async getCollections() {
+    const res = await this.collectionService.getAllCollections();
+    this.eventService.server.emit(WS_EVENTS.COLLECTION, res);
+    return res;
+  }
+}

+ 24 - 0
server/src/crons/dto.ts

@@ -0,0 +1,24 @@
+import { ApiProperty } from '@nestjs/swagger';
+import { IsEnum, IsNotEmpty, IsString } from 'class-validator';
+import { WS_EVENTS_TYPE } from '../utils/Const';
+
+export class ToggleCron {
+  @ApiProperty({
+    description: 'Cron job  name',
+  })
+  @IsString()
+  @IsNotEmpty({
+    message: 'Cron job name is empty',
+  })
+  readonly name: string;
+
+  @ApiProperty({
+    description: 'Type allow start->0 stop->1',
+    enum: WS_EVENTS_TYPE,
+  })
+  @IsEnum(WS_EVENTS_TYPE, { message: 'start -> 0, stop -> 1' })
+  @IsNotEmpty({
+    message: 'Toggle type is empty',
+  })
+  readonly type: WS_EVENTS_TYPE;
+}

+ 8 - 10
server/src/events/events.gateway.ts

@@ -8,21 +8,21 @@ import {
 import { from, Observable } from 'rxjs';
 import { from, Observable } from 'rxjs';
 import { map } from 'rxjs/operators';
 import { map } from 'rxjs/operators';
 import { Server } from 'socket.io';
 import { Server } from 'socket.io';
+import { WS_EVENTS } from 'src/utils/Const';
 
 
-@WebSocketGateway(3002, {
+@WebSocketGateway({
   cors: {
   cors: {
-    origin: "*",
-    methods: ["GET", "POST"],
-  }
+    origin: '*',
+    methods: ['GET', 'POST'],
+  },
 })
 })
 export class EventsGateway {
 export class EventsGateway {
   @WebSocketServer()
   @WebSocketServer()
   server: Server;
   server: Server;
 
 
-  @SubscribeMessage('senddata')
+  @SubscribeMessage(WS_EVENTS.COLLECTION)
   data(@MessageBody() data: unknown): WsResponse<unknown> {
   data(@MessageBody() data: unknown): WsResponse<unknown> {
-    const event = 'senddata';
-    return { event, data };
+    return { event: WS_EVENTS.COLLECTION + '', data };
   }
   }
 
 
   @SubscribeMessage('events')
   @SubscribeMessage('events')
@@ -30,9 +30,7 @@ export class EventsGateway {
     const event = 'events';
     const event = 'events';
     const response = [1, 2, 3];
     const response = [1, 2, 3];
 
 
-    return from(response).pipe(
-      map(data => ({ event, data })),
-    );
+    return from(response).pipe(map((data) => ({ event, data })));
   }
   }
 
 
   @SubscribeMessage('identity')
   @SubscribeMessage('identity')

+ 1 - 0
server/src/events/events.module.ts

@@ -3,5 +3,6 @@ import { EventsGateway } from './events.gateway';
 
 
 @Module({
 @Module({
   providers: [EventsGateway],
   providers: [EventsGateway],
+  exports: [EventsGateway],
 })
 })
 export class EventsModule {}
 export class EventsModule {}

+ 9 - 0
server/src/utils/Const.ts

@@ -5,3 +5,12 @@ export enum LOADING_STATE {
   LOADING,
   LOADING,
   UNLOADED,
   UNLOADED,
 }
 }
+
+export enum WS_EVENTS {
+  COLLECTION = 'COLLECTION',
+}
+
+export enum WS_EVENTS_TYPE {
+  START,
+  STOP,
+}

+ 27 - 0
server/yarn.lock

@@ -741,6 +741,14 @@
     socket.io "4.1.3"
     socket.io "4.1.3"
     tslib "2.3.0"
     tslib "2.3.0"
 
 
+"@nestjs/schedule@^1.0.1":
+  version "1.0.1"
+  resolved "https://registry.yarnpkg.com/@nestjs/schedule/-/schedule-1.0.1.tgz#4ab3f753c2a0f606cf969b0ec7aa4e655ea444d3"
+  integrity sha512-EU2tB4rxuEgum8JlorAFvXkU982EYZm/IBa7n6kgkyps5BbxQSFf7iR1CLkP9zODO9ApZTWk5z3q9L3O7vrkoQ==
+  dependencies:
+    cron "1.7.2"
+    uuid "8.3.2"
+
 "@nestjs/schematics@^8.0.0", "@nestjs/schematics@^8.0.2":
 "@nestjs/schematics@^8.0.0", "@nestjs/schematics@^8.0.2":
   version "8.0.2"
   version "8.0.2"
   resolved "https://registry.yarnpkg.com/@nestjs/schematics/-/schematics-8.0.2.tgz#ff0d92fcdfd885e66f69132d4ebbd97e4ab56ef5"
   resolved "https://registry.yarnpkg.com/@nestjs/schematics/-/schematics-8.0.2.tgz#ff0d92fcdfd885e66f69132d4ebbd97e4ab56ef5"
@@ -2262,6 +2270,13 @@ create-require@^1.1.0:
   resolved "https://registry.yarnpkg.com/create-require/-/create-require-1.1.1.tgz#c1d7e8f1e5f6cfc9ff65f9cd352d37348756c333"
   resolved "https://registry.yarnpkg.com/create-require/-/create-require-1.1.1.tgz#c1d7e8f1e5f6cfc9ff65f9cd352d37348756c333"
   integrity sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==
   integrity sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==
 
 
+cron@1.7.2:
+  version "1.7.2"
+  resolved "https://registry.yarnpkg.com/cron/-/cron-1.7.2.tgz#2ea1f35c138a07edac2ac5af5084ed6fee5723db"
+  integrity sha512-+SaJ2OfeRvfQqwXQ2kgr0Y5pzBR/lijf5OpnnaruwWnmI799JfWr2jN2ItOV9s3A/+TFOt6mxvKzQq5F0Jp6VQ==
+  dependencies:
+    moment-timezone "^0.5.x"
+
 cross-spawn@^6.0.0:
 cross-spawn@^6.0.0:
   version "6.0.5"
   version "6.0.5"
   resolved "https://registry.yarnpkg.com/cross-spawn/-/cross-spawn-6.0.5.tgz#4a5ec7c64dfae22c3a14124dbacdee846d80cbc4"
   resolved "https://registry.yarnpkg.com/cross-spawn/-/cross-spawn-6.0.5.tgz#4a5ec7c64dfae22c3a14124dbacdee846d80cbc4"
@@ -4571,6 +4586,18 @@ mkdirp@^0.5.1:
   dependencies:
   dependencies:
     minimist "^1.2.5"
     minimist "^1.2.5"
 
 
+moment-timezone@^0.5.x:
+  version "0.5.33"
+  resolved "https://registry.yarnpkg.com/moment-timezone/-/moment-timezone-0.5.33.tgz#b252fd6bb57f341c9b59a5ab61a8e51a73bbd22c"
+  integrity sha512-PTc2vcT8K9J5/9rDEPe5czSIKgLoGsH8UNpA4qZTVw0Vd/Uz19geE9abbIOQKaAQFcnQ3v5YEXrbSc5BpshH+w==
+  dependencies:
+    moment ">= 2.9.0"
+
+"moment@>= 2.9.0":
+  version "2.29.1"
+  resolved "https://registry.yarnpkg.com/moment/-/moment-2.29.1.tgz#b2be769fa31940be9eeea6469c075e35006fa3d3"
+  integrity sha512-kHmoybcPV8Sqy59DwNDY3Jefr64lK/by/da0ViFcuA4DH0vQg5Q6Ze5VimxkfQNSC+Mls/Kx53s7TjP1RhFEDQ==
+
 ms@2.0.0:
 ms@2.0.0:
   version "2.0.0"
   version "2.0.0"
   resolved "https://registry.yarnpkg.com/ms/-/ms-2.0.0.tgz#5608aeadfc00be6c2901df5f9861788de0d597c8"
   resolved "https://registry.yarnpkg.com/ms/-/ms-2.0.0.tgz#5608aeadfc00be6c2901df5f9861788de0d597c8"