oneuptime/Telemetry/Services/FluentLogsIngestService.ts
Nawaz Dhandala f49b1995df
feat(telemetry): add new Telemetry service (OTel, Syslog, Fluent, Metrics, Traces) and unified ingestion pipeline
- Add Telemetry service entrypoint
  - Telemetry/Index.ts: app bootstrap, route mounting, infrastructure init, and Telemetry SDK init.

- Unified queue + worker
  - Telemetry/Jobs/TelemetryIngest/ProcessTelemetry.ts: single worker that dispatches queued jobs to specific processors (logs, traces, metrics, syslog, fluent logs); see the sketch after this list.
  - Telemetry/Services/Queue/TelemetryQueueService.ts: central queue API and job payload types.
  - Per-type Queue wrappers (LogsQueueService, MetricsQueueService, TracesQueueService, FluentLogsQueueService, SyslogQueueService).
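
  A minimal sketch of the dispatch pattern (illustrative only: the payload
  shape and processor call here are hypothetical; the real job payload types
  live in TelemetryQueueService):

      // Hypothetical queued-job payload; actual field names may differ.
      interface TelemetryJob {
        productType: "Logs" | "Traces" | "Metrics" | "Syslog" | "FluentLogs";
        projectId: string;
        body: unknown;
      }

      // One worker, many processors: route each job on its product type.
      async function processTelemetryJob(job: TelemetryJob): Promise<void> {
        switch (job.productType) {
          case "FluentLogs":
            // e.g. FluentLogsIngestService.processFluentLogsFromQueue(...)
            break;
          // ... cases for Logs, Traces, Metrics, Syslog
        }
      }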

- OpenTelemetry ingestion middleware and proto support
  - Telemetry/Middleware/OtelRequestMiddleware.ts: detect the OTLP endpoint (logs/traces/metrics), decode protobuf bodies using protobufjs, and set the product type; see the sketch after this list.
  - Telemetry/ProtoFiles/OTel/v1/*.proto: include common.proto, logs.proto, metrics.proto, resource.proto, traces.proto for OTLP v1 messages.
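
  A minimal sketch of the protobuf decode step, assuming protobufjs and the
  bundled logs.proto (the proto path and function here are illustrative; the
  message type name follows the OTLP v1 spec):

      import protobuf from "protobufjs";

      // Load the bundled OTLP definitions. Imports are resolved relative to
      // the file, so common.proto and resource.proto must sit alongside it.
      const root: protobuf.Root = protobuf.loadSync(
        "Telemetry/ProtoFiles/OTel/v1/logs.proto",
      );

      const LogsData: protobuf.Type = root.lookupType(
        "opentelemetry.proto.logs.v1.LogsData",
      );

      // Decode a raw OTLP request body into a plain JSON-friendly object.
      function decodeLogsBody(body: Buffer): Record<string, unknown> {
        const message: protobuf.Message = LogsData.decode(body);
        return LogsData.toObject(message) as Record<string, unknown>;
      }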

- Ingest services
  - Telemetry/Services/OtelLogsIngestService.ts: parse incoming OTLP logs, map attributes, convert timestamps, batch insert logs.
  - Telemetry/Services/OtelTracesIngestService.ts: parse OTLP traces, build span rows, extract exceptions, batch insert spans and exceptions, save telemetry exception summary.
  - Telemetry/Services/OtelMetricsIngestService.ts: parse OTLP metrics, normalize datapoints, batch insert metrics and index metric name -> service map.
  - Telemetry/Services/SyslogIngestService.ts: syslog ingestion endpoints, parser integration, map syslog fields to attributes and logs.
  - Telemetry/Services/FluentLogsIngestService.ts: ingest Fluentd style logs, normalize entries and insert into log backend.
  - Telemetry/Services/OtelIngestBaseService.ts: helpers to resolve service name from attributes/headers.

- Syslog parser and utilities
  - Telemetry/Utils/SyslogParser.ts: robust RFC5424 and RFC3164 parser with structured-data extraction and sanitization; see the sketch after this list.
  - Telemetry/Tests/Utils/SyslogParser.test.ts: unit tests for parser behavior.
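
  For reference, a minimal sketch of the PRI math such a parser performs on
  an RFC5424 line (a standalone illustration, not the repo's actual parser
  API):

      // RFC5424 lines start with "<PRI>", where PRI = facility * 8 + severity.
      function decodePri(
        line: string,
      ): { facility: number; severity: number } | null {
        const match: RegExpMatchArray | null = line.match(/^<(\d{1,3})>/);
        if (!match) {
          return null;
        }
        const pri: number = parseInt(match[1] as string, 10);
        return { facility: Math.floor(pri / 8), severity: pri % 8 };
      }

      // decodePri('<34>1 2025-11-07T21:36:47Z host app - - - hi')
      //   -> { facility: 4, severity: 2 }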

- Telemetry exception utilities
  - Telemetry/Utils/Exception.ts: generate an exception fingerprint and upsert the telemetry exception status (saveOrUpdateTelemetryException); see the sketch below.
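
  One plausible fingerprinting scheme (illustrative; the exact fields hashed
  by Telemetry/Utils/Exception.ts may differ):

      import { createHash } from "crypto";

      // Hash the fields that identify an exception so repeated occurrences
      // produce the same fingerprint and can be upserted as one record.
      function exceptionFingerprint(ex: {
        type?: string;
        message?: string;
        stackTrace?: string;
      }): string {
        return createHash("sha256")
          .update([ex.type || "", ex.message || "", ex.stackTrace || ""].join("\n"))
          .digest("hex");
      }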

- Queue & job integration
  - New integration with Common/Server/Infrastructure/Queue and QueueWorker, job id generation and telemetry job types.
  - Telemetry services now enqueue ingestion jobs instead of processing requests synchronously.

- Config, build and dev tooling
  - Add Telemetry/package.json, package-lock.json, tsconfig.json, nodemon.json, jest config.
  - New script configs and dependencies (protobufjs, ts-node, jest, nodemon, etc.).

- Docker / environment updates
  - docker-compose.base.yml, docker-compose.dev.yml, docker-compose.yml: rename service from open-telemetry-ingest -> telemetry and wire TELEMETRY_* envs.
  - config.example.env: rename and consolidate environment variables (OPEN_TELEMETRY_* -> TELEMETRY_*, update hostnames and ports).
  - Tests/Scripts/status-check.sh: update ready-check target to telemetry/status/ready.

- Other
  - Telemetry/Services/Queue/*: export helpers and legacy-compatible job interface shims.
  - Memory cleanup and batching safeguards across ingest services.
  - Logging and capture spans added to key code paths.

BREAKING CHANGES / MIGRATION NOTES:
- Environment variables and docker service names changed:
  - Replace OPEN_TELEMETRY_... vars with TELEMETRY_... (PORT, HOSTNAME, CONCURRENCY, DISABLE_TELEMETRY, etc.).
  - docker-compose entries moved from "open-telemetry-ingest" to "telemetry" and image name changed to oneuptime/telemetry.
  - Update any deployment automation and monitoring checks referencing the old service name or endpoints.
- Consumers: OTLP endpoints and behavior remain supported, but ingestion is now queued and processed asynchronously.

Testing / Running:
- Install dependencies in Telemetry/ (npm install) after syncing the Common workspace.
- Run in dev mode: npx nodemon (uses nodemon.json), or build and start with the provided scripts.
- Run tests with jest (the Telemetry test suite includes SyslogParser unit tests).

Files added/modified (high level):
- Added many files under Telemetry/: Index, Jobs, Middleware, ProtoFiles, Services, Utils, Tests, plus package and config artifacts.
- Modified docker-compose.* and config.example.env and status check script to use new TELEMETRY service/vars.
2025-11-07 21:36:47 +00:00

322 lines
8.7 KiB
TypeScript

import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest";
import BadRequestException from "Common/Types/Exception/BadRequestException";
import {
  ExpressRequest,
  ExpressResponse,
  NextFunction,
} from "Common/Server/Utils/Express";
import Response from "Common/Server/Utils/Response";
import CaptureSpan from "Common/Server/Utils/Telemetry/CaptureSpan";
import ObjectID from "Common/Types/ObjectID";
import OneUptimeDate from "Common/Types/Date";
import LogSeverity from "Common/Types/Log/LogSeverity";
import TelemetryUtil, {
  AttributeType,
} from "Common/Server/Utils/Telemetry/Telemetry";
import { JSONObject } from "Common/Types/JSON";
import Dictionary from "Common/Types/Dictionary";
import logger from "Common/Server/Utils/Logger";
import OTelIngestService, {
  TelemetryServiceMetadata,
} from "Common/Server/Services/OpenTelemetryIngestService";
import LogService from "Common/Server/Services/LogService";
import OtelIngestBaseService from "./OtelIngestBaseService";
import FluentLogsQueueService from "./Queue/FluentLogsQueueService";
import { TELEMETRY_LOG_FLUSH_BATCH_SIZE } from "../Config";
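
/**
 * Ingests Fluentd-style log payloads. The HTTP handler validates and
 * normalizes incoming entries and enqueues them; a queue worker later
 * converts each entry into a log row and batch-inserts it into the log
 * backend.
 */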
export default class FluentLogsIngestService extends OtelIngestBaseService {
  private static readonly DEFAULT_SERVICE_NAME: string = "Fluentd";
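
  /**
   * HTTP handler: validates the request, normalizes the body into a list
   * of log entries, acknowledges the caller with an empty success response,
   * and then enqueues the payload for asynchronous processing.
   */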
  @CaptureSpan()
  public static async ingestFluentLogs(
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> {
    try {
      if (!(req as TelemetryRequest).projectId) {
        throw new BadRequestException(
          "Invalid request - projectId not found in request.",
        );
      }

      req.body = req.body?.toJSON ? req.body.toJSON() : req.body;

      const entries: Array<string> = this.normalizeLogEntries(req.body);

      if (entries.length === 0) {
        throw new BadRequestException(
          "No fluent log entries found in request.",
        );
      }

      req.body = {
        entries,
      } satisfies JSONObject;

      Response.sendEmptySuccessResponse(req, res);

      await FluentLogsQueueService.addFluentLogIngestJob(
        req as TelemetryRequest,
      );

      return;
    } catch (error) {
      return next(error);
    }
  }
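
  /**
   * Queue worker entrypoint: processes a previously enqueued fluent log
   * ingestion job.
   */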
  @CaptureSpan()
  public static async processFluentLogsFromQueue(
    req: ExpressRequest,
  ): Promise<void> {
    await this.processFluentLogsAsync(req);
  }
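
  /**
   * Resolves the telemetry service for the request (from headers, falling
   * back to "Fluentd"), converts each entry into a log row stamped with
   * ingestion-time timestamps and service attributes, and batch-inserts
   * rows via flushLogsBuffer.
   */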
  @CaptureSpan()
  private static async processFluentLogsAsync(
    req: ExpressRequest,
  ): Promise<void> {
    try {
      const projectId: ObjectID = (req as TelemetryRequest).projectId;

      const entries: Array<string> = this.extractEntriesFromRequest(req.body);

      if (entries.length === 0) {
        logger.warn("Fluent logs ingest: no entries to process.");
        return;
      }

      const serviceName: string = this.getServiceNameFromHeaders(
        req,
        this.DEFAULT_SERVICE_NAME,
      );

      const metadata: {
        serviceId: ObjectID;
        dataRententionInDays: number;
      } = await OTelIngestService.telemetryServiceFromName({
        serviceName,
        projectId,
      });

      const serviceMetadata: TelemetryServiceMetadata = {
        serviceName,
        serviceId: metadata.serviceId,
        dataRententionInDays: metadata.dataRententionInDays,
      } satisfies TelemetryServiceMetadata;

      const baseAttributes: Dictionary<AttributeType | Array<AttributeType>> =
        TelemetryUtil.getAttributesForServiceIdAndServiceName({
          serviceId: serviceMetadata.serviceId,
          serviceName,
        });

      const baseAttributeKeys: Array<string> =
        TelemetryUtil.getAttributeKeys(baseAttributes);

      const dbLogs: Array<JSONObject> = [];
      let processed: number = 0;

      for (const entry of entries) {
        try {
          const ingestionDate: Date = OneUptimeDate.getCurrentDate();
          const ingestionDateTime: string =
            OneUptimeDate.toClickhouseDateTime(ingestionDate);
          const timeUnixNano: string = Math.trunc(
            OneUptimeDate.toUnixNano(ingestionDate),
          ).toString();

          const attributes: Dictionary<
            AttributeType | Array<AttributeType>
          > = {
            ...baseAttributes,
          };

          const logRow: JSONObject = {
            _id: ObjectID.generate().toString(),
            createdAt: ingestionDateTime,
            updatedAt: ingestionDateTime,
            projectId: projectId.toString(),
            serviceId: serviceMetadata.serviceId.toString(),
            time: ingestionDateTime,
            timeUnixNano,
            severityNumber: 0,
            severityText: LogSeverity.Unspecified,
            attributes,
            attributeKeys: [...baseAttributeKeys],
            traceId: "",
            spanId: "",
            body: entry,
          } satisfies JSONObject;

          dbLogs.push(logRow);
          processed++;

          if (dbLogs.length >= TELEMETRY_LOG_FLUSH_BATCH_SIZE) {
            await this.flushLogsBuffer(dbLogs);
          }
        } catch (processingError) {
          logger.error("Fluent logs ingest: error processing entry");
          logger.error(processingError);
          logger.error(`Fluent log entry: ${entry}`);
        }
      }

      await this.flushLogsBuffer(dbLogs, true);

      if (processed === 0) {
        logger.warn("Fluent logs ingest: no valid entries processed");
      } else {
        logger.debug(
          `Fluent logs ingest: processed ${processed} entries for project ${projectId.toString()}`,
        );
      }

      dbLogs.length = 0;

      try {
        if (req.body) {
          req.body = null;
        }
      } catch (cleanupError) {
        logger.error("Fluent logs ingest: error during memory cleanup");
        logger.error(cleanupError);
      }
    } catch (error) {
      logger.error("Fluent logs ingest: critical error");
      logger.error(error);
      throw error;
    }
  }
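
  /**
   * Extracts the entries array that ingestFluentLogs stored on the request
   * body, coercing non-string items to strings where possible.
   */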
  private static extractEntriesFromRequest(body: unknown): Array<string> {
    if (!body || typeof body !== "object") {
      return [];
    }

    const payload: JSONObject = body as JSONObject;
    const entries: unknown = payload["entries"];

    if (!entries) {
      return [];
    }

    if (Array.isArray(entries)) {
      return entries
        .map((item: unknown) => {
          if (typeof item === "string") {
            return item;
          }

          if (item === null || item === undefined) {
            return undefined;
          }

          if (typeof item === "object") {
            try {
              return JSON.stringify(item);
            } catch {
              return undefined;
            }
          }

          return String(item);
        })
        .filter((item: string | undefined): item is string => {
          return Boolean(item && item.length > 0);
        });
    }

    return this.normalizeLogEntries(entries);
  }
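
  /**
   * Recursively normalizes a Fluentd payload (string, Buffer, array, or
   * object with json/entries/message/log fields) into a flat list of
   * non-empty log strings.
   */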
  private static normalizeLogEntries(payload: unknown): Array<string> {
    if (payload === undefined || payload === null) {
      return [];
    }

    if (typeof payload === "string") {
      const trimmed: string = payload.trim();

      if (!trimmed) {
        return [];
      }

      if (trimmed.includes("\n")) {
        return trimmed
          .split(/\r?\n/)
          .map((line: string) => {
            return line.trim();
          })
          .filter((line: string) => {
            return line.length > 0;
          });
      }

      return [trimmed];
    }

    if (Buffer.isBuffer(payload)) {
      return this.normalizeLogEntries(payload.toString("utf-8"));
    }

    if (Array.isArray(payload)) {
      const results: Array<string> = [];

      for (const item of payload) {
        results.push(...this.normalizeLogEntries(item));
      }

      return results;
    }

    if (typeof payload === "object") {
      const obj: JSONObject = payload as JSONObject;

      if (obj["json"] !== undefined) {
        return this.normalizeLogEntries(obj["json"]);
      }

      if (obj["entries"] !== undefined) {
        return this.normalizeLogEntries(obj["entries"]);
      }

      if (obj["message"] !== undefined) {
        return this.normalizeLogEntries(obj["message"]);
      }

      if (obj["log"] !== undefined) {
        return this.normalizeLogEntries(obj["log"]);
      }

      try {
        return [JSON.stringify(obj)];
      } catch {
        return [];
      }
    }

    return [String(payload)];
  }
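
  /**
   * Drains the buffer into LogService in batches of
   * TELEMETRY_LOG_FLUSH_BATCH_SIZE; with force=true, flushes any remainder
   * smaller than a full batch.
   */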
  private static async flushLogsBuffer(
    logs: Array<JSONObject>,
    force: boolean = false,
  ): Promise<void> {
    while (
      logs.length >= TELEMETRY_LOG_FLUSH_BATCH_SIZE ||
      (force && logs.length > 0)
    ) {
      const batchSize: number = Math.min(
        logs.length,
        TELEMETRY_LOG_FLUSH_BATCH_SIZE,
      );

      const batch: Array<JSONObject> = logs.splice(0, batchSize);

      if (batch.length === 0) {
        continue;
      }

      await LogService.insertJsonRows(batch);
    }
  }
}