crons.service.ts 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. import { CollectionsService } from '../collections/collections.service';
  2. import { WS_EVENTS, WS_EVENTS_TYPE } from '../utils/Const';
  3. import { schedule, ScheduledTask } from 'node-cron';
  4. import { pubSub } from '../events';
  5. export class CronsService {
  6. constructor(
  7. private collectionService: CollectionsService,
  8. private schedulerRegistry: SchedulerRegistry
  9. ) {}
  10. async toggleCronJobByName(data: {
  11. name: string;
  12. type: WS_EVENTS_TYPE;
  13. address: string;
  14. }) {
  15. const { name, type, address } = data;
  16. switch (name) {
  17. case WS_EVENTS.COLLECTION:
  18. const cronJobEntity = this.schedulerRegistry.getCronJob(name, address);
  19. if (!cronJobEntity && Number(type) === WS_EVENTS_TYPE.START) {
  20. return this.getCollections(WS_EVENTS.COLLECTION, address);
  21. }
  22. if (!cronJobEntity) {
  23. return;
  24. }
  25. return Number(type) === WS_EVENTS_TYPE.STOP
  26. ? cronJobEntity.stop()
  27. : cronJobEntity.start();
  28. default:
  29. throw new Error('Unsupported event type');
  30. }
  31. }
  32. async getCollections(name: string, address: string) {
  33. const task = async () => {
  34. try {
  35. const res = await this.collectionService.getAllCollections();
  36. // TODO
  37. // this.eventService.server.emit("COLLECTION", res);
  38. pubSub.emit('ws_pubsub', {
  39. event: WS_EVENTS.COLLECTION + '',
  40. data: res,
  41. });
  42. return res;
  43. } catch (error) {
  44. // When user not connect milvus, stop cron
  45. const cronJobEntity = this.schedulerRegistry.getCronJob(name, address);
  46. if (cronJobEntity) {
  47. cronJobEntity.stop();
  48. }
  49. throw new Error(error);
  50. }
  51. };
  52. this.schedulerRegistry.setCronJobEverySecond(name, task, address);
  53. }
  54. }
  55. export class SchedulerRegistry {
  56. constructor(private cronJobList: CronJob[]) {}
  57. getCronJob(name: string, address: string) {
  58. const target = this.cronJobList.find(
  59. item => item.name === name && item.address === address
  60. );
  61. return target?.entity;
  62. }
  63. setCronJobEverySecond(name: string, func: () => {}, address: string) {
  64. // The cron job will run every second
  65. this.setCronJob(name, '* * * * * *', func, address);
  66. }
  67. // ┌────────────── second (optional)
  68. // │ ┌──────────── minute
  69. // │ │ ┌────────── hour
  70. // │ │ │ ┌──────── day of month
  71. // │ │ │ │ ┌────── month
  72. // │ │ │ │ │ ┌──── day of week
  73. // │ │ │ │ │ │
  74. // │ │ │ │ │ │
  75. // * * * * * *
  76. // https://www.npmjs.com/package/node-cron
  77. setCronJob(name: string, scheduler: string, func: () => {}, address: string) {
  78. const target = this.cronJobList.find(
  79. item => item.name === name && item.address === address
  80. );
  81. if (target) {
  82. target?.entity?.stop();
  83. } else {
  84. const task = schedule(scheduler, () => {
  85. console.log(`[Scheduler:${scheduler}] ${name}: running a task.`);
  86. func();
  87. });
  88. this.cronJobList.push({
  89. name,
  90. entity: task,
  91. address,
  92. });
  93. }
  94. }
  95. }
  96. interface CronJob {
  97. name: string;
  98. entity: ScheduledTask;
  99. address: string; // milvus address
  100. }