From 44fab693c5aeda5fd023572a46d13c08335598ad Mon Sep 17 00:00:00 2001 From: KirillPamPam Date: Thu, 4 Apr 2024 14:15:30 +0400 Subject: [PATCH] Fix solana subs (#449) --- .../generic/GenericIngressSubscription.kt | 2 +- .../generic/GenericSubscriptionConnectTest.kt | 36 +++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) create mode 100644 src/test/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericSubscriptionConnectTest.kt diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericIngressSubscription.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericIngressSubscription.kt index c58c6eb6c..4dbd83901 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericIngressSubscription.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericIngressSubscription.kt @@ -38,7 +38,7 @@ class GenericSubscriptionConnect( @Suppress("UNCHECKED_CAST") override fun createConnection(): Flux { - return conn.subscribe(ChainRequest(topic, ListParams(getParams(params)))) + return conn.subscribe(ChainRequest(topic, ListParams(getParams(params) as List))) .data .timeout(Duration.ofSeconds(60), Mono.empty()) .onErrorResume { Mono.empty() } as Flux diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericSubscriptionConnectTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericSubscriptionConnectTest.kt new file mode 100644 index 000000000..db8e36f26 --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericSubscriptionConnectTest.kt @@ -0,0 +1,36 @@ +package io.emeraldpay.dshackle.upstream.generic + +import io.emeraldpay.dshackle.upstream.ChainRequest +import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions +import io.emeraldpay.dshackle.upstream.rpcclient.ListParams +import org.junit.jupiter.api.Test +import org.mockito.Mockito.verify +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.mock +import reactor.core.publisher.Flux +import reactor.test.StepVerifier +import java.time.Duration +import java.util.concurrent.atomic.AtomicReference + +class GenericSubscriptionConnectTest { + + @Test + fun `test request param is flat list`() { + val param: List = listOf("all") + val topic = "topic" + val response = "hello".toByteArray() + val ws = mock { + on { subscribe(ChainRequest(topic, ListParams(param))) } doReturn + WsSubscriptions.SubscribeData(Flux.just(response), "", AtomicReference("")) + } + + val genericSubscriptionConnect = GenericSubscriptionConnect(ws, topic, param) + + StepVerifier.create(genericSubscriptionConnect.createConnection()) + .expectNext(response) + .expectComplete() + .verify(Duration.ofSeconds(1)) + + verify(ws).subscribe(ChainRequest(topic, ListParams(param))) + } +}