From e295c19b198d06369789109a576df8536394efca Mon Sep 17 00:00:00 2001 From: Nawaz Dhandala Date: Tue, 12 Aug 2025 17:37:13 +0100 Subject: [PATCH 1/2] feat: Implement KEDA autoscaling for worker service with metrics endpoint --- .../templates/keda-scaledobjects.yaml | 7 ++++ .../Public/oneuptime/templates/worker.yaml | 3 +- HelmChart/Public/oneuptime/values.yaml | 11 ++++++ Worker/API/Metrics.ts | 35 +++++++++++++++++++ Worker/Routes.ts | 4 +++ Worker/Services/Queue/WorkerQueueService.ts | 7 ++++ 6 files changed, 65 insertions(+), 2 deletions(-) create mode 100644 Worker/API/Metrics.ts create mode 100644 Worker/Services/Queue/WorkerQueueService.ts diff --git a/HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml b/HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml index c40f82a880..eff588f2f1 100644 --- a/HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml +++ b/HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml @@ -49,4 +49,11 @@ KEDA ScaledObjects for various services {{- $probeKedaArgs := dict "ServiceName" $serviceName "Release" $.Release "Values" $.Values "MetricsConfig" $metricsConfig "DisableAutoscaler" $val.disableAutoscaler }} {{- include "oneuptime.kedaScaledObject" $probeKedaArgs }} {{- end }} +{{- end }} + +{{/* Worker KEDA ScaledObject */}} +{{- if and .Values.keda.enabled .Values.worker.keda.enabled (not .Values.worker.disableAutoscaler) }} +{{- $metricsConfig := dict "enabled" .Values.worker.keda.enabled "minReplicas" .Values.worker.keda.minReplicas "maxReplicas" .Values.worker.keda.maxReplicas "pollingInterval" .Values.worker.keda.pollingInterval "cooldownPeriod" .Values.worker.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_worker_queue_size" "threshold" .Values.worker.keda.queueSizeThreshold "port" .Values.worker.ports.http)) }} +{{- $workerKedaArgs := dict "ServiceName" "worker" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.worker.disableAutoscaler }} +{{- include "oneuptime.kedaScaledObject" $workerKedaArgs }} {{- end }} \ No newline at end of file diff --git a/HelmChart/Public/oneuptime/templates/worker.yaml b/HelmChart/Public/oneuptime/templates/worker.yaml index f4ca4032cc..670a0dcfee 100644 --- a/HelmChart/Public/oneuptime/templates/worker.yaml +++ b/HelmChart/Public/oneuptime/templates/worker.yaml @@ -109,8 +109,7 @@ spec: --- # OneUptime app autoscaler -{{- if not $.Values.worker.disableAutoscaler }} +{{- if and (not $.Values.worker.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.worker.keda.enabled)) }} {{- $workerAutoScalerArgs := dict "ServiceName" "worker" "Release" $.Release "Values" $.Values -}} {{- include "oneuptime.autoscaler" $workerAutoScalerArgs }} {{- end }} ---- diff --git a/HelmChart/Public/oneuptime/values.yaml b/HelmChart/Public/oneuptime/values.yaml index 94588aa3fe..486b21aaef 100644 --- a/HelmChart/Public/oneuptime/values.yaml +++ b/HelmChart/Public/oneuptime/values.yaml @@ -481,6 +481,17 @@ worker: ports: http: 1445 resources: + # KEDA autoscaling configuration based on queue metrics + keda: + enabled: false + minReplicas: 1 + maxReplicas: 100 + # Scale up when queue size exceeds this threshold + queueSizeThreshold: 100 + # Polling interval for metrics (in seconds) + pollingInterval: 30 + # Cooldown period after scaling (in seconds) + cooldownPeriod: 300 workflow: replicaCount: 1 diff --git a/Worker/API/Metrics.ts b/Worker/API/Metrics.ts new file mode 100644 index 0000000000..0c8e1e77e5 --- /dev/null +++ b/Worker/API/Metrics.ts @@ -0,0 +1,35 @@ +import Express, { + ExpressRequest, + ExpressResponse, + ExpressRouter, + NextFunction, +} from "Common/Server/Utils/Express"; +import WorkerQueueService from "../Services/Queue/WorkerQueueService"; + +const router: ExpressRouter = Express.getRouter(); + +/** + * JSON metrics endpoint for KEDA autoscaling + * Returns queue size as JSON for KEDA metrics-api scaler + */ +router.get( + "/metrics/queue-size", + async ( + _req: ExpressRequest, + res: ExpressResponse, + next: NextFunction, + ): Promise => { + try { + const queueSize: number = await WorkerQueueService.getQueueSize(); + + res.setHeader("Content-Type", "application/json"); + res.status(200).json({ + queueSize: queueSize, + }); + } catch (err) { + return next(err); + } + }, +); + +export default router; diff --git a/Worker/Routes.ts b/Worker/Routes.ts index 5ab521dd59..ddd7e4f4ef 100644 --- a/Worker/Routes.ts +++ b/Worker/Routes.ts @@ -113,6 +113,7 @@ import QueueWorker from "Common/Server/Infrastructure/QueueWorker"; import FeatureSet from "Common/Server/Types/FeatureSet"; import logger from "Common/Server/Utils/Logger"; import { WORKER_CONCURRENCY } from "./Config"; +import MetricsAPI from "./API/Metrics"; import Express, { ExpressApplication } from "Common/Server/Utils/Express"; @@ -124,6 +125,9 @@ const WorkersFeatureSet: FeatureSet = { // attach bull board to the app app.use(Queue.getInspectorRoute(), Queue.getQueueInspectorRouter()); + // expose metrics endpoint used by KEDA + app.use(["/worker", "/"], MetricsAPI); + // run async database migrations RunDatabaseMigrations().catch((err: Error) => { logger.error("Error running database migrations"); diff --git a/Worker/Services/Queue/WorkerQueueService.ts b/Worker/Services/Queue/WorkerQueueService.ts new file mode 100644 index 0000000000..68278c4b9a --- /dev/null +++ b/Worker/Services/Queue/WorkerQueueService.ts @@ -0,0 +1,7 @@ +import Queue, { QueueName } from "Common/Server/Infrastructure/Queue"; + +export default class WorkerQueueService { + public static async getQueueSize(): Promise { + return Queue.getQueueSize(QueueName.Worker); + } +} From 33992984e2e0830b7b0acbeaaef6b4c9d839c86e Mon Sep 17 00:00:00 2001 From: Nawaz Dhandala Date: Tue, 12 Aug 2025 17:38:38 +0100 Subject: [PATCH 2/2] fix: Correct indentation for metrics endpoint exposure in WorkersFeatureSet --- Worker/Routes.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Worker/Routes.ts b/Worker/Routes.ts index ddd7e4f4ef..f5c1d84fa0 100644 --- a/Worker/Routes.ts +++ b/Worker/Routes.ts @@ -125,8 +125,8 @@ const WorkersFeatureSet: FeatureSet = { // attach bull board to the app app.use(Queue.getInspectorRoute(), Queue.getQueueInspectorRouter()); - // expose metrics endpoint used by KEDA - app.use(["/worker", "/"], MetricsAPI); + // expose metrics endpoint used by KEDA + app.use(["/worker", "/"], MetricsAPI); // run async database migrations RunDatabaseMigrations().catch((err: Error) => {