123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- import { CollectionsService } from '../collections/collections.service';
- import { WS_EVENTS, WS_EVENTS_TYPE } from '../utils/Const';
- import { schedule, ScheduledTask } from 'node-cron';
- import { pubSub } from '../events';
- export class CronsService {
- constructor(
- private collectionService: CollectionsService,
- private schedulerRegistry: SchedulerRegistry
- ) {}
- async toggleCronJobByName(data: {
- name: string;
- type: WS_EVENTS_TYPE;
- address: string;
- }) {
- const { name, type, address } = data;
- switch (name) {
- case WS_EVENTS.COLLECTION:
- const cronJobEntity = this.schedulerRegistry.getCronJob(name, address);
- if (!cronJobEntity && Number(type) === WS_EVENTS_TYPE.START) {
- return this.getCollections(WS_EVENTS.COLLECTION, address);
- }
- if (!cronJobEntity) {
- return;
- }
- return Number(type) === WS_EVENTS_TYPE.STOP
- ? cronJobEntity.stop()
- : cronJobEntity.start();
- default:
- throw new Error('Unsupported event type');
- }
- }
- async getCollections(name: string, address: string) {
- const task = async () => {
- try {
- const res = await this.collectionService.getAllCollections();
- // TODO
- // this.eventService.server.emit("COLLECTION", res);
- pubSub.emit('ws_pubsub', {
- event: WS_EVENTS.COLLECTION + '',
- data: res,
- });
- return res;
- } catch (error) {
- // When user not connect milvus, stop cron
- const cronJobEntity = this.schedulerRegistry.getCronJob(name, address);
- if (cronJobEntity) {
- cronJobEntity.stop();
- }
- throw new Error(error);
- }
- };
- this.schedulerRegistry.setCronJobEverySecond(name, task, address);
- }
- }
- export class SchedulerRegistry {
- constructor(private cronJobList: CronJob[]) {}
- getCronJob(name: string, address: string) {
- const target = this.cronJobList.find(
- item => item.name === name && item.address === address
- );
- return target?.entity;
- }
- setCronJobEverySecond(name: string, func: () => {}, address: string) {
- // The cron job will run every second
- this.setCronJob(name, '* * * * * *', func, address);
- }
- // ┌────────────── second (optional)
- // │ ┌──────────── minute
- // │ │ ┌────────── hour
- // │ │ │ ┌──────── day of month
- // │ │ │ │ ┌────── month
- // │ │ │ │ │ ┌──── day of week
- // │ │ │ │ │ │
- // │ │ │ │ │ │
- // * * * * * *
- // https://www.npmjs.com/package/node-cron
- setCronJob(name: string, scheduler: string, func: () => {}, address: string) {
- const target = this.cronJobList.find(
- item => item.name === name && item.address === address
- );
- if (target) {
- target?.entity?.stop();
- } else {
- const task = schedule(scheduler, () => {
- console.log(`[Scheduler:${scheduler}] ${name}: running a task.`);
- func();
- });
- this.cronJobList.push({
- name,
- entity: task,
- address,
- });
- }
- }
- }
- interface CronJob {
- name: string;
- entity: ScheduledTask;
- address: string; // milvus address
- }
|