From 0064d1f76ccb43b778e0a7d8feb04ad0e1521ad6 Mon Sep 17 00:00:00 2001 From: Lilian Gallon Date: Wed, 11 Sep 2024 16:41:22 +0200 Subject: [PATCH 1/2] feat(wamp-server) #76: optional timeout in callBlocking If unset, use default one given by wamp server settings --- .../com/izivia/ocpp/wamp/core/WampCallManager.kt | 14 ++++++++++---- .../com/izivia/ocpp/wamp/server/OcppWampServer.kt | 3 ++- .../ocpp/wamp/server/impl/OcppWampServerApp.kt | 12 ++++++++---- .../wamp/server/impl/UndertowOcppWampServer.kt | 4 ++-- 4 files changed, 22 insertions(+), 11 deletions(-) diff --git a/ocpp-wamp/src/main/kotlin/com/izivia/ocpp/wamp/core/WampCallManager.kt b/ocpp-wamp/src/main/kotlin/com/izivia/ocpp/wamp/core/WampCallManager.kt index 7c3c4885..6dc0e594 100644 --- a/ocpp-wamp/src/main/kotlin/com/izivia/ocpp/wamp/core/WampCallManager.kt +++ b/ocpp-wamp/src/main/kotlin/com/izivia/ocpp/wamp/core/WampCallManager.kt @@ -12,14 +12,15 @@ import kotlin.time.Duration.Companion.milliseconds class WampCallManager( private val logger: Logger, private val send: (str: String) -> Unit, - val timeoutInMs: Long, + val defaultTimeoutInMs: Long, private val shutdown: AtomicBoolean = AtomicBoolean(false) ) { private val clock = Clock.System private var currentCall: WampCall? = null - fun callBlocking(logContext: String, startCall: Instant, message: WampMessage): WampMessage { + fun callBlocking(logContext: String, startCall: Instant, message: WampMessage, specificTimeoutInMs: Long? = null): WampMessage { val now = clock.now() + val timeoutInMs = specificTimeoutInMs ?: defaultTimeoutInMs synchronized(this) { while (currentCall != null && (clock.now() - startCall).inWholeMilliseconds < timeoutInMs) { Thread.sleep(10) @@ -27,7 +28,7 @@ class WampCallManager( if (currentCall != null) { throw IllegalStateException("$logContext can't send a call when another one is pending") } - currentCall = WampCall(logContext, message) + currentCall = WampCall(logContext, message, timeoutInMs) } val pendingCallLatency = clock.now() - now if (pendingCallLatency > 400.milliseconds) { @@ -86,7 +87,11 @@ class WampCallManager( fun await() { val now = Clock.System.now() synchronized(this) { - while (currentCall != null && (Clock.System.now() - now).inWholeMilliseconds < timeoutInMs) { + val capturedCurentCall = currentCall + while ( + capturedCurentCall != null && + (Clock.System.now() - now).inWholeMilliseconds < capturedCurentCall.timeoutInMs + ) { Thread.sleep(10) } val call = currentCall @@ -100,6 +105,7 @@ class WampCallManager( private data class WampCall( val logContext: String, val msg: WampMessage, + var timeoutInMs: Long, val latch: CountDownLatch = CountDownLatch(1), var response: WampMessage? = null ) diff --git a/ocpp-wamp/src/main/kotlin/com/izivia/ocpp/wamp/server/OcppWampServer.kt b/ocpp-wamp/src/main/kotlin/com/izivia/ocpp/wamp/server/OcppWampServer.kt index 2e896a24..810c119d 100644 --- a/ocpp-wamp/src/main/kotlin/com/izivia/ocpp/wamp/server/OcppWampServer.kt +++ b/ocpp-wamp/src/main/kotlin/com/izivia/ocpp/wamp/server/OcppWampServer.kt @@ -32,9 +32,10 @@ interface OcppWampServer { /** * Sends a WampMessage call to a ChargingStation, identified by its ocpp id. * + * @param timeoutInMs if not specified, the timeout from settings will be used * @throws IllegalStateException if no such ChargingStation is currently connected to this server */ - fun sendBlocking(ocppId: CSOcppId, message: WampMessage): WampMessage + fun sendBlocking(ocppId: CSOcppId, message: WampMessage, timeoutInMs: Long? = null): WampMessage /** * registers a wamp server handler on this server. diff --git a/ocpp-wamp/src/main/kotlin/com/izivia/ocpp/wamp/server/impl/OcppWampServerApp.kt b/ocpp-wamp/src/main/kotlin/com/izivia/ocpp/wamp/server/impl/OcppWampServerApp.kt index ce1bdcfe..c743e9d4 100644 --- a/ocpp-wamp/src/main/kotlin/com/izivia/ocpp/wamp/server/impl/OcppWampServerApp.kt +++ b/ocpp-wamp/src/main/kotlin/com/izivia/ocpp/wamp/server/impl/OcppWampServerApp.kt @@ -240,8 +240,12 @@ class OcppWampServerApp( } } - fun sendBlocking(ocppId: CSOcppId, message: WampMessage, startedCallAt: Instant = Clock.System.now()): WampMessage = - getChargingStationConnection(ocppId).sendBlocking(message, startedCallAt) + fun sendBlocking( + ocppId: CSOcppId, + message: WampMessage, + startedCallAt: Instant = Clock.System.now(), + timeoutInMs: Long? = null + ): WampMessage = getChargingStationConnection(ocppId).sendBlocking(message, startedCallAt, timeoutInMs) // Throws NoConnectionException when no connection is found for the specified ocpp id private fun getChargingStationConnection(ocppId: CSOcppId): ChargingStationConnection { @@ -274,8 +278,8 @@ class OcppWampServerApp( val callManager: WampCallManager = WampCallManager(logger, { m: String -> ws.send(WsMessage(m)) }, timeoutInMs, shutdown) - fun sendBlocking(message: WampMessage, startCall: Instant): WampMessage = - callManager.callBlocking("[$ocppId] [$wsConnectionId]", startCall, message) + fun sendBlocking(message: WampMessage, startCall: Instant, timeoutInMs: Long? = null): WampMessage = + callManager.callBlocking("[$ocppId] [$wsConnectionId]", startCall, message, timeoutInMs) fun close() { logger.info("[$ocppId] [$wsConnectionId] - closing") diff --git a/ocpp-wamp/src/main/kotlin/com/izivia/ocpp/wamp/server/impl/UndertowOcppWampServer.kt b/ocpp-wamp/src/main/kotlin/com/izivia/ocpp/wamp/server/impl/UndertowOcppWampServer.kt index 0f4ee04a..68728c08 100644 --- a/ocpp-wamp/src/main/kotlin/com/izivia/ocpp/wamp/server/impl/UndertowOcppWampServer.kt +++ b/ocpp-wamp/src/main/kotlin/com/izivia/ocpp/wamp/server/impl/UndertowOcppWampServer.kt @@ -70,9 +70,9 @@ class UndertowOcppWampServer( wsApp = null } - override fun sendBlocking(ocppId: CSOcppId, message: WampMessage): WampMessage = + override fun sendBlocking(ocppId: CSOcppId, message: WampMessage, timeoutInMs: Long?): WampMessage = getWsApp() - .sendBlocking(ocppId, message) + .sendBlocking(ocppId, message, timeoutInMs = timeoutInMs) override fun register(handler: OcppWampServerHandler) { handlers.add(handler) From 749fc10ddae015146bc8e9f622723559fbfbd7fa Mon Sep 17 00:00:00 2001 From: Lilian Gallon Date: Mon, 16 Sep 2024 11:19:53 +0200 Subject: [PATCH 2/2] feat(wamp-server) #76: fixup! while --- .../main/kotlin/com/izivia/ocpp/wamp/core/WampCallManager.kt | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ocpp-wamp/src/main/kotlin/com/izivia/ocpp/wamp/core/WampCallManager.kt b/ocpp-wamp/src/main/kotlin/com/izivia/ocpp/wamp/core/WampCallManager.kt index 6dc0e594..5ed25e42 100644 --- a/ocpp-wamp/src/main/kotlin/com/izivia/ocpp/wamp/core/WampCallManager.kt +++ b/ocpp-wamp/src/main/kotlin/com/izivia/ocpp/wamp/core/WampCallManager.kt @@ -87,10 +87,9 @@ class WampCallManager( fun await() { val now = Clock.System.now() synchronized(this) { - val capturedCurentCall = currentCall while ( - capturedCurentCall != null && - (Clock.System.now() - now).inWholeMilliseconds < capturedCurentCall.timeoutInMs + currentCall != null && + (Clock.System.now() - now).inWholeMilliseconds < (currentCall?.timeoutInMs ?: Long.MAX_VALUE) ) { Thread.sleep(10) }