Skip to content

Commit

Permalink
Fixes lower data calculation (#410)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Feb 1, 2024
1 parent aebd15f commit 7fba7df
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 10 deletions.
4 changes: 2 additions & 2 deletions foundation/src/main/resources/chains.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ chain-settings:
label: Arbitrum Nova
type: eth
settings:
expected-block-time: 1s
expected-block-time: 260ms
options:
disable-validation: true
lags:
Expand Down Expand Up @@ -737,7 +737,7 @@ chain-settings:
type: solana
settings:
currency: SOL
expected-block-time: 1s
expected-block-time: 400ms
options:
validate-peers: false
lags:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration
import java.time.Instant
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference

typealias LowerBoundBlockDetectorBuilder = (Chain, Upstream) -> LowerBoundBlockDetector
Expand All @@ -21,11 +22,20 @@ abstract class LowerBoundBlockDetector(
protected val log = LoggerFactory.getLogger(this::class.java)

fun lowerBlock(): Flux<LowerBlockData> {
val notProcessing = AtomicBoolean(true)

return Flux.interval(
Duration.ofSeconds(15),
Duration.ofSeconds(60),
Duration.ofMinutes(periodRequest()),
)
.flatMap { lowerBlockDetect() }
.filter { notProcessing.get() }
.flatMap {
notProcessing.set(false)
lowerBlockDetect()
}
.doOnNext {
notProcessing.set(true)
}
.filter { it.blockNumber > currentLowerBlock.get().blockNumber }
.map {
log.info("Lower block of ${upstream.getId()} $chain: block height - {}, slot - {}", it.blockNumber, it.slot ?: "NA")
Expand All @@ -39,6 +49,8 @@ abstract class LowerBoundBlockDetector(

protected abstract fun lowerBlockDetect(): Mono<LowerBlockData>

protected abstract fun periodRequest(): Long

data class LowerBlockData(
val blockNumber: Long,
val slot: Long?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ abstract class RecursiveLowerBoundBlockDetector(
} else {
val middle = middleBlock(data)

if (data.left > data.right) {
if (data.left > data.right || middle == 0L) {
val current = if (data.current == 0L) 1 else data.current
Mono.just(LowerBoundData(current, true))
} else {
Expand Down Expand Up @@ -57,7 +57,7 @@ abstract class RecursiveLowerBoundBlockDetector(
Long.MAX_VALUE,
Duration.ofSeconds(1),
)
.maxBackoff(Duration.ofSeconds(3))
.maxBackoff(Duration.ofMinutes(3))
.filter {
!nonRetryableErrors.any { err -> it.message?.contains(err, true) ?: false }
}
Expand All @@ -71,5 +71,9 @@ abstract class RecursiveLowerBoundBlockDetector(
}
}

override fun periodRequest(): Long {
return 10
}

protected abstract fun hasState(blockNumber: Long): Mono<Boolean>
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class EthereumLowerBoundBlockDetector(
"after last accepted block",
"Version has either been pruned, or is for a future block", // cronos
"no historical RPC is available for this historical", // optimism
"historical backend error", // optimism
"load state tree: failed to load state tree", // filecoin
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class SolanaLowerBoundBlockDetector(
.retryWhen(
Retry
.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(3))
.maxBackoff(Duration.ofMinutes(3))
.doAfterRetry {
log.debug(
"Error in calculation of lower block of upstream {}, retry attempt - {}, message - {}",
Expand All @@ -90,4 +90,8 @@ class SolanaLowerBoundBlockDetector(
},
)
}

override fun periodRequest(): Long {
return 3
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,8 @@ class StarknetLowerBoundBlockDetector(
override fun lowerBlockDetect(): Mono<LowerBlockData> {
return Mono.just(LowerBlockData(1))
}

override fun periodRequest(): Long {
return 120
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class RecursiveLowerBoundBlockDetectorTest {
StepVerifier.withVirtualTime { detector.lowerBlock() }
.expectSubscription()
.expectNoEvent(Duration.ofSeconds(15))
.expectNext(LowerBoundBlockDetector.LowerBlockData(17964844L))
.expectNextMatches { it.blockNumber == 17964844L }
.thenCancel()
.verify(Duration.ofSeconds(3))

Expand All @@ -64,7 +64,7 @@ class RecursiveLowerBoundBlockDetectorTest {
StepVerifier.withVirtualTime { detector.lowerBlock() }
.expectSubscription()
.expectNoEvent(Duration.ofSeconds(15))
.expectNext(LowerBoundBlockDetector.LowerBlockData(1))
.expectNextMatches { it.blockNumber == 1L }
.thenCancel()
.verify(Duration.ofSeconds(3))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class SolanaLowerBoundBlockDetectorTest {
StepVerifier.withVirtualTime { detector.lowerBlock() }
.expectSubscription()
.expectNoEvent(Duration.ofSeconds(15))
.expectNext(LowerBoundBlockDetector.LowerBlockData(21000000, 23000010))
.expectNextMatches { it.blockNumber == 21000000L && it.slot == 23000010L }
.thenCancel()
.verify(Duration.ofSeconds(3))

Expand Down

0 comments on commit 7fba7df

Please sign in to comment.