diff --git a/apps/dokploy/server/server.ts b/apps/dokploy/server/server.ts index 415c24be1..6d82012f5 100644 --- a/apps/dokploy/server/server.ts +++ b/apps/dokploy/server/server.ts @@ -34,7 +34,6 @@ void app.prepare().then(async () => { }); // WEBSOCKET - setupMonitoringWebSocketServer(server); setupDeploymentLogsWebSocketServer(server); setupDockerContainerLogsWebSocketServer(server); setupDockerContainerTerminalWebSocketServer(server); diff --git a/apps/dokploy/server/wss/monitoring.ts b/apps/dokploy/server/wss/monitoring.ts deleted file mode 100644 index 1ee70d5f7..000000000 --- a/apps/dokploy/server/wss/monitoring.ts +++ /dev/null @@ -1,122 +0,0 @@ -import type http from "node:http"; -import { WebSocket, WebSocketServer } from "ws"; - -interface MonitoringState { - activeClients: Set; - agent: WebSocket | null; - isCollecting: boolean; -} - -const state: MonitoringState = { - activeClients: new Set(), - agent: null, - isCollecting: false, -}; - -export const setupMonitoringWebSocketServer = ( - server: http.Server, -) => { - const wss = new WebSocketServer({ - noServer: true, - path: "/agent", - }); - - const startMetricsCollection = () => { - if (!state.agent || state.isCollecting) return; - state.isCollecting = true; - console.log(" Iniciando recolección de métricas"); - state.agent.send(JSON.stringify({ type: "start" })); - }; - - const stopMetricsCollection = () => { - if (!state.agent || !state.isCollecting) return; - state.isCollecting = false; - console.log(" Deteniendo recolección de métricas"); - state.agent.send(JSON.stringify({ type: "stop" })); - }; - - server.on("upgrade", (req, socket, head) => { - const { pathname } = new URL(req.url || "", `http://${req.headers.host}`); - console.log(" Solicitud de upgrade para:", pathname); - - if (pathname === "/_next/webpack-hmr") return; - if (pathname === "/agent") { - wss.handleUpgrade(req, socket, head, (ws) => { - wss.emit("connection", ws, req); - }); - } - }); - - wss.on("connection", (ws: WebSocket, req) => { - console.log(" Nueva conexión desde:", req.socket.remoteAddress); - - ws.on("message", (data) => { - try { - const message = JSON.parse(data.toString()); - console.log(" Mensaje recibido:", message); - - switch (message.type) { - case "register": - if (message.data?.role === "agent") { - console.log(" Agente registrado"); - state.agent = ws; - // Si hay clientes esperando, iniciar métricas - if (state.activeClients.size > 0) { - startMetricsCollection(); - } - } else if (message.data?.role === "client") { - console.log(" Cliente conectado"); - state.activeClients.add(ws); - // Si es el primer cliente y tenemos un agente, iniciar métricas - if (state.activeClients.size === 1 && state.agent) { - startMetricsCollection(); - } - } - break; - - case "metrics": - console.log( - " Métricas recibidas, reenviando a", - state.activeClients.size, - "clientes", - ); - // Reenviar métricas a todos los clientes activos - state.activeClients.forEach((client) => { - if (client.readyState === WebSocket.OPEN) { - client.send(data.toString()); - } - }); - break; - } - } catch (error) { - console.error(" Error procesando mensaje:", error); - } - }); - - ws.on("close", () => { - // Si era un cliente - if (state.activeClients.has(ws)) { - state.activeClients.delete(ws); - console.log( - " Cliente desconectado. Clientes restantes:", - state.activeClients.size, - ); - - // Si no quedan clientes, detener métricas - if (state.activeClients.size === 0) { - stopMetricsCollection(); - } - } - // Si era el agente - else if (ws === state.agent) { - state.agent = null; - state.isCollecting = false; - console.log(" Agente desconectado"); - } - }); - - ws.on("error", (error) => { - console.error(" Error en la conexión:", error); - }); - }); -}; diff --git a/apps/monitoring/src/containers/index.ts b/apps/monitoring/src/containers/index.ts index b00a5ab3c..3392584c2 100644 --- a/apps/monitoring/src/containers/index.ts +++ b/apps/monitoring/src/containers/index.ts @@ -19,6 +19,7 @@ const REFRESH_RATE_CONTAINER = Number( process.env.CONTAINER_REFRESH_RATE || 10000, ); +const logStreams = new Map(); export const logContainerMetrics = () => { console.log("Refresh rate:", REFRESH_RATE_CONTAINER); @@ -28,6 +29,11 @@ export const logContainerMetrics = () => { const cleanup = async () => { if (job) { job.cancel(); + + for (const stream of logStreams.values()) { + stream.end(); + } + // logStreams.forEach((stream) => stream.end()); } }; @@ -96,9 +102,23 @@ export const logContainerMetrics = () => { } } - // Escribir la nueva línea - await fs.promises.appendFile(containerPath, logLine); + if (!logStreams.has(serviceName)) { + console.log(logStreams.size); + logStreams.set( + serviceName, + fs.createWriteStream(containerPath, { flags: "a" }), + ); + } + const stream = logStreams.get(serviceName); + if (stream) { + if (!stream.write(logLine)) { + stream.once("drain", () => stream.write(logLine)); + } + } + + // Escribir la nueva línea + // await fs.promises.appendFile(containerPath, logLine); } catch (error) { console.error( `Error writing metrics for container ${container.Name}:`, @@ -121,7 +141,6 @@ export const logContainerMetrics = () => { Math.floor(REFRESH_RATE_CONTAINER / 1000), ); - console.log(rule); job = schedule.scheduleJob(rule, runMetricsCollection); process.on("SIGTERM", cleanup); diff --git a/apps/monitoring/src/server/server.ts b/apps/monitoring/src/server/server.ts index 1af477ee6..80f3c3130 100644 --- a/apps/monitoring/src/server/server.ts +++ b/apps/monitoring/src/server/server.ts @@ -49,7 +49,10 @@ const getServerMetrics = async () => { }; const REFRESH_RATE_SERVER = Number(process.env.REFRESH_RATE_SERVER || 10000); -const MAX_FILE_SIZE_MB = Number(process.env.MAX_FILE_SIZE_MB || 10); // 10 MB por defecto +const MAX_FILE_SIZE_MB = Number(process.env.MAX_FILE_SIZE_MB || 10); + +// Crear el WriteStream (mantener abierto para reutilizarlo) +const logStream = fs.createWriteStream(serverLogFile, { flags: "a" }); export const logServerMetrics = () => { const rule = new schedule.RecurrenceRule(); @@ -87,9 +90,12 @@ export const logServerMetrics = () => { } } - fs.appendFile(serverLogFile, logLine, (err) => { - if (err) console.error("Error to write server metrics:", err); - }); + if (!logStream.write(logLine)) { + console.log("Escribiendo...."); + logStream.once("drain", () => { + logStream.write(logLine); + }); + } }); // Cleanup function