feat: Implement KEDA autoscaling for worker service with metrics endpoint

This commit is contained in:
Nawaz Dhandala 2025-08-12 17:37:13 +01:00
parent 51d42c8436
commit e295c19b19
No known key found for this signature in database
GPG key ID: 96C5DCA24769DBCA
6 changed files with 65 additions and 2 deletions

View file

@@ -49,4 +49,11 @@ KEDA ScaledObjects for various services
{{- $probeKedaArgs := dict "ServiceName" $serviceName "Release" $.Release "Values" $.Values "MetricsConfig" $metricsConfig "DisableAutoscaler" $val.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $probeKedaArgs }}
{{- end }}
{{- end }}
{{/* Worker KEDA ScaledObject */}}
{{/*
  Rendered only when KEDA is enabled globally (.Values.keda.enabled) AND for
  the worker (.Values.worker.keda.enabled), and the worker's autoscaler has
  not been explicitly disabled. The single trigger queries the metric named
  "oneuptime_worker_queue_size" against a threshold from values.yaml
  (worker.keda.queueSizeThreshold); the "port" key is set to the worker's
  HTTP port — presumably the helper scrapes the metric from that port, but
  confirm against the oneuptime.kedaScaledObject template definition.
*/}}
{{- if and .Values.keda.enabled .Values.worker.keda.enabled (not .Values.worker.disableAutoscaler) }}
{{- $metricsConfig := dict "enabled" .Values.worker.keda.enabled "minReplicas" .Values.worker.keda.minReplicas "maxReplicas" .Values.worker.keda.maxReplicas "pollingInterval" .Values.worker.keda.pollingInterval "cooldownPeriod" .Values.worker.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_worker_queue_size" "threshold" .Values.worker.keda.queueSizeThreshold "port" .Values.worker.ports.http)) }}
{{- $workerKedaArgs := dict "ServiceName" "worker" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.worker.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $workerKedaArgs }}
{{- end }}

View file

@@ -109,8 +109,7 @@ spec:
---
# OneUptime worker autoscaler
{{/* NOTE(review): the next two "if" lines appear to be the before/after pair
     of a diff render — only one matching "end" follows below. The second,
     KEDA-aware condition is presumably the surviving one: skip the plain
     HPA whenever KEDA is handling worker scaling (keda.enabled AND
     worker.keda.enabled). Confirm against the actual template file. */}}
{{- if not $.Values.worker.disableAutoscaler }}
{{- if and (not $.Values.worker.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.worker.keda.enabled)) }}
{{- $workerAutoScalerArgs := dict "ServiceName" "worker" "Release" $.Release "Values" $.Values -}}
{{- include "oneuptime.autoscaler" $workerAutoScalerArgs }}
{{- end }}
---

View file

@@ -481,6 +481,17 @@ worker:
ports:
http: 1445
resources:
# KEDA autoscaling configuration based on queue metrics.
# Takes effect only when the chart-level keda.enabled flag is ALSO true —
# the worker ScaledObject template requires both switches.
keda:
# Off by default; opt in per deployment.
enabled: false
# Replica bounds handed to the KEDA ScaledObject.
minReplicas: 1
maxReplicas: 100
# Scale up when queue size exceeds this threshold
# (compared against the oneuptime_worker_queue_size metric).
queueSizeThreshold: 100
# Polling interval for metrics (in seconds)
pollingInterval: 30
# Cooldown period after scaling (in seconds)
cooldownPeriod: 300
workflow:
replicaCount: 1

Worker/API/Metrics.ts — new file (35 lines added)
View file

@@ -0,0 +1,35 @@
import Express, {
ExpressRequest,
ExpressResponse,
ExpressRouter,
NextFunction,
} from "Common/Server/Utils/Express";
import WorkerQueueService from "../Services/Queue/WorkerQueueService";
const router: ExpressRouter = Express.getRouter();

/**
 * Handler for the KEDA metrics endpoint.
 *
 * Resolves the current worker queue depth and reports it as JSON in the
 * shape { "queueSize": <number> }, which the KEDA metrics-api scaler
 * polls to drive autoscaling. Any failure while reading the queue size
 * is forwarded to the Express error middleware via next().
 */
const queueSizeHandler = async (
  _req: ExpressRequest,
  res: ExpressResponse,
  next: NextFunction,
): Promise<void> => {
  try {
    const size: number = await WorkerQueueService.getQueueSize();
    res.setHeader("Content-Type", "application/json");
    res.status(200).json({ queueSize: size });
  } catch (err) {
    next(err);
    return;
  }
};

router.get("/metrics/queue-size", queueSizeHandler);

export default router;

View file

@@ -113,6 +113,7 @@ import QueueWorker from "Common/Server/Infrastructure/QueueWorker";
import FeatureSet from "Common/Server/Types/FeatureSet";
import logger from "Common/Server/Utils/Logger";
import { WORKER_CONCURRENCY } from "./Config";
import MetricsAPI from "./API/Metrics";
import Express, { ExpressApplication } from "Common/Server/Utils/Express";
@@ -124,6 +125,9 @@ const WorkersFeatureSet: FeatureSet = {
// attach bull board to the app
app.use(Queue.getInspectorRoute(), Queue.getQueueInspectorRouter());
// expose metrics endpoint used by KEDA
app.use(["/worker", "/"], MetricsAPI);
// run async database migrations
RunDatabaseMigrations().catch((err: Error) => {
logger.error("Error running database migrations");

View file

@@ -0,0 +1,7 @@
import Queue, { QueueName } from "Common/Server/Infrastructure/Queue";
export default class WorkerQueueService {
  /**
   * Resolves to the number of jobs currently waiting in the worker queue.
   * Consumed by the KEDA metrics endpoint to expose queue depth for
   * autoscaling decisions.
   */
  public static async getQueueSize(): Promise<number> {
    const pending: number = await Queue.getQueueSize(QueueName.Worker);
    return pending;
  }
}