Add WorkerNotification queue and update SendReportsToSubscribers job configuration

This commit is contained in:
Simon Larsen 2024-09-22 14:15:09 +01:00
parent a0031a868a
commit 9be007cbda
No known key found for this signature in database
GPG key ID: 96C5DCA24769DBCA
4 changed files with 33 additions and 3 deletions

View file

@ -15,6 +15,7 @@ import { ExpressRouter } from "../Utils/Express";
export enum QueueName {
Workflow = "Workflow",
Worker = "Worker",
WorkerNotification = "WorkerNotification",
}
export type QueueJob = Job;

View file

@ -7,10 +7,15 @@ import StatusPageService from "Common/Server/Services/StatusPageService";
import QueryHelper from "Common/Server/Types/Database/QueryHelper";
import logger from "Common/Server/Utils/Logger";
import StatusPage from "Common/Models/DatabaseModels/StatusPage";
import { QueueName } from "Common/Server/Infrastructure/Queue";
RunCron(
"StatusPage:SendReportToSubscribers",
{ schedule: EVERY_MINUTE, runOnStartup: false },
{
schedule: EVERY_MINUTE,
runOnStartup: false,
queueName: QueueName.WorkerNotification,
},
async () => {
// get all scheduled events of all the projects.
const statusPageToSendReports: Array<StatusPage> =

View file

@ -132,6 +132,26 @@ const WorkersFeatureSet: FeatureSet = {
},
{ concurrency: 100 },
);
// Job process.
QueueWorker.getWorker(
QueueName.WorkerNotification,
async (job: QueueJob) => {
const name: string = job.name;
logger.debug("Running Job: " + name);
const funcToRun: PromiseVoidFunction =
JobDictionary.getJobFunction(name);
const timeoutInMs: number = JobDictionary.getTimeoutInMs(name);
if (funcToRun) {
await QueueWorker.runJobWithTimeout(timeoutInMs, funcToRun);
}
},
{ concurrency: 100 },
);
} catch (err) {
logger.error("App Init Failed:");
logger.error(err);

View file

@ -15,6 +15,7 @@ type RunCronFunction = (
schedule: string;
runOnStartup: boolean;
timeoutInMS?: number | undefined;
queueName?: QueueName | undefined;
},
runFunction: PromiseVoidFunction,
) => void;
@ -26,6 +27,7 @@ const RunCron: RunCronFunction = (
schedule: string;
runOnStartup: boolean;
timeoutInMS?: number | undefined;
queueName?: QueueName | undefined;
},
runFunction: PromiseVoidFunction,
): void => {
@ -51,9 +53,11 @@ const RunCron: RunCronFunction = (
logger.debug("Adding job to the queue: " + jobName);
const queueName: QueueName = options.queueName || QueueName.Worker;
// Add the job to the queue with the specified schedule
Queue.addJob(
QueueName.Worker,
queueName,
jobName,
jobName,
{},
@ -66,7 +70,7 @@ const RunCron: RunCronFunction = (
// Run the job immediately on startup if specified
if (options.runOnStartup) {
Queue.addJob(QueueName.Worker, jobName, jobName, {}, {}).catch(
Queue.addJob(queueName, jobName, jobName, {}, {}).catch(
(err: Error) => {
return logger.error(err);
},