diff --git a/src/main/kotlin/io/emeraldpay/dshackle/Global.kt b/src/main/kotlin/io/emeraldpay/dshackle/Global.kt index a3a68f0b1..5704c8805 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/Global.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/Global.kt @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.module.SimpleModule import com.fasterxml.jackson.datatype.jdk8.Jdk8Module import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule +import com.fasterxml.jackson.module.kotlin.KotlinModule import io.emeraldpay.api.Chain import io.emeraldpay.dshackle.monitoring.MonitoringContext import io.emeraldpay.dshackle.upstream.bitcoin.data.EsploraUnspent @@ -94,6 +95,7 @@ class Global { objectMapper.registerModule(module) objectMapper.registerModule(Jdk8Module()) objectMapper.registerModule(JavaTimeModule()) + objectMapper.registerModule(KotlinModule.Builder().build()) objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) objectMapper .setDateFormat(SimpleDateFormat("yyyy-MM-dd\'T\'HH:mm:ss.SSS")) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/commons/DataQueue.kt b/src/main/kotlin/io/emeraldpay/dshackle/commons/DataQueue.kt new file mode 100644 index 000000000..2e79f45c0 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/commons/DataQueue.kt @@ -0,0 +1,118 @@ +/** + * Copyright (c) 2023 EmeraldPay, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.emeraldpay.dshackle.commons + +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock + +/** + * A Queue for data for further processing. + * It is thread-safe, i.e. it can have multiple (fire-and-forget) providers and multiple readers. For the case of + * multiple readers it doesn't share the original queue items, i.e., each reader get own set of original items. + */ +class DataQueue( + /** + * Queue Limit. When the subscribers are slower that providers it starts to drop new items when reached this size + */ + private val queueLimit: Int, + onError: ((Error) -> Unit) = {} +) { + + private val locks = ReentrantLock() + private var queue = ArrayList() + private var closed = false + private val nonFailingOnError: ((Error) -> Unit) = { + try { onError(it) } catch (t: Throwable) {} + } + + enum class Error { + FULL, CLOSED, INTERNAL + } + + /** + * Current queue length + */ + val size: Int + get() = locks.withLock { queue.size } + + /** + * Put a new item into the queue + */ + fun offer(values: List): Boolean { + locks.withLock { + if (queue.size >= queueLimit) { + nonFailingOnError(Error.FULL) + return false + } + if (closed) { + nonFailingOnError(Error.CLOSED) + return false + } + queue.addAll(values) + return true + } + } + + fun offer(value: T): Boolean { + locks.withLock { + if (queue.size >= queueLimit) { + nonFailingOnError(Error.FULL) + return false + } + if (closed) { + nonFailingOnError(Error.CLOSED) + return false + } + queue.add(value) + return true + } + } + + /** + * Close the queue, which also clears the queue. + * Queue doesn't accept new items when closed. + */ + fun close() { + locks.withLock { + closed = true + queue.clear() + } + } + + fun request(limit: Int): List { + return locks.withLock { + if (queue.isEmpty()) { + emptyList() + } else if (queue.size < limit) { + val copy = queue + queue = ArrayList(copy.size) + copy + } else { + val copy = queue + val back = ArrayList(queue.size) + val result = ArrayList(limit) + for (i in 0 until limit) { + result.add(copy[i]) + } + for (i in limit until copy.size) { + back.add(copy[i]) + } + queue = back + result + } + } + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/commons/QueuePublisher.kt b/src/main/kotlin/io/emeraldpay/dshackle/commons/QueuePublisher.kt deleted file mode 100644 index b56137ce5..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/commons/QueuePublisher.kt +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Copyright (c) 2023 EmeraldPay, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.commons - -import org.reactivestreams.Publisher -import org.reactivestreams.Subscriber -import org.reactivestreams.Subscription -import reactor.core.scheduler.Schedulers -import java.time.Duration -import java.util.concurrent.ConcurrentLinkedQueue -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.atomic.AtomicLong - -/** - * A Reactor publishing Queue which keeps the data even if there is no subscriber, until the specified queueLimit is reached. - * It is thread-safe, i.e. it can multiple (fire-and-forget) sources and multiple subscribers as well. For the case of - * multiple subscriptions it doesn't share the original queue items, i.e., each subscriber get own set of original items. - */ -class QueuePublisher( - /** - * Queue Limit. When the subscribers are slower that providers it starts to drop new items when reached this size - */ - private val queueLimit: Int, - private val sleepEmpty: Duration = Duration.ofMillis(5), - onError: ((Error) -> Unit) = {} -) : Publisher { - - var scheduler = Schedulers.boundedElastic() - - private val queue = ConcurrentLinkedQueue() - private val closed = AtomicBoolean(false) - private val queueSize = AtomicInteger(0) - private val nonFailingOnError: ((Error) -> Unit) = { - try { onError(it) } catch (t: Throwable) {} - } - - enum class Error { - FULL, CLOSED, INTERNAL - } - - /** - * Current queue length - */ - val size: Int - get() = queueSize.get() - - /** - * Put a new item into the queue - */ - fun offer(value: T): Boolean { - if (queueSize.get() >= queueLimit) { - nonFailingOnError(Error.FULL) - return false - } - if (closed.get()) { - nonFailingOnError(Error.CLOSED) - return false - } - return queue.offer(value).also { - if (it) { - queueSize.incrementAndGet() - } - } - } - - /** - * Close the queue, which also completes all current subscribers and cleans the queue. - * Queue doesn't accept new items when closed. - */ - fun close() { - closed.set(true) - queue.clear() - queueSize.set(0) - } - - override fun subscribe(client: Subscriber) { - - val cancelled = AtomicBoolean(false) - val limit = AtomicLong(0) - - client.onSubscribe(object : Subscription { - override fun request(n: Long) { - limit.set(n) - scheduler.schedule(Producer(cancelled, limit, client, this@QueuePublisher)) - } - - override fun cancel() { - cancelled.set(true) - } - }) - } - - private class Producer( - val cancelled: AtomicBoolean, - val limit: AtomicLong, - val client: Subscriber, - - val parent: QueuePublisher, - ) : Runnable { - override fun run() { - while (!cancelled.get() && limit.get() > 0 && !parent.closed.get()) { - val next = parent.queue.poll() - if (next != null) { - parent.queueSize.decrementAndGet() - limit.decrementAndGet() - try { - client.onNext(next) - } catch (t: Throwable) { - // that's not supposed to happen. but what to do if it happened? closing the subscriber with an error is probably a good idea - client.onError(t) - parent.nonFailingOnError(Error.INTERNAL) - return - } - } else { - parent.scheduler.schedule(this, parent.sleepEmpty.toMillis(), TimeUnit.MILLISECONDS) - return - } - } - if (parent.closed.get()) { - client.onComplete() - } else { - parent.scheduler.schedule(this, parent.sleepEmpty.toMillis(), TimeUnit.MILLISECONDS) - } - } - } -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/BufferingLogWriter.kt b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/BufferingLogWriter.kt index 18b6345e7..b155aa25d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/BufferingLogWriter.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/BufferingLogWriter.kt @@ -15,16 +15,14 @@ */ package io.emeraldpay.dshackle.monitoring -import io.emeraldpay.dshackle.commons.QueuePublisher +import io.emeraldpay.dshackle.commons.DataQueue import io.emeraldpay.dshackle.commons.RateLimitedAction import org.slf4j.LoggerFactory -import reactor.core.publisher.Flux import java.nio.ByteBuffer import java.time.Duration -import java.util.Optional abstract class BufferingLogWriter( - private val serializer: (T) -> ByteArray?, + private val serializer: LogSerializer?, val encoding: LogEncoding, val queueLimit: Int = 4096, private val metrics: LogMetrics = LogMetrics.None(), @@ -37,52 +35,51 @@ abstract class BufferingLogWriter( protected var onFull: () -> Unit = {} private val errors = RateLimitedAction(Duration.ofSeconds(1)) - private val onQueueError: ((QueuePublisher.Error) -> Unit) = { err -> + private val onQueueError: ((DataQueue.Error) -> Unit) = { err -> when (err) { - QueuePublisher.Error.FULL -> { + DataQueue.Error.FULL -> { metrics.dropped() errors.execute { log.warn("Queue is full: ${queue.size}") } onFull() } - QueuePublisher.Error.CLOSED -> errors.execute { log.warn("Queue is closed") } - QueuePublisher.Error.INTERNAL -> errors.execute { log.warn("Queue cannot processes a log message") } + DataQueue.Error.CLOSED -> errors.execute { log.warn("Queue is closed") } + DataQueue.Error.INTERNAL -> errors.execute { log.warn("Queue cannot processes a log message") } } } - private val queue = QueuePublisher(queueLimit, onError = onQueueError) + private val queue = DataQueue(queueLimit, onError = onQueueError) init { metrics.queue = queue } override fun submit(event: T) { - metrics.produced() queue.offer(event) + metrics.produced() } override fun submitAll(events: List) { - events.forEach(::submit) + queue.offer(events) + events.forEach { _ -> metrics.produced() } } - fun readFromQueue(): Flux { - return Flux.from(queue) + fun next(limit: Int): List { + return queue.request(limit) } - fun readEncodedFromQueue(): Flux { - return readFromQueue() - .map(::toByteBuffer) - .filter(Optional::isPresent) - .map(Optional::get) + fun returnBack(index: Int, events: List) { + queue.offer(events.drop(index)) + // don't fire the metrics update because it's they were already counted } - fun toByteBuffer(event: T): Optional { + fun encode(event: T): ByteBuffer? { val line = try { - serializer(event) + serializer?.apply(event) } catch (t: Throwable) { errors.execute { log.warn("Failed to serialize event: ${t.message}") } - null - } ?: return Optional.empty() + return null + } ?: return null val encoded = try { encoding.write(line) @@ -92,7 +89,7 @@ abstract class BufferingLogWriter( } null } - return Optional.ofNullable(encoded) + return encoded } fun isEmpty(): Boolean { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/CurrentLogWriter.kt b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/CurrentLogWriter.kt index 0f07d762d..dfb2c1c06 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/CurrentLogWriter.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/CurrentLogWriter.kt @@ -24,7 +24,7 @@ import java.util.Locale abstract class CurrentLogWriter( private val category: Category, - private val serializer: (T) -> ByteArray?, + private val serializer: LogSerializer?, private val fileOptions: FileOptions? = null ) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/FileLogWriter.kt b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/FileLogWriter.kt index 0a8075416..8c872531f 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/FileLogWriter.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/FileLogWriter.kt @@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit class FileLogWriter( private val file: Path, - serializer: (T) -> ByteArray?, + serializer: LogSerializer?, private val startSleep: Duration, private val flushSleep: Duration, private val batchLimit: Int = 5000, @@ -71,6 +71,11 @@ class FileLogWriter( } try { + val items = next(batchLimit) + if (items.isEmpty()) { + return true + } + val channel = try { FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE) } catch (t: Throwable) { @@ -81,19 +86,18 @@ class FileLogWriter( } return channel.use { wrt -> + var pos = 0 try { - super.readEncodedFromQueue() - .map { - wrt.write(it) - true + items + .forEach { + pos++ + val encoded = encode(it) ?: return@forEach + wrt.write(encoded) } - .take(size().coerceAtMost(batchLimit).toLong()) - .take(batchTime) - // complete writes in this thread _blocking_ - .blockLast() true } catch (t: Throwable) { errors.execute { log.warn("Failed to write to the log", t) } + returnBack(pos, items) false } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogEncoding.kt b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogEncoding.kt index 0174367c3..d5f5e79fd 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogEncoding.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogEncoding.kt @@ -19,5 +19,5 @@ import java.nio.ByteBuffer interface LogEncoding { - fun write(bytes: ByteArray): ByteBuffer + fun write(bytes: ByteBuffer): ByteBuffer } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogEncodingNewLine.kt b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogEncodingNewLine.kt index 3b79aea9e..77261ef4d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogEncodingNewLine.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogEncodingNewLine.kt @@ -24,9 +24,11 @@ class LogEncodingNewLine : LogEncoding { private val NL = "\n".toByteArray().first() - override fun write(bytes: ByteArray): ByteBuffer { - val wrt = ByteBuffer.allocateDirect(bytes.size + 1) - wrt.put(bytes) + override fun write(bytes: ByteBuffer): ByteBuffer { + val size = bytes.limit() + val wrt = ByteBuffer.allocateDirect(size + 1) + wrt.put(0, bytes, 0, size) + wrt.position(size) wrt.put(NL) return wrt.flip() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogEncodingPrefix.kt b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogEncodingPrefix.kt index ad5c33656..623072bd6 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogEncodingPrefix.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogEncodingPrefix.kt @@ -26,12 +26,16 @@ class LogEncodingPrefix : LogEncoding { companion object { private val log = LoggerFactory.getLogger(LogEncodingPrefix::class.java) + + private const val PREFIX_LENGTH = 4 } - override fun write(bytes: ByteArray): ByteBuffer { - val wrt = ByteBuffer.allocateDirect(bytes.size + 4) - wrt.putInt(bytes.size) - wrt.put(bytes) + override fun write(bytes: ByteBuffer): ByteBuffer { + val size = bytes.limit() + val wrt = ByteBuffer.allocateDirect(size + PREFIX_LENGTH) + wrt.putInt(size) + wrt.put(PREFIX_LENGTH, bytes, 0, size) + wrt.position(size + PREFIX_LENGTH) return wrt.flip() } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogJsonSerializer.kt b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogJsonSerializer.kt new file mode 100644 index 000000000..9fbb18216 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogJsonSerializer.kt @@ -0,0 +1,35 @@ +package io.emeraldpay.dshackle.monitoring + +import com.fasterxml.jackson.databind.util.ByteBufferBackedOutputStream +import io.emeraldpay.dshackle.Global +import org.slf4j.LoggerFactory +import java.nio.BufferOverflowException +import java.nio.ByteBuffer + +class LogJsonSerializer( + initialBuffer: Int = 1024 +) : LogSerializer { + + companion object { + private val log = LoggerFactory.getLogger(LogJsonSerializer::class.java) + } + + private val objectMapper = Global.objectMapper + private var allocateSize = initialBuffer + + override fun apply(t: T): ByteBuffer { + val buffer = ByteBuffer.allocateDirect(allocateSize) + return try { + ByteBufferBackedOutputStream(buffer).use { out -> + objectMapper.writeValue(out, t) + } + buffer.flip() + } catch (_: BufferOverflowException) { + // expected size for the buffer is too small, so increase it which will be used for the consequent serializations + val size = objectMapper.writeValueAsBytes(t).size + allocateSize = allocateSize.coerceAtLeast(size) + // and try again with the new size + apply(t) + } + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogMetrics.kt b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogMetrics.kt index 954ae9abc..8c8b720a9 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogMetrics.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogMetrics.kt @@ -15,21 +15,21 @@ */ package io.emeraldpay.dshackle.monitoring -import io.emeraldpay.dshackle.commons.QueuePublisher +import io.emeraldpay.dshackle.commons.DataQueue import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.Gauge import io.micrometer.core.instrument.Metrics interface LogMetrics { - var queue: QueuePublisher<*>? + var queue: DataQueue<*>? fun produced() fun collected() fun dropped() class Enabled(category: String) : LogMetrics { - override var queue: QueuePublisher<*>? = null + override var queue: DataQueue<*>? = null private val produced = Counter.builder("monitoringLogs_produce") .description("Log events produced by Dshackle") @@ -66,7 +66,7 @@ interface LogMetrics { class None : LogMetrics { - override var queue: QueuePublisher<*>? = null + override var queue: DataQueue<*>? = null override fun produced() { } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogSerializer.kt b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogSerializer.kt new file mode 100644 index 000000000..aa56037e8 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/LogSerializer.kt @@ -0,0 +1,6 @@ +package io.emeraldpay.dshackle.monitoring + +import java.nio.ByteBuffer +import java.util.function.Function + +interface LogSerializer : Function diff --git a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/SocketLogWriter.kt b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/SocketLogWriter.kt index e678c4b10..147495de8 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/SocketLogWriter.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/SocketLogWriter.kt @@ -20,9 +20,6 @@ import io.emeraldpay.dshackle.SilentException import io.emeraldpay.dshackle.commons.RateLimitedAction import org.slf4j.LoggerFactory import org.springframework.util.backoff.ExponentialBackOff -import reactor.core.Disposable -import reactor.core.publisher.Flux -import reactor.core.scheduler.Schedulers import java.io.IOException import java.net.InetSocketAddress import java.nio.channels.SocketChannel @@ -30,7 +27,6 @@ import java.time.Duration import java.util.concurrent.Semaphore import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.atomic.AtomicReference /** * Send the logs to via a TCP socket. I.e., just push events encoded as JSON to the socket. @@ -40,10 +36,10 @@ class SocketLogWriter( val host: String, val port: Int, category: CurrentLogWriter.Category, - serializer: (T) -> ByteArray?, + serializer: LogSerializer?, encoding: LogEncoding, private val metrics: LogMetrics = LogMetrics.None(), - private val bufferSize: Int? = null, + bufferSize: Int? = null, ) : LogWriter, BufferingLogWriter(serializer, encoding, metrics = metrics, queueLimit = bufferSize ?: DEFAULT_BUFFER_SIZE) { companion object { @@ -52,8 +48,8 @@ class SocketLogWriter( private const val DEFAULT_BUFFER_SIZE: Int = 5000 } + private val stopped = AtomicBoolean(true) private val reconnecting = Semaphore(1) - private var currentSink = AtomicReference(null) private val shouldConnect = AtomicBoolean(false) private val closeSlow = RateLimitedAction(Duration.ofSeconds(15)) private val backOffConfig = ExponentialBackOff(100, 2.0) @@ -67,10 +63,11 @@ class SocketLogWriter( } onFull = { closeSlow.execute { - log.warn("Closing slow connection to $remoteId. Closed: ${currentSink.get()?.isDisposed}") + log.warn("Closing slow connection to $remoteId") reconnect() } } + stopped.set(false) shouldConnect.set(true) reconnect() } @@ -92,78 +89,88 @@ class SocketLogWriter( } } - private fun setProducingFlux(send: Disposable?) { - currentSink.updateAndGet { prev -> - if (prev != null && prev != send) { - prev.dispose() - } - send - } - } - private fun reconnect() { - setProducingFlux(connect()) + val thread = connect() ?: return + thread.start() } - private fun connect(): Disposable? { + private fun connect(): Thread? { if (!shouldConnect.get()) { return null } - try { - // it uses a plain TCP NIO Socket implementation instead of a Reactor TCP Client because (a) it's much easier - // to control in this simple situation and (b) because there is not backpressure in this case so a reactive way - // has no advantage - val socket = SocketChannel.open(InetSocketAddress(host, port)) - log.info("Connected to logs server $remoteId") - return Flux.from(readEncodedFromQueue()) - .map { - try { - socket.write(it) - } catch (t: IOException) { - // usually it's a connection issue, i.e., a "broken pipe". - // anyway, we just propagate it which closes the socket, etc. - throw SilentException(t.message ?: t.javaClass.name) - } - } - .switchOnFirst { t, u -> - if (t.isOnNext) { - backOff = backOffConfig.start() - } - u - } - .doOnNext { metrics.collected() } - .doOnError { t -> log.warn("Failed to write to $remoteId. ${t.message}") } - .doFinally { - log.info("Disconnected from logs server $remoteId / ${it.name}") - try { - socket.close() - } catch (t: Throwable) { - log.debug("Failed to close connection to $remoteId") + val runnable = Runnable { + var socket: SocketChannel? = null + try { + // it uses a plain TCP NIO Socket implementation instead of a Reactor TCP Client because (a) it's much easier + // to control in this simple situation and (b) because there is no backpressure in this case so a reactive way + // has no advantage + socket = SocketChannel.open(InetSocketAddress(host, port)) + log.info("Connected to logs server $remoteId") + sendToConnectedSocket(socket) + } catch (t: Throwable) { + log.error("Failed to connect to $remoteId", t) + // schedule a reconnection if needed + mayReconnect() + } finally { + socket?.close() + } + } + return Thread(runnable, "SocketLogWriter $remoteId").also { + it.isDaemon = true + } + } + + private fun sendToConnectedSocket(socket: SocketChannel) { + var sentFirst = false + + while (shouldConnect.get() && !stopped.get()) { + val toSend = next(100) + if (toSend.isEmpty()) { + Thread.sleep(25) + continue + } + var pos = 0 + try { + toSend + .forEach { + if (stopped.get()) { + log.debug("Stopping socket write to $remoteId") + returnBack(pos, toSend) + return + } + try { + pos++ + val encoded = encode(it) ?: return@forEach + socket.write(encoded) + metrics.collected() + if (!sentFirst) { + sentFirst = true + backOff = backOffConfig.start() + } + } catch (t: IOException) { + // usually it's a connection issue, i.e., a "broken pipe". + // anyway, we just propagate it which closes the socket, etc. + throw SilentException(t.message ?: t.javaClass.name) + } } - // schedule a reconnection if needed - mayReconnect() - } - .subscribeOn(Schedulers.boundedElastic()) - .subscribe() - } catch (t: Throwable) { - log.error("Failed to connect to $remoteId", t) - // schedule a reconnection if needed - mayReconnect() - return null + } catch (t: Throwable) { + log.warn("Failed to write to $remoteId. ${t.message}") + returnBack(pos, toSend) + // schedule a reconnection if needed + mayReconnect() + break + } } } override fun stop() { shouldConnect.set(false) - currentSink.updateAndGet { prev -> - prev?.dispose() - null - } + stopped.set(true) super.stop() } override fun isRunning(): Boolean { - return currentSink.get()?.isDisposed == false + return !stopped.get() } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/accesslog/CurrentAccessLogWriter.kt b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/accesslog/CurrentAccessLogWriter.kt index 6a8eb017b..a732bf1f0 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/accesslog/CurrentAccessLogWriter.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/accesslog/CurrentAccessLogWriter.kt @@ -15,9 +15,9 @@ */ package io.emeraldpay.dshackle.monitoring.accesslog -import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.config.MainConfig import io.emeraldpay.dshackle.monitoring.CurrentLogWriter +import io.emeraldpay.dshackle.monitoring.LogJsonSerializer import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Repository @@ -29,7 +29,7 @@ import javax.annotation.PreDestroy class CurrentAccessLogWriter( @Autowired mainConfig: MainConfig ) : CurrentLogWriter( - Category.ACCESS, serializer, + Category.ACCESS, LogJsonSerializer(), FileOptions(startSleep = START_SLEEP, flushSleep = FLUSH_SLEEP, batchLimit = WRITE_BATCH_LIMIT) ) { @@ -38,7 +38,6 @@ class CurrentAccessLogWriter( private const val WRITE_BATCH_LIMIT = 5000 private val FLUSH_SLEEP = Duration.ofMillis(250L) private val START_SLEEP = Duration.ofMillis(1000L) - private val serializer: (Any) -> ByteArray? = { next -> Global.objectMapper.writeValueAsBytes(next) } } private val config = mainConfig.accessLogConfig diff --git a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/requestlog/CurrentRequestLogWriter.kt b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/requestlog/CurrentRequestLogWriter.kt index 35d8b71dd..c626a0ea8 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/requestlog/CurrentRequestLogWriter.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/requestlog/CurrentRequestLogWriter.kt @@ -19,6 +19,7 @@ import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.config.MainConfig import io.emeraldpay.dshackle.monitoring.Channel import io.emeraldpay.dshackle.monitoring.CurrentLogWriter +import io.emeraldpay.dshackle.monitoring.LogJsonSerializer import io.emeraldpay.dshackle.monitoring.record.RequestRecord import io.emeraldpay.dshackle.reader.StandardRpcReader import io.emeraldpay.dshackle.upstream.rpcclient.LoggingJsonRpcReader @@ -33,7 +34,7 @@ import javax.annotation.PreDestroy open class CurrentRequestLogWriter( @Autowired mainConfig: MainConfig, ) : RequestLogWriter, CurrentLogWriter( - Category.REQUEST, serializer, + Category.REQUEST, LogJsonSerializer(), FileOptions(startSleep = START_SLEEP, flushSleep = FLUSH_SLEEP, batchLimit = WRITE_BATCH_LIMIT) ) { @@ -43,10 +44,6 @@ open class CurrentRequestLogWriter( private const val WRITE_BATCH_LIMIT = 5000 private val FLUSH_SLEEP = Duration.ofMillis(250L) private val START_SLEEP = Duration.ofMillis(1000L) - - private val serializer: (RequestRecord.BlockchainRequest) -> ByteArray? = { next -> - Global.objectMapper.writeValueAsBytes(next) - } } private val config = mainConfig.requestLogConfig diff --git a/src/test/kotlin/io/emeraldpay/dshackle/commons/QueuePublisherTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/commons/DataQueueTest.kt similarity index 51% rename from src/test/kotlin/io/emeraldpay/dshackle/commons/QueuePublisherTest.kt rename to src/test/kotlin/io/emeraldpay/dshackle/commons/DataQueueTest.kt index d3b5ef018..76bc3311f 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/commons/QueuePublisherTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/commons/DataQueueTest.kt @@ -5,92 +5,63 @@ import io.kotest.matchers.collections.shouldContainAll import io.kotest.matchers.collections.shouldHaveAtLeastSize import io.kotest.matchers.ints.shouldBeGreaterThan import io.kotest.matchers.shouldBe -import reactor.core.publisher.Flux -import reactor.test.StepVerifier -import java.time.Duration import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger -class QueuePublisherTest : ShouldSpec({ +class DataQueueTest : ShouldSpec({ should("Produce current items") { - val queue = QueuePublisher(100) + val queue = DataQueue(100) - StepVerifier.create(queue) - .then { - queue.offer(1) - queue.offer(2) - } - .expectNext(1) - .expectNext(2) - .then { - queue.offer(3) - } - .expectNext(3) - .then { - queue.offer(4) - } - .expectNext(4) - .then { - queue.close() - } - .expectComplete() - .verify(Duration.ofSeconds(1)) + queue.offer(1) + queue.offer(2) + + queue.request(1) shouldBe listOf(1) + queue.request(1) shouldBe listOf(2) + + queue.offer(3) + queue.request(1) shouldBe listOf(3) + + queue.offer(4) + queue.request(1) shouldBe listOf(4) + + queue.close() } should("Produce queued items") { - val queue = QueuePublisher(100) + val queue = DataQueue(100) queue.offer(1) queue.offer(2) queue.offer(3) - StepVerifier.create(queue) - .expectNext(1) - .expectNext(2) - .expectNext(3) - .then { - queue.close() - } - .expectComplete() - .verify(Duration.ofSeconds(1)) + queue.request(10) shouldBe listOf(1, 2, 3) } should("Produce queued items and continue with fresh") { - val queue = QueuePublisher(100) + val queue = DataQueue(100) queue.offer(1) queue.offer(2) queue.offer(3) - StepVerifier.create(queue) - .expectNext(1) - .expectNext(2) - .expectNext(3) - .then { - queue.offer(4) - } - .expectNext(4) - .then { - queue.close() - } - .expectComplete() - .verify(Duration.ofSeconds(1)) + queue.request(10) shouldBe listOf(1, 2, 3) + + queue.offer(4) + queue.request(10) shouldBe listOf(4) } should("Produce nothing when closed") { - val queue = QueuePublisher(100) + val queue = DataQueue(100) queue.offer(1) queue.offer(2) queue.offer(3) queue.close() - StepVerifier.create(queue) - .expectComplete() - .verify(Duration.ofSeconds(1)) + queue.request(10) shouldBe emptyList() } should("Have zero size when closed") { - val queue = QueuePublisher(100) + val queue = DataQueue(100) (1..5).forEach { i -> queue.offer(i) } @@ -103,54 +74,20 @@ class QueuePublisherTest : ShouldSpec({ queue.size shouldBe 0 } - should("Produce items within a timeframe") { - val queue = QueuePublisher(100) - val t = Thread { - (1..10).forEach { i -> - queue.offer(i) - Thread.sleep(100) - } - } - - val flux = Flux.from(queue) - .take(Duration.ofMillis(350)) - - StepVerifier.create(flux) - .then { t.start() } - .expectNext(1) - .expectNext(2) - .expectNext(3) - .expectNext(4) - .expectComplete() - .verify(Duration.ofSeconds(1)) - } - should("Produce X requested items") { - val queue = QueuePublisher(100) - val t = Thread { - (1..10).forEach { i -> - queue.offer(i) - Thread.sleep(100) - } + val queue = DataQueue(100) + (1..10).forEach { i -> + queue.offer(i) } - val flux = Flux.from(queue) - .take(5) - - StepVerifier.create(flux) - .then { t.start() } - .expectNext(1) - .expectNext(2) - .expectNext(3) - .expectNext(4) - .expectNext(5) - .expectComplete() - .verify(Duration.ofSeconds(1)) + queue.request(5) shouldBe listOf(1, 2, 3, 4, 5) + queue.request(3) shouldBe listOf(6, 7, 8) + queue.request(2) shouldBe listOf(9, 10) } should("Call onError when full") { val count = AtomicInteger(0) - val queue = QueuePublisher(3, onError = { count.incrementAndGet() }) + val queue = DataQueue(3, onError = { count.incrementAndGet() }) val offers = mutableListOf() (1..5).forEach { i -> offers.add(queue.offer(i)) @@ -163,7 +100,7 @@ class QueuePublisherTest : ShouldSpec({ should("Call onError when closed") { val count = AtomicInteger(0) - val queue = QueuePublisher(3, onError = { count.incrementAndGet() }) + val queue = DataQueue(3, onError = { count.incrementAndGet() }) val offers = mutableListOf() (1..2).forEach { i -> offers.add(queue.offer(i)) @@ -179,7 +116,7 @@ class QueuePublisherTest : ShouldSpec({ } should("Work with multiple threads") { - val queue = QueuePublisher(100) + val queue = DataQueue(100) val executor = Executors.newFixedThreadPool(6) executor.execute { (0 until 5).forEach { i -> @@ -209,10 +146,16 @@ class QueuePublisherTest : ShouldSpec({ val list1 = mutableListOf() val list2 = mutableListOf() executor.execute { - Flux.from(queue).take(10).subscribe(list1::add) + repeat(15) { + queue.request(1).forEach(list1::add) + Thread.sleep(7) + } } executor.execute { - Flux.from(queue).take(10).subscribe(list2::add) + repeat(15) { _ -> + queue.request(1).forEach(list2::add) + Thread.sleep(11) + } } executor.shutdown() diff --git a/src/test/kotlin/io/emeraldpay/dshackle/monitoring/BufferingLogWriterTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/monitoring/BufferingLogWriterTest.kt index d0f41002a..e12177114 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/monitoring/BufferingLogWriterTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/monitoring/BufferingLogWriterTest.kt @@ -1,22 +1,18 @@ package io.emeraldpay.dshackle.monitoring import io.kotest.core.spec.style.ShouldSpec -import io.kotest.matchers.optional.bePresent +import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.should import io.kotest.matchers.shouldBe -import io.kotest.matchers.shouldNot -import reactor.test.StepVerifier +import io.kotest.matchers.shouldNotBe import java.lang.RuntimeException import java.nio.ByteBuffer -import java.nio.charset.StandardCharsets -import java.time.Duration class BufferingLogWriterTest : ShouldSpec({ - val defaultSerializer = { x: String -> x.toByteArray() } class TempImpl( queueLimit: Int, - serializer: (String) -> ByteArray? = defaultSerializer, + serializer: LogSerializer = MonitoringTestCommons.defaultSerializer, encoding: LogEncoding = LogEncodingNewLine(), ) : BufferingLogWriter( serializer = serializer, @@ -32,11 +28,10 @@ class BufferingLogWriterTest : ShouldSpec({ writer.submit("test") - StepVerifier.create(writer.readFromQueue()) - .expectNext("test") - .then { writer.stop() } - .expectComplete() - .verify(Duration.ofSeconds(1)) + val next = writer.next(10) + + next shouldHaveSize 1 + next[0] shouldBe "test" } should("Accept and produce and encoded event") { @@ -45,81 +40,64 @@ class BufferingLogWriterTest : ShouldSpec({ writer.submit("test") writer.submit("test2") - StepVerifier.create(writer.readEncodedFromQueue().map { StandardCharsets.UTF_8.decode(it).toString() }) - .expectNext("test\n") - .expectNext("test2\n") - .then { writer.stop() } - .expectComplete() - .verify(Duration.ofSeconds(1)) + val next = writer.next(10) + next shouldHaveSize 2 + MonitoringTestCommons.bufferToString(writer.encode(next[0])!!) shouldBe "test\n" + MonitoringTestCommons.bufferToString(writer.encode(next[1])!!) shouldBe "test2\n" } should("Ignore serializer errors") { - val writer = TempImpl(10, serializer = { - if (it == "fail") { - throw RuntimeException() - } - it.toByteArray() - }) + val writer = TempImpl(10, MonitoringTestCommons.failSerializer) - val fail = writer.toByteBuffer("fail") - fail shouldNot bePresent() + val fail = writer.encode("fail") + fail shouldBe null - val test = writer.toByteBuffer("test") - test should bePresent() - StandardCharsets.UTF_8.decode(test.get()).toString() shouldBe "test\n" + val test = writer.encode("test") + test shouldNotBe null + MonitoringTestCommons.bufferToString(test!!) shouldBe "test\n" } should("Produce ignoring serializer errors") { - val writer = TempImpl(10, serializer = { - if (it == "fail") { - throw RuntimeException() - } - it.toByteArray() - }) + val writer = TempImpl(10, serializer = MonitoringTestCommons.failSerializer) writer.submit("fail") writer.submit("test") - StepVerifier.create( - writer.readEncodedFromQueue() - .map { StandardCharsets.UTF_8.decode(it).toString() } - ) - .expectNext("test\n") - .then { writer.stop() } - .expectComplete() - .verify(Duration.ofSeconds(3)) + val next = writer.next(10) + writer.encode(next[0]) shouldBe null + MonitoringTestCommons.bufferToString(writer.encode(next[1])!!) shouldBe "test\n" } should("Ignore encoding errors") { val writer = TempImpl( 10, encoding = object : LogEncoding { - override fun write(bytes: ByteArray): ByteBuffer { - if ("fail" == String(bytes)) { + override fun write(bytes: ByteBuffer): ByteBuffer { + if ("fail" == String(bytes.array())) { throw RuntimeException() } - return ByteBuffer.wrap(bytes) + return bytes } } ) - val fail = writer.toByteBuffer("fail") - fail shouldNot bePresent() + val fail = writer.encode("fail") + fail shouldBe null - val test = writer.toByteBuffer("test") - test should bePresent() - StandardCharsets.UTF_8.decode(test.get()).toString() shouldBe "test" + val test = writer.encode("test") + test shouldNotBe null + MonitoringTestCommons.bufferToString(test!!) shouldBe "test" } should("Produce ignoring encoder errors") { val writer = TempImpl( 10, encoding = object : LogEncoding { - override fun write(bytes: ByteArray): ByteBuffer { - if ("fail" == String(bytes)) { + override fun write(bytes: ByteBuffer): ByteBuffer { + if ("fail" == String(bytes.array())) { throw RuntimeException() } - return ByteBuffer.wrap(bytes) + return bytes } } ) @@ -127,13 +105,29 @@ class BufferingLogWriterTest : ShouldSpec({ writer.submit("fail") writer.submit("test") - StepVerifier.create( - writer.readEncodedFromQueue() - .map { StandardCharsets.UTF_8.decode(it).toString() } - ) - .expectNext("test") - .then { writer.stop() } - .expectComplete() - .verify(Duration.ofSeconds(3)) + val next = writer.next(10) + writer.encode(next[0]) shouldBe null + MonitoringTestCommons.bufferToString(writer.encode(next[1])!!) shouldBe "test" + } + + should("Return unprocessed events") { + val writer = TempImpl(10) + + writer.submit("test-1") + writer.submit("test-2") + writer.submit("test-3") + writer.submit("test-4") + writer.submit("test-5") + + val next = writer.next(10) + + next shouldHaveSize 5 + writer.returnBack(2, next) + + val next2 = writer.next(10) + next2 shouldHaveSize 3 + MonitoringTestCommons.bufferToString(writer.encode(next2[0])!!) shouldBe "test-3\n" + MonitoringTestCommons.bufferToString(writer.encode(next2[1])!!) shouldBe "test-4\n" + MonitoringTestCommons.bufferToString(writer.encode(next2[2])!!) shouldBe "test-5\n" } }) diff --git a/src/test/kotlin/io/emeraldpay/dshackle/monitoring/CurrentLogWriterTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/monitoring/CurrentLogWriterTest.kt index 832390204..bd8757997 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/monitoring/CurrentLogWriterTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/monitoring/CurrentLogWriterTest.kt @@ -8,8 +8,7 @@ import java.time.Duration class CurrentLogWriterTest : ShouldSpec({ - val serializer = { it: String -> it.toByteArray() } - class TestImpl(fileOptions: FileOptions? = null) : CurrentLogWriter(Category.REQUEST, serializer, fileOptions) + class TestImpl(fileOptions: FileOptions? = null) : CurrentLogWriter(Category.REQUEST, MonitoringTestCommons.defaultSerializer, fileOptions) should("Create file logger") { val current = TestImpl(CurrentLogWriter.FileOptions(Duration.ofSeconds(1), Duration.ofSeconds(1), 100)) diff --git a/src/test/kotlin/io/emeraldpay/dshackle/monitoring/FileLogWriterTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/monitoring/FileLogWriterTest.kt index 1bafdad4b..52c5417b4 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/monitoring/FileLogWriterTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/monitoring/FileLogWriterTest.kt @@ -19,12 +19,8 @@ class FileLogWriterTest : ShouldSpec({ val accessLog = dir.resolve("log.jsonl") println("Write log to $accessLog") - val serializer: (Any) -> ByteArray? = { - Global.objectMapper.writeValueAsBytes(it) - } - val logWriter = FileLogWriter( - accessLog, serializer, Duration.ofMillis(1000), Duration.ofMillis(1000), 100 + accessLog, LogJsonSerializer(), Duration.ofMillis(1000), Duration.ofMillis(1000), 100 ) val event = AccessRecord.Status( diff --git a/src/test/kotlin/io/emeraldpay/dshackle/monitoring/LogEncodingNewLineTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/monitoring/LogEncodingNewLineTest.kt index 6aec96f27..4330cb6bb 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/monitoring/LogEncodingNewLineTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/monitoring/LogEncodingNewLineTest.kt @@ -2,15 +2,15 @@ package io.emeraldpay.dshackle.monitoring import io.kotest.core.spec.style.ShouldSpec import io.kotest.matchers.shouldBe -import java.nio.charset.StandardCharsets +import java.nio.ByteBuffer class LogEncodingNewLineTest : ShouldSpec({ should("Write a line") { val encoding = LogEncodingNewLine() - val act = encoding.write("Hello World!".toByteArray()) + val act = encoding.write(ByteBuffer.wrap("Hello World!".toByteArray())) - StandardCharsets.UTF_8.decode(act).toString() shouldBe "Hello World!\n" + MonitoringTestCommons.bufferToString(act) shouldBe "Hello World!\n" } }) diff --git a/src/test/kotlin/io/emeraldpay/dshackle/monitoring/LogEncodingPrefixTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/monitoring/LogEncodingPrefixTest.kt index 266e2827a..6f6b01472 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/monitoring/LogEncodingPrefixTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/monitoring/LogEncodingPrefixTest.kt @@ -6,29 +6,30 @@ import io.kotest.matchers.string.shouldEndWith import io.kotest.matchers.string.shouldHaveLength import io.kotest.matchers.string.shouldStartWith import org.apache.commons.codec.binary.Hex +import java.nio.ByteBuffer class LogEncodingPrefixTest : ShouldSpec({ should("Encode single byte") { - val act = LogEncodingPrefix().write("a".toByteArray()) + val act = LogEncodingPrefix().write(ByteBuffer.wrap("a".toByteArray())) - Hex.encodeHexString(act) shouldBe "0000000161" + MonitoringTestCommons.bufferToHex(act) shouldBe "0000000161" } should("Encode short string") { - val act = LogEncodingPrefix().write("Hello World!".toByteArray()) + val act = LogEncodingPrefix().write(ByteBuffer.wrap("Hello World!".toByteArray())) - Hex.encodeHexString(act) shouldBe "0000000c48656c6c6f20576f726c6421" + MonitoringTestCommons.bufferToHex(act) shouldBe "0000000c48656c6c6f20576f726c6421" } should("Encode medium string") { val s = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum." val act = LogEncodingPrefix() .write( - s.toByteArray() + ByteBuffer.wrap(s.toByteArray()) ) - val hex = Hex.encodeHexString(act) + val hex = MonitoringTestCommons.bufferToHex(act) s shouldHaveLength 0x01bd hex shouldStartWith "000001bd" diff --git a/src/test/kotlin/io/emeraldpay/dshackle/monitoring/LogJsonSerializerTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/monitoring/LogJsonSerializerTest.kt new file mode 100644 index 000000000..ce2da542f --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/monitoring/LogJsonSerializerTest.kt @@ -0,0 +1,51 @@ +package io.emeraldpay.dshackle.monitoring + +import io.emeraldpay.dshackle.Global +import io.emeraldpay.dshackle.monitoring.record.AccessRecord +import io.kotest.core.spec.style.ShouldSpec +import io.kotest.matchers.shouldBe +import io.kotest.matchers.string.shouldContain +import io.kotest.matchers.string.shouldHaveMinLength +import java.time.Instant + +class LogJsonSerializerTest : ShouldSpec({ + + should("serialize a message") { + val message = AccessRecord.RequestDetails( + id = java.util.UUID.fromString("ad6d16d1-8148-4dbe-b4ed-ce345511d078"), + start = Instant.parse("2023-10-11T12:13:14Z"), + remote = null + ) + val serializer = LogJsonSerializer() + + val buffer = serializer.apply(message) + val act = MonitoringTestCommons.bufferToString(buffer) + + act shouldBe """ + {"id":"ad6d16d1-8148-4dbe-b4ed-ce345511d078","start":"2023-10-11T12:13:14Z"} + """.trimIndent() + } + + should("serialize a long message") { + val message = AccessRecord.RequestDetails( + id = java.util.UUID.fromString("ad6d16d1-8148-4dbe-b4ed-ce345511d078"), + start = Instant.parse("2023-10-11T12:13:14Z"), + remote = AccessRecord.Remote( + ips = listOf("127.0.0.1", "10.0.3.1", "10.23.51.11", "100.101.102.103"), + ip = "100.101.102.103", + userAgent = "Lorem ipsum dolor sit amet nulla magna occaecat tempor enim laborum mollit aliquip minim Lorem id, culpa ex aliqua. Consequat do dolor consequat anim magna veniam enim quis cupidatat duis, aute fugiat nisi officia mollit reprehenderit aute. Commodo nisi veniam incididunt consectetur proident non consectetur sunt laborum eu reprehenderit nisi mollit laboris minim Lorem irure, ad aliquip. Nostrud ut enim nulla labore amet, enim veniam in labore eiusmod elit anim nisi duis pariatur." + ) + ) + val serializer = LogJsonSerializer(32) + + val buffer = serializer.apply(message) + val act = MonitoringTestCommons.bufferToString(buffer) + + act shouldHaveMinLength 32 + act shouldContain "Lorem ipsum" + act shouldContain "magna veniam" + + val parsed = Global.objectMapper.readValue(act, AccessRecord.RequestDetails::class.java) + parsed shouldBe message + } +}) diff --git a/src/test/kotlin/io/emeraldpay/dshackle/monitoring/MonitoringTestCommons.kt b/src/test/kotlin/io/emeraldpay/dshackle/monitoring/MonitoringTestCommons.kt new file mode 100644 index 000000000..5cc3c374c --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/monitoring/MonitoringTestCommons.kt @@ -0,0 +1,34 @@ +package io.emeraldpay.dshackle.monitoring + +import org.apache.commons.codec.binary.Hex +import java.lang.RuntimeException +import java.nio.ByteBuffer + +class MonitoringTestCommons { + + companion object { + val defaultSerializer: LogSerializer = object : LogSerializer { + override fun apply(t: String): ByteBuffer = ByteBuffer.wrap(t.toByteArray()) + } + val failSerializer: LogSerializer = object : LogSerializer { + override fun apply(t: String): ByteBuffer { + if (t == "fail") { + throw RuntimeException() + } + return ByteBuffer.wrap(t.toByteArray()) + } + } + + fun bufferToString(buffer: ByteBuffer): String { + val bytes = ByteArray(buffer.remaining()) + buffer.get(bytes) + return String(bytes) + } + + fun bufferToHex(buffer: ByteBuffer): String { + val bytes = ByteArray(buffer.remaining()) + buffer.get(bytes) + return Hex.encodeHexString(bytes) + } + } +} diff --git a/src/test/kotlin/io/emeraldpay/dshackle/monitoring/SocketLogWriterTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/monitoring/SocketLogWriterTest.kt index 14973ab6d..037ebec21 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/monitoring/SocketLogWriterTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/monitoring/SocketLogWriterTest.kt @@ -12,8 +12,6 @@ import java.util.concurrent.TimeUnit class SocketLogWriterTest : ShouldSpec({ - val serializer = { x: String -> x.toByteArray() } - class MockServer( val port: Int, val expectMessages: Int, @@ -70,7 +68,7 @@ class SocketLogWriterTest : ShouldSpec({ server.launch() server.awaitLaunch(Duration.ofSeconds(5)) - val writer = SocketLogWriter("localhost", port, CurrentLogWriter.Category.REQUEST, serializer, LogEncodingNewLine()) + val writer = SocketLogWriter("localhost", port, CurrentLogWriter.Category.REQUEST, MonitoringTestCommons.defaultSerializer, LogEncodingNewLine()) writer.start() writer.submit("Hello World!") @@ -94,7 +92,7 @@ class SocketLogWriterTest : ShouldSpec({ server2.launch() }.start() - val writer = SocketLogWriter("localhost", port, CurrentLogWriter.Category.REQUEST, serializer, LogEncodingNewLine()) + val writer = SocketLogWriter("localhost", port, CurrentLogWriter.Category.REQUEST, MonitoringTestCommons.defaultSerializer, LogEncodingNewLine()) println("Starting...") writer.start()