Skip to content

Commit

Permalink
Merge pull request #264 from emeraldpay/fix/slow-logging
Browse files Browse the repository at this point in the history
Fix slow logging
  • Loading branch information
splix authored Oct 4, 2023
2 parents 35c94db + ee843af commit 714ea1d
Show file tree
Hide file tree
Showing 24 changed files with 485 additions and 439 deletions.
2 changes: 2 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/Global.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.kotlin.KotlinModule
import io.emeraldpay.api.Chain
import io.emeraldpay.dshackle.monitoring.MonitoringContext
import io.emeraldpay.dshackle.upstream.bitcoin.data.EsploraUnspent
Expand Down Expand Up @@ -94,6 +95,7 @@ class Global {
objectMapper.registerModule(module)
objectMapper.registerModule(Jdk8Module())
objectMapper.registerModule(JavaTimeModule())
objectMapper.registerModule(KotlinModule.Builder().build())
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
objectMapper
.setDateFormat(SimpleDateFormat("yyyy-MM-dd\'T\'HH:mm:ss.SSS"))
Expand Down
118 changes: 118 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/commons/DataQueue.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/**
* Copyright (c) 2023 EmeraldPay, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.emeraldpay.dshackle.commons

import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

/**
* A Queue for data for further processing.
* It is thread-safe, i.e. it can have multiple (fire-and-forget) providers and multiple readers. For the case of
* multiple readers it doesn't share the original queue items, i.e., each reader get own set of original items.
*/
class DataQueue<T>(
/**
* Queue Limit. When the subscribers are slower that providers it starts to drop new items when reached this size
*/
private val queueLimit: Int,
onError: ((Error) -> Unit) = {}
) {

private val locks = ReentrantLock()
private var queue = ArrayList<T>()
private var closed = false
private val nonFailingOnError: ((Error) -> Unit) = {
try { onError(it) } catch (t: Throwable) {}
}

enum class Error {
FULL, CLOSED, INTERNAL
}

/**
* Current queue length
*/
val size: Int
get() = locks.withLock { queue.size }

/**
* Put a new item into the queue
*/
fun offer(values: List<T>): Boolean {
locks.withLock {
if (queue.size >= queueLimit) {
nonFailingOnError(Error.FULL)
return false
}
if (closed) {
nonFailingOnError(Error.CLOSED)
return false
}
queue.addAll(values)
return true
}
}

fun offer(value: T): Boolean {
locks.withLock {
if (queue.size >= queueLimit) {
nonFailingOnError(Error.FULL)
return false
}
if (closed) {
nonFailingOnError(Error.CLOSED)
return false
}
queue.add(value)
return true
}
}

/**
* Close the queue, which also clears the queue.
* Queue doesn't accept new items when closed.
*/
fun close() {
locks.withLock {
closed = true
queue.clear()
}
}

fun request(limit: Int): List<T> {
return locks.withLock {
if (queue.isEmpty()) {
emptyList()
} else if (queue.size < limit) {
val copy = queue
queue = ArrayList(copy.size)
copy
} else {
val copy = queue
val back = ArrayList<T>(queue.size)
val result = ArrayList<T>(limit)
for (i in 0 until limit) {
result.add(copy[i])
}
for (i in limit until copy.size) {
back.add(copy[i])
}
queue = back
result
}
}
}
}
141 changes: 0 additions & 141 deletions src/main/kotlin/io/emeraldpay/dshackle/commons/QueuePublisher.kt

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@
*/
package io.emeraldpay.dshackle.monitoring

import io.emeraldpay.dshackle.commons.QueuePublisher
import io.emeraldpay.dshackle.commons.DataQueue
import io.emeraldpay.dshackle.commons.RateLimitedAction
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import java.nio.ByteBuffer
import java.time.Duration
import java.util.Optional

abstract class BufferingLogWriter<T>(
private val serializer: (T) -> ByteArray?,
private val serializer: LogSerializer<T>?,
val encoding: LogEncoding,
val queueLimit: Int = 4096,
private val metrics: LogMetrics = LogMetrics.None(),
Expand All @@ -37,52 +35,51 @@ abstract class BufferingLogWriter<T>(
protected var onFull: () -> Unit = {}

private val errors = RateLimitedAction(Duration.ofSeconds(1))
private val onQueueError: ((QueuePublisher.Error) -> Unit) = { err ->
private val onQueueError: ((DataQueue.Error) -> Unit) = { err ->
when (err) {
QueuePublisher.Error.FULL -> {
DataQueue.Error.FULL -> {
metrics.dropped()
errors.execute { log.warn("Queue is full: ${queue.size}") }
onFull()
}
QueuePublisher.Error.CLOSED -> errors.execute { log.warn("Queue is closed") }
QueuePublisher.Error.INTERNAL -> errors.execute { log.warn("Queue cannot processes a log message") }
DataQueue.Error.CLOSED -> errors.execute { log.warn("Queue is closed") }
DataQueue.Error.INTERNAL -> errors.execute { log.warn("Queue cannot processes a log message") }
}
}
private val queue = QueuePublisher<T>(queueLimit, onError = onQueueError)
private val queue = DataQueue<T>(queueLimit, onError = onQueueError)

init {
metrics.queue = queue
}

override fun submit(event: T) {
metrics.produced()
queue.offer(event)
metrics.produced()
}

override fun submitAll(events: List<T>) {
events.forEach(::submit)
queue.offer(events)
events.forEach { _ -> metrics.produced() }
}

fun readFromQueue(): Flux<T> {
return Flux.from(queue)
fun next(limit: Int): List<T> {
return queue.request(limit)
}

fun readEncodedFromQueue(): Flux<ByteBuffer> {
return readFromQueue()
.map(::toByteBuffer)
.filter(Optional<ByteBuffer>::isPresent)
.map(Optional<ByteBuffer>::get)
fun returnBack(index: Int, events: List<T>) {
queue.offer(events.drop(index))
// don't fire the metrics update because it's they were already counted
}

fun toByteBuffer(event: T): Optional<ByteBuffer> {
fun encode(event: T): ByteBuffer? {
val line = try {
serializer(event)
serializer?.apply(event)
} catch (t: Throwable) {
errors.execute {
log.warn("Failed to serialize event: ${t.message}")
}
null
} ?: return Optional.empty()
return null
} ?: return null

val encoded = try {
encoding.write(line)
Expand All @@ -92,7 +89,7 @@ abstract class BufferingLogWriter<T>(
}
null
}
return Optional.ofNullable(encoded)
return encoded
}

fun isEmpty(): Boolean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import java.util.Locale

abstract class CurrentLogWriter<T>(
private val category: Category,
private val serializer: (T) -> ByteArray?,
private val serializer: LogSerializer<T>?,
private val fileOptions: FileOptions? = null
) {

Expand Down
Loading

0 comments on commit 714ea1d

Please sign in to comment.