From 9be007cbdab2132bed7d80c577199dd84a7f2156 Mon Sep 17 00:00:00 2001 From: Simon Larsen Date: Sun, 22 Sep 2024 14:15:09 +0100 Subject: [PATCH] Add WorkerNotification queue and update SendReportsToSubscribers job configuration --- Common/Server/Infrastructure/Queue.ts | 1 + .../StatusPage/SendReportsToSubscribers.ts | 7 ++++++- Worker/Routes.ts | 20 +++++++++++++++++++ Worker/Utils/Cron.ts | 8 ++++++-- 4 files changed, 33 insertions(+), 3 deletions(-) diff --git a/Common/Server/Infrastructure/Queue.ts b/Common/Server/Infrastructure/Queue.ts index 70d1994c9b..aa4893befe 100644 --- a/Common/Server/Infrastructure/Queue.ts +++ b/Common/Server/Infrastructure/Queue.ts @@ -15,6 +15,7 @@ import { ExpressRouter } from "../Utils/Express"; export enum QueueName { Workflow = "Workflow", Worker = "Worker", + WorkerNotification = "WorkerNotification", } export type QueueJob = Job; diff --git a/Worker/Jobs/StatusPage/SendReportsToSubscribers.ts b/Worker/Jobs/StatusPage/SendReportsToSubscribers.ts index e913aebe3c..7785162024 100644 --- a/Worker/Jobs/StatusPage/SendReportsToSubscribers.ts +++ b/Worker/Jobs/StatusPage/SendReportsToSubscribers.ts @@ -7,10 +7,15 @@ import StatusPageService from "Common/Server/Services/StatusPageService"; import QueryHelper from "Common/Server/Types/Database/QueryHelper"; import logger from "Common/Server/Utils/Logger"; import StatusPage from "Common/Models/DatabaseModels/StatusPage"; +import { QueueName } from "Common/Server/Infrastructure/Queue"; RunCron( "StatusPage:SendReportToSubscribers", - { schedule: EVERY_MINUTE, runOnStartup: false }, + { + schedule: EVERY_MINUTE, + runOnStartup: false, + queueName: QueueName.WorkerNotification, + }, async () => { // get all scheduled events of all the projects. const statusPageToSendReports: Array = diff --git a/Worker/Routes.ts b/Worker/Routes.ts index a494bd1dbb..c2b5991f71 100644 --- a/Worker/Routes.ts +++ b/Worker/Routes.ts @@ -132,6 +132,26 @@ const WorkersFeatureSet: FeatureSet = { }, { concurrency: 100 }, ); + + // Job process. + QueueWorker.getWorker( + QueueName.WorkerNotification, + async (job: QueueJob) => { + const name: string = job.name; + + logger.debug("Running Job: " + name); + + const funcToRun: PromiseVoidFunction = + JobDictionary.getJobFunction(name); + + const timeoutInMs: number = JobDictionary.getTimeoutInMs(name); + + if (funcToRun) { + await QueueWorker.runJobWithTimeout(timeoutInMs, funcToRun); + } + }, + { concurrency: 100 }, + ); } catch (err) { logger.error("App Init Failed:"); logger.error(err); diff --git a/Worker/Utils/Cron.ts b/Worker/Utils/Cron.ts index 7d1f3c55a9..9d54e398c8 100644 --- a/Worker/Utils/Cron.ts +++ b/Worker/Utils/Cron.ts @@ -15,6 +15,7 @@ type RunCronFunction = ( schedule: string; runOnStartup: boolean; timeoutInMS?: number | undefined; + queueName?: QueueName | undefined; }, runFunction: PromiseVoidFunction, ) => void; @@ -26,6 +27,7 @@ const RunCron: RunCronFunction = ( schedule: string; runOnStartup: boolean; timeoutInMS?: number | undefined; + queueName?: QueueName | undefined; }, runFunction: PromiseVoidFunction, ): void => { @@ -51,9 +53,11 @@ const RunCron: RunCronFunction = ( logger.debug("Adding job to the queue: " + jobName); + const queueName: QueueName = options.queueName || QueueName.Worker; + // Add the job to the queue with the specified schedule Queue.addJob( - QueueName.Worker, + queueName, jobName, jobName, {}, @@ -66,7 +70,7 @@ const RunCron: RunCronFunction = ( // Run the job immediately on startup if specified if (options.runOnStartup) { - Queue.addJob(QueueName.Worker, jobName, jobName, {}, {}).catch( + Queue.addJob(queueName, jobName, jobName, {}, {}).catch( (err: Error) => { return logger.error(err); },