diff --git a/HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml b/HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml
index c40f82a880..eff588f2f1 100644
--- a/HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml
+++ b/HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml
@@ -49,4 +49,11 @@ KEDA ScaledObjects for various services
 {{- $probeKedaArgs := dict "ServiceName" $serviceName "Release" $.Release "Values" $.Values "MetricsConfig" $metricsConfig "DisableAutoscaler" $val.disableAutoscaler }}
 {{- include "oneuptime.kedaScaledObject" $probeKedaArgs }}
 {{- end }}
+{{- end }}
+
+{{/* Worker KEDA ScaledObject */}}
+{{- if and .Values.keda.enabled .Values.worker.keda.enabled (not .Values.worker.disableAutoscaler) }}
+{{- $metricsConfig := dict "enabled" .Values.worker.keda.enabled "minReplicas" .Values.worker.keda.minReplicas "maxReplicas" .Values.worker.keda.maxReplicas "pollingInterval" .Values.worker.keda.pollingInterval "cooldownPeriod" .Values.worker.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_worker_queue_size" "threshold" .Values.worker.keda.queueSizeThreshold "port" .Values.worker.ports.http)) }}
+{{- $workerKedaArgs := dict "ServiceName" "worker" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.worker.disableAutoscaler }}
+{{- include "oneuptime.kedaScaledObject" $workerKedaArgs }}
 {{- end }}
\ No newline at end of file
diff --git a/HelmChart/Public/oneuptime/templates/worker.yaml b/HelmChart/Public/oneuptime/templates/worker.yaml
index f4ca4032cc..670a0dcfee 100644
--- a/HelmChart/Public/oneuptime/templates/worker.yaml
+++ b/HelmChart/Public/oneuptime/templates/worker.yaml
@@ -109,8 +109,7 @@ spec:
 
 ---
 # OneUptime app autoscaler
-{{- if not $.Values.worker.disableAutoscaler }}
+{{- if and (not $.Values.worker.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.worker.keda.enabled)) }}
 {{- $workerAutoScalerArgs := dict "ServiceName" "worker" "Release" $.Release "Values" $.Values -}}
 {{- include "oneuptime.autoscaler" $workerAutoScalerArgs }}
 {{- end }}
----
diff --git a/HelmChart/Public/oneuptime/values.yaml b/HelmChart/Public/oneuptime/values.yaml
index 930ad0131d..0b5240e8d7 100644
--- a/HelmChart/Public/oneuptime/values.yaml
+++ b/HelmChart/Public/oneuptime/values.yaml
@@ -484,6 +484,17 @@ worker:
   ports:
     http: 1445
   resources:
+  # KEDA autoscaling configuration based on queue metrics
+  keda:
+    enabled: false
+    minReplicas: 1
+    maxReplicas: 100
+    # Scale up when queue size exceeds this threshold
+    queueSizeThreshold: 100
+    # Polling interval for metrics (in seconds)
+    pollingInterval: 30
+    # Cooldown period after scaling (in seconds)
+    cooldownPeriod: 300
 
 workflow:
   replicaCount: 1
diff --git a/Worker/API/Metrics.ts b/Worker/API/Metrics.ts
new file mode 100644
index 0000000000..0c8e1e77e5
--- /dev/null
+++ b/Worker/API/Metrics.ts
@@ -0,0 +1,35 @@
+import Express, {
+  ExpressRequest,
+  ExpressResponse,
+  ExpressRouter,
+  NextFunction,
+} from "Common/Server/Utils/Express";
+import WorkerQueueService from "../Services/Queue/WorkerQueueService";
+
+const router: ExpressRouter = Express.getRouter();
+
+/**
+ * JSON metrics endpoint for KEDA autoscaling
+ * Returns queue size as JSON for KEDA metrics-api scaler
+ */
+router.get(
+  "/metrics/queue-size",
+  async (
+    _req: ExpressRequest,
+    res: ExpressResponse,
+    next: NextFunction,
+  ): Promise<void> => {
+    try {
+      const queueSize: number = await WorkerQueueService.getQueueSize();
+
+      res.setHeader("Content-Type", "application/json");
+      res.status(200).json({
+        queueSize: queueSize,
+      });
+    } catch (err) {
+      return next(err);
+    }
+  },
+);
+
+export default router;
diff --git a/Worker/Routes.ts b/Worker/Routes.ts
index 5ab521dd59..f5c1d84fa0 100644
--- a/Worker/Routes.ts
+++ b/Worker/Routes.ts
@@ -113,6 +113,7 @@
 import QueueWorker from "Common/Server/Infrastructure/QueueWorker";
 import FeatureSet from "Common/Server/Types/FeatureSet";
 import logger from "Common/Server/Utils/Logger";
 import { WORKER_CONCURRENCY } from "./Config";
+import MetricsAPI from "./API/Metrics";
 import Express, { ExpressApplication } from "Common/Server/Utils/Express";
 
@@ -124,6 +125,9 @@ const WorkersFeatureSet: FeatureSet = {
     // attach bull board to the app
     app.use(Queue.getInspectorRoute(), Queue.getQueueInspectorRouter());
 
+    // expose metrics endpoint used by KEDA
+    app.use(["/worker", "/"], MetricsAPI);
+
     // run async database migrations
     RunDatabaseMigrations().catch((err: Error) => {
       logger.error("Error running database migrations");
diff --git a/Worker/Services/Queue/WorkerQueueService.ts b/Worker/Services/Queue/WorkerQueueService.ts
new file mode 100644
index 0000000000..68278c4b9a
--- /dev/null
+++ b/Worker/Services/Queue/WorkerQueueService.ts
@@ -0,0 +1,7 @@
+import Queue, { QueueName } from "Common/Server/Infrastructure/Queue";
+
+export default class WorkerQueueService {
+  public static async getQueueSize(): Promise<number> {
+    return Queue.getQueueSize(QueueName.Worker);
+  }
+}