|
@@ -9,14 +9,18 @@ export class CronsService {
|
|
|
private schedulerRegistry: SchedulerRegistry
|
|
|
) {}
|
|
|
|
|
|
- async toggleCronJobByName(data: { name: string; type: WS_EVENTS_TYPE }) {
|
|
|
- const { name, type } = data;
|
|
|
+ async toggleCronJobByName(data: {
|
|
|
+ name: string;
|
|
|
+ type: WS_EVENTS_TYPE;
|
|
|
+ address: string;
|
|
|
+ }) {
|
|
|
+ const { name, type, address } = data;
|
|
|
|
|
|
switch (name) {
|
|
|
case WS_EVENTS.COLLECTION:
|
|
|
- const cronJobEntity = this.schedulerRegistry.getCronJob(name);
|
|
|
+ const cronJobEntity = this.schedulerRegistry.getCronJob(name, address);
|
|
|
if (!cronJobEntity && Number(type) === WS_EVENTS_TYPE.START) {
|
|
|
- return this.getCollections(WS_EVENTS.COLLECTION);
|
|
|
+ return this.getCollections(WS_EVENTS.COLLECTION, address);
|
|
|
}
|
|
|
if (!cronJobEntity) {
|
|
|
return;
|
|
@@ -29,7 +33,7 @@ export class CronsService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- async getCollections(name: string) {
|
|
|
+ async getCollections(name: string, address: string) {
|
|
|
const task = async () => {
|
|
|
try {
|
|
|
const res = await this.collectionService.getAllCollections();
|
|
@@ -42,7 +46,7 @@ export class CronsService {
|
|
|
return res;
|
|
|
} catch (error) {
|
|
|
// When user not connect milvus, stop cron
|
|
|
- const cronJobEntity = this.schedulerRegistry.getCronJob(name);
|
|
|
+ const cronJobEntity = this.schedulerRegistry.getCronJob(name, address);
|
|
|
if (cronJobEntity) {
|
|
|
cronJobEntity.stop();
|
|
|
}
|
|
@@ -50,21 +54,23 @@ export class CronsService {
|
|
|
throw new Error(error);
|
|
|
}
|
|
|
};
|
|
|
- this.schedulerRegistry.setCronJobEverySecond(name, task);
|
|
|
+ this.schedulerRegistry.setCronJobEverySecond(name, task, address);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
export class SchedulerRegistry {
|
|
|
constructor(private cronJobList: CronJob[]) {}
|
|
|
|
|
|
- getCronJob(name: string) {
|
|
|
- const target = this.cronJobList.find((item) => item.name === name);
|
|
|
+ getCronJob(name: string, address: string) {
|
|
|
+ const target = this.cronJobList.find(
|
|
|
+ item => item.name === name && item.address === address
|
|
|
+ );
|
|
|
return target?.entity;
|
|
|
}
|
|
|
|
|
|
- setCronJobEverySecond(name: string, func: () => {}) {
|
|
|
+ setCronJobEverySecond(name: string, func: () => {}, address: string) {
|
|
|
// The cron job will run every second
|
|
|
- this.setCronJob(name, '* * * * * *', func);
|
|
|
+ this.setCronJob(name, '* * * * * *', func, address);
|
|
|
}
|
|
|
|
|
|
// ┌────────────── second (optional)
|
|
@@ -77,8 +83,10 @@ export class SchedulerRegistry {
|
|
|
// │ │ │ │ │ │
|
|
|
// * * * * * *
|
|
|
// https://www.npmjs.com/package/node-cron
|
|
|
- setCronJob(name: string, scheduler: string, func: () => {}) {
|
|
|
- const target = this.cronJobList.find((item) => item.name === name);
|
|
|
+ setCronJob(name: string, scheduler: string, func: () => {}, address: string) {
|
|
|
+ const target = this.cronJobList.find(
|
|
|
+ item => item.name === name && item.address === address
|
|
|
+ );
|
|
|
if (target) {
|
|
|
target?.entity?.stop();
|
|
|
} else {
|
|
@@ -89,6 +97,7 @@ export class SchedulerRegistry {
|
|
|
this.cronJobList.push({
|
|
|
name,
|
|
|
entity: task,
|
|
|
+ address,
|
|
|
});
|
|
|
}
|
|
|
}
|
|
@@ -97,4 +106,5 @@ export class SchedulerRegistry {
|
|
|
interface CronJob {
|
|
|
name: string;
|
|
|
entity: ScheduledTask;
|
|
|
+ address: string; // milvus address
|
|
|
}
|