Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(wamp-server) #76: optional timeout in callBlocking #77

Merged
merged 2 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,23 @@ import kotlin.time.Duration.Companion.milliseconds
class WampCallManager(
private val logger: Logger,
private val send: (str: String) -> Unit,
val timeoutInMs: Long,
val defaultTimeoutInMs: Long,
private val shutdown: AtomicBoolean = AtomicBoolean(false)
) {
private val clock = Clock.System
private var currentCall: WampCall? = null

fun callBlocking(logContext: String, startCall: Instant, message: WampMessage): WampMessage {
fun callBlocking(logContext: String, startCall: Instant, message: WampMessage, specificTimeoutInMs: Long? = null): WampMessage {
val now = clock.now()
val timeoutInMs = specificTimeoutInMs ?: defaultTimeoutInMs
synchronized(this) {
while (currentCall != null && (clock.now() - startCall).inWholeMilliseconds < timeoutInMs) {
Thread.sleep(10)
}
if (currentCall != null) {
throw IllegalStateException("$logContext can't send a call when another one is pending")
}
currentCall = WampCall(logContext, message)
currentCall = WampCall(logContext, message, timeoutInMs)
}
val pendingCallLatency = clock.now() - now
if (pendingCallLatency > 400.milliseconds) {
Expand Down Expand Up @@ -86,7 +87,10 @@ class WampCallManager(
fun await() {
val now = Clock.System.now()
synchronized(this) {
while (currentCall != null && (Clock.System.now() - now).inWholeMilliseconds < timeoutInMs) {
while (
currentCall != null &&
(Clock.System.now() - now).inWholeMilliseconds < (currentCall?.timeoutInMs ?: Long.MAX_VALUE)
) {
Thread.sleep(10)
}
val call = currentCall
Expand All @@ -100,6 +104,7 @@ class WampCallManager(
private data class WampCall(
val logContext: String,
val msg: WampMessage,
var timeoutInMs: Long,
val latch: CountDownLatch = CountDownLatch(1),
var response: WampMessage? = null
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ interface OcppWampServer {
/**
* Sends a WampMessage call to a ChargingStation, identified by its ocpp id.
*
* @param timeoutInMs if not specified, the timeout from settings will be used
* @throws IllegalStateException if no such ChargingStation is currently connected to this server
*/
fun sendBlocking(ocppId: CSOcppId, message: WampMessage): WampMessage
fun sendBlocking(ocppId: CSOcppId, message: WampMessage, timeoutInMs: Long? = null): WampMessage

/**
* registers a wamp server handler on this server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,12 @@ class OcppWampServerApp(
}
}

fun sendBlocking(ocppId: CSOcppId, message: WampMessage, startedCallAt: Instant = Clock.System.now()): WampMessage =
getChargingStationConnection(ocppId).sendBlocking(message, startedCallAt)
fun sendBlocking(
ocppId: CSOcppId,
message: WampMessage,
startedCallAt: Instant = Clock.System.now(),
timeoutInMs: Long? = null
): WampMessage = getChargingStationConnection(ocppId).sendBlocking(message, startedCallAt, timeoutInMs)

// Throws NoConnectionException when no connection is found for the specified ocpp id
private fun getChargingStationConnection(ocppId: CSOcppId): ChargingStationConnection {
Expand Down Expand Up @@ -274,8 +278,8 @@ class OcppWampServerApp(
val callManager: WampCallManager =
WampCallManager(logger, { m: String -> ws.send(WsMessage(m)) }, timeoutInMs, shutdown)

fun sendBlocking(message: WampMessage, startCall: Instant): WampMessage =
callManager.callBlocking("[$ocppId] [$wsConnectionId]", startCall, message)
fun sendBlocking(message: WampMessage, startCall: Instant, timeoutInMs: Long? = null): WampMessage =
callManager.callBlocking("[$ocppId] [$wsConnectionId]", startCall, message, timeoutInMs)

fun close() {
logger.info("[$ocppId] [$wsConnectionId] - closing")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ class UndertowOcppWampServer(
wsApp = null
}

override fun sendBlocking(ocppId: CSOcppId, message: WampMessage): WampMessage =
override fun sendBlocking(ocppId: CSOcppId, message: WampMessage, timeoutInMs: Long?): WampMessage =
getWsApp()
.sendBlocking(ocppId, message)
.sendBlocking(ocppId, message, timeoutInMs = timeoutInMs)

override fun register(handler: OcppWampServerHandler) {
handlers.add(handler)
Expand Down
Loading