diff --git a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/BufferingLogWriter.kt b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/BufferingLogWriter.kt index 146412fc0..18b6345e7 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/BufferingLogWriter.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/BufferingLogWriter.kt @@ -50,6 +50,10 @@ abstract class BufferingLogWriter( } private val queue = QueuePublisher(queueLimit, onError = onQueueError) + init { + metrics.queue = queue + } + override fun submit(event: T) { metrics.produced() queue.offer(event) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/CurrentLogWriter.kt b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/CurrentLogWriter.kt index cb850b002..0f07d762d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/CurrentLogWriter.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/CurrentLogWriter.kt @@ -41,24 +41,21 @@ abstract class CurrentLogWriter( } is LogTargetConfig.File -> { + val metrics = createMetrics() val file = Path.of(targetConfig.filename) log.info("Writing $category to $file") requireNotNull(fileOptions) logWriter = FileLogWriter( file, serializer, startSleep = fileOptions.startSleep, flushSleep = fileOptions.flushSleep, - batchLimit = fileOptions.batchLimit + batchLimit = fileOptions.batchLimit, + metrics = metrics ) logWriter.start() } is LogTargetConfig.Socket -> { - val metrics = if (Global.metricsExtended) { - LogMetrics.Enabled(category.name.lowercase(Locale.getDefault())) - } else { - LogMetrics.None() - } - + val metrics = createMetrics() log.info("Sending $category to ${targetConfig.host}:${targetConfig.port}") val encoding = when (targetConfig.encoding) { LogTargetConfig.Encoding.NEW_LINE -> LogEncodingNewLine() @@ -75,6 +72,12 @@ abstract class CurrentLogWriter( } } + private fun createMetrics() = if (Global.metricsExtended) { + LogMetrics.Enabled(category.name.lowercase(Locale.getDefault())) + } else { + LogMetrics.None() + } + data class FileOptions( val startSleep: Duration, val flushSleep: Duration, diff --git a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/FileLogWriter.kt b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/FileLogWriter.kt index 271e8d6e5..0a8075416 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/FileLogWriter.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/FileLogWriter.kt @@ -30,8 +30,9 @@ class FileLogWriter( serializer: (T) -> ByteArray?, private val startSleep: Duration, private val flushSleep: Duration, - private val batchLimit: Int = 5000 -) : LogWriter, BufferingLogWriter(serializer, LogEncodingNewLine(), queueLimit = batchLimit) { + private val batchLimit: Int = 5000, + metrics: LogMetrics = LogMetrics.None(), +) : LogWriter, BufferingLogWriter(serializer, LogEncodingNewLine(), queueLimit = batchLimit, metrics = metrics) { companion object { private val log = LoggerFactory.getLogger(FileLogWriter::class.java) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogMetrics.kt b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogMetrics.kt index aaee638f1..954ae9abc 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogMetrics.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogMetrics.kt @@ -15,29 +15,42 @@ */ package io.emeraldpay.dshackle.monitoring +import io.emeraldpay.dshackle.commons.QueuePublisher import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.Gauge import io.micrometer.core.instrument.Metrics interface LogMetrics { + var queue: QueuePublisher<*>? + fun produced() fun collected() fun dropped() class Enabled(category: String) : LogMetrics { - private val produced = Counter.builder("monitoring_logs_produce") + override var queue: QueuePublisher<*>? = null + + private val produced = Counter.builder("monitoringLogs_produce") .description("Log events produced by Dshackle") .tag("type", category) .register(Metrics.globalRegistry) - private val collected = Counter.builder("monitoring_logs_collect") + private val collected = Counter.builder("monitoringLogs_collect") .description("Log events successfully sent to a storage") .tag("type", category) .register(Metrics.globalRegistry) - private val dropped = Counter.builder("monitoring_logs_drop") + private val dropped = Counter.builder("monitoringLogs_drop") .description("Log events dropped w/o sending") .tag("type", category) .register(Metrics.globalRegistry) + init { + Gauge.builder("monitoringLogs_queueSize") { queue?.size?.toDouble() ?: 0.0 } + .description("Log events queue size") + .tag("type", category) + .register(Metrics.globalRegistry) + } + override fun produced() { produced.increment() } @@ -53,6 +66,8 @@ interface LogMetrics { class None : LogMetrics { + override var queue: QueuePublisher<*>? = null + override fun produced() { }