123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213 |
- import { schedule, ScheduledTask } from 'node-cron';
- import { CollectionsService } from '../collections/collections.service';
- import {
- WS_EVENTS,
- WS_EVENTS_TYPE,
- checkLoading,
- checkIndexing,
- } from '../utils';
- import { clients } from '../socket';
- import { CronJobObject } from '../types';
- interface CronJob {
- id: string;
- clientId: string; // milvus milvusClientId
- task: ScheduledTask;
- data: CronJobObject;
- }
- import { clientCache } from '../app';
- const getId = (clientId: string, data: CronJobObject) => {
- return `${clientId}/${data.name}/${
- data.payload.database
- }/[${data.payload.collections.join('/')}]`;
- };
- export class CronsService {
- constructor(
- private collectionService: CollectionsService,
- private schedulerRegistry: SchedulerRegistry
- ) {}
- async toggleCronJobByName(clientId: string, data: CronJobObject) {
- const { name, type } = data;
- // define cronJob
- const cronJob: CronJob = this.schedulerRegistry.getCronJob(clientId, data);
- // if type is stop, stop cronJob
- if (cronJob && type === WS_EVENTS_TYPE.STOP) {
- return this.schedulerRegistry.deleteCronJob(clientId, data);
- }
- // switch case for different events
- switch (name) {
- // collection loading, indexing, update
- case WS_EVENTS.COLLECTION_UPDATE:
- if (type === WS_EVENTS_TYPE.START && !cronJob) {
- return this.execCollectionUpdateTask(clientId, data);
- }
- break;
- default:
- throw new Error('Unsupported event type');
- }
- }
- async execCollectionUpdateTask(clientId: string, data: CronJobObject) {
- console.log('execCollectionUpdateTask', clientId, data);
- const task = async () => {
- const currentJob: CronJob = this.schedulerRegistry.getCronJob(
- clientId,
- data
- );
- // if currentJob is not exist
- if (!currentJob) {
- // if client not connected, stop cron
- this.schedulerRegistry.deleteCronJob(clientId, data);
- return;
- }
- if (!clientCache.has(clientId)) {
- // if client not connected, stop cron
- this.schedulerRegistry.deleteCronJob(clientId, data);
- console.info('Client is not connected, stop cron.', clientId);
- return;
- }
- try {
- // get client cache data
- const { milvusClient } = clientCache.get(clientId);
- const currentDatabase = (milvusClient as any).metadata.get('dbname');
- // if database is not matched, return
- if (currentDatabase !== data.payload.database) {
- // if client not connected, stop cron
- this.schedulerRegistry.deleteCronJob(clientId, data);
- console.info('Database is not matched, stop cron.', clientId);
- return;
- }
- const collections = await this.collectionService.getAllCollections(
- currentJob.clientId,
- currentJob.data.payload.collections,
- currentJob.data.payload.database
- );
- // get current socket
- const socketClient = clients.get(currentJob.clientId);
- if (socketClient) {
- // emit event to current client, loading and indexing events are indetified as collection update
- socketClient.emit(WS_EVENTS.COLLECTION_UPDATE, {
- collections,
- database: currentJob.data.payload.database,
- });
- // if all collections are loaded, stop cron
- const LoadingOrBuildingCollections = collections.filter(v => {
- const isLoading = checkLoading(v);
- const isBuildingIndex = checkIndexing(v);
- return isLoading || isBuildingIndex;
- });
- if (LoadingOrBuildingCollections.length === 0) {
- this.schedulerRegistry.deleteCronJob(clientId, data);
- }
- }
- } catch (error) {
- if (error.message.includes('pool is draining')) {
- // Handle the pool draining error, possibly by logging and avoiding retry
- console.error(
- 'The pool is shutting down and cannot accept new work.'
- );
- this.schedulerRegistry.deleteCronJob(clientId, data);
- return;
- }
- // When user not connect milvus, stop cron
- this.schedulerRegistry.deleteCronJob(clientId, data);
- throw new Error(error);
- }
- };
- // every 5 seconds
- this.schedulerRegistry.setCronJob(clientId, '*/5 * * * * *', task, data);
- }
- }
- export class SchedulerRegistry {
- constructor(private cronJobMap: Map<string, CronJob>) {}
- getCronJob(clientId: string, data: CronJobObject) {
- const targetId = getId(clientId, data);
- const target = this.cronJobMap.get(targetId);
- return target;
- }
- deleteCronJob(clientId: string, data: CronJobObject) {
- const targetId = getId(clientId, data);
- if (this.cronJobMap.has(targetId)) {
- this.cronJobMap.get(targetId)?.task?.stop();
- this.cronJobMap.delete(targetId);
- }
- }
- deleteAllCronJobs(clientId: string) {
- // console.log('Deleting all cron jobs in service for client:', clientId);
- this.cronJobMap.forEach((v, k) => {
- if (v.clientId === clientId) {
- v.task.stop();
- this.cronJobMap.delete(k);
- }
- });
- }
- // ┌────────────── second (optional)
- // │ ┌──────────── minute
- // │ │ ┌────────── hour
- // │ │ │ ┌──────── day of month
- // │ │ │ │ ┌────── month
- // │ │ │ │ │ ┌──── day of week
- // │ │ │ │ │ │
- // │ │ │ │ │ │
- // * * * * * *
- // https://www.npmjs.com/package/node-cron
- setCronJob(
- clientId: string,
- cronExpression: string,
- func: () => void,
- data: CronJobObject
- ) {
- const target = this.getCronJob(clientId, data);
- if (target) {
- target?.task?.stop();
- } else {
- // create task id
- const id = getId(clientId, data);
- if (!this.cronJobMap.has(id)) {
- console.log('create task:', id);
- // create task
- const task = schedule(cronExpression, () => {
- console.log(
- `[cronExpression:${cronExpression}] ${data.name} ${id}: running a task.`
- );
- func();
- });
- // save task
- this.cronJobMap.set(id, {
- id,
- clientId,
- task,
- data,
- });
- }
- }
- }
- }
|