Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
pault-t-canva committed Nov 8, 2023
1 parent 193c020 commit 06282c2
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 34 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ plugins {

allprojects {
group = "net.devslash.fetchdsl"
version = "0.26.0"
version = "0.26.1"

repositories {
mavenCentral()
Expand Down
86 changes: 53 additions & 33 deletions service/src/main/kotlin/net/devslash/HttpSessionManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ import java.util.concurrent.atomic.AtomicReference

typealias Contents<T> = Pair<HttpRequest, RequestData<T>>

class HttpSessionManager(private val engine: Driver) : SessionManager, AutoCloseable {
class HttpSessionManager(private val engine: Driver) : SessionManager,
AutoCloseable {

private var count: Int = 0
private val jobThreadPool = System.getProperty("HTTP_THREAD_POOL_SIZE")?.toInt()
private val jobThreadPool =
System.getProperty("HTTP_THREAD_POOL_SIZE")?.toInt()
?: (Runtime.getRuntime().availableProcessors() * 2)
private val httpThreadPool = Executors.newFixedThreadPool(jobThreadPool)
private val dispatcher = httpThreadPool.asCoroutineDispatcher()
Expand All @@ -38,11 +40,21 @@ class HttpSessionManager(private val engine: Driver) : SessionManager, AutoClose
httpThreadPool.awaitTermination(100L, TimeUnit.MILLISECONDS)
}

override fun <T> call(call: Call<T>, session: Session): Exception? = call(call, session, DefaultCookieJar())
override fun <T> call(call: Call<T>, session: Session): Exception? =
call(call, session, DefaultCookieJar())

override fun <T> call(call: Call<T>, session: Session, jar: CookieJar): Exception? = runBlocking(Dispatchers.Default) {
val channel: Channel<Envelope<Contents<T>>> = Channel(call.lifecycleController?.getRequestQueueDepth()
?: (session.concurrency * 2))
override fun <T> call(
call: Call<T>,
session: Session,
jar: CookieJar
): Exception? = runBlocking(
Dispatchers.Default
+ Context.current().asContextElement()
) {
val channel: Channel<Envelope<Contents<T>>> = Channel(
call.lifecycleController?.getRequestQueueDepth()
?: (session.concurrency * 2)
)
val callRunner = { c: Call<T> -> call(c, session, jar) }
launch(Dispatchers.IO + Context.current().asContextElement()) {
RequestProducer().produceHttp(callRunner, call, jar, channel)
Expand All @@ -56,10 +68,10 @@ class HttpSessionManager(private val engine: Driver) : SessionManager, AutoClose

if (hasDelay) {
println(
"Delay has been set to $delay ms. This means that after a call has been made, " + //
"there will be a delay of at least $delay ms before the beginning of the next one.\n" + //
"Due to a delay being set - the number of HTTP threads has been locked to 1. " + //
"Effectively `session.concurrency = 1`"
"Delay has been set to $delay ms. This means that after a call has been made, " + //
"there will be a delay of at least $delay ms before the beginning of the next one.\n" + //
"Due to a delay being set - the number of HTTP threads has been locked to 1. " + //
"Effectively `session.concurrency = 1`"
)
}

Expand All @@ -68,18 +80,18 @@ class HttpSessionManager(private val engine: Driver) : SessionManager, AutoClose

// Take the call concurrency before defaulting to the session concurrency
val concurrency = if (hasDelay) 1 else call.concurrency
?: session.concurrency
?: session.concurrency
val storedException = AtomicReference<Exception>(null)

repeat(concurrency) {
jobs += launch(dispatcher + Context.current().asContextElement()) {
launchHttpProcessor(
call,
session,
limiter,
afterRequest,
channel,
storedException
call,
session,
limiter,
afterRequest,
channel,
storedException
)
}
}
Expand All @@ -89,12 +101,12 @@ class HttpSessionManager(private val engine: Driver) : SessionManager, AutoClose
}

private suspend fun <T> launchHttpProcessor(
call: Call<T>,
session: Session,
rateLimiter: AcquiringRateLimiter,
afterRequest: List<AfterHook>,
channel: Channel<Envelope<Pair<HttpRequest, RequestData<T>>>>,
storedException: AtomicReference<Exception>
call: Call<T>,
session: Session,
rateLimiter: AcquiringRateLimiter,
afterRequest: List<AfterHook>,
channel: Channel<Envelope<Pair<HttpRequest, RequestData<T>>>>,
storedException: AtomicReference<Exception>
) {
for (next in channel) {
if (storedException.get() != null) {
Expand All @@ -120,30 +132,35 @@ class HttpSessionManager(private val engine: Driver) : SessionManager, AutoClose

@Suppress("UNCHECKED_CAST")
private fun <T> handleSuccess(
resp: Success<HttpResponse>,
afterRequest: List<AfterHook>,
contents: Pair<HttpRequest, RequestData<T>>,
resp: Success<HttpResponse>,
afterRequest: List<AfterHook>,
contents: Pair<HttpRequest, RequestData<T>>,
) {
count++
val mappedResponse = resp.value
afterRequest.forEach {
when (it) {
is SimpleAfterHook -> it.accept(mappedResponse.copy())
is BodyMutatingAfterHook -> it.accept(mappedResponse)
is FullDataAfterHook -> it.accept(contents.first, mappedResponse, contents.second)
is FullDataAfterHook -> it.accept(
contents.first,
mappedResponse,
contents.second
)

is ResolvedFullDataAfterHook<*> -> (it as ResolvedFullDataAfterHook<T>)
.accept(contents.first, mappedResponse, contents.second.get())
.accept(contents.first, mappedResponse, contents.second.get())

else -> throw RuntimeException("Unexpected after hook type $it")
}
}
}

private suspend fun <T> handleFailure(
call: Call<T>,
channel: Channel<Envelope<Pair<HttpRequest, RequestData<T>>>>,
next: Envelope<Pair<HttpRequest, RequestData<T>>>,
exception: Failure<java.lang.Exception>
call: Call<T>,
channel: Channel<Envelope<Pair<HttpRequest, RequestData<T>>>>,
next: Envelope<Pair<HttpRequest, RequestData<T>>>,
exception: Failure<java.lang.Exception>
) {
when (val onError = call.onError) {
is OnErrorWithState -> onError.accept(channel, next, exception.err)
Expand All @@ -154,7 +171,10 @@ class HttpSessionManager(private val engine: Driver) : SessionManager, AutoClose
}


private suspend fun makeRequest(modelRequest: HttpRequest, session: Session): HttpResult<HttpResponse, java.lang.Exception> {
private suspend fun makeRequest(
modelRequest: HttpRequest,
session: Session
): HttpResult<HttpResponse, java.lang.Exception> {
maybeDelay(session)
val result = engine.call(modelRequest)
lastCall = clock.millis()
Expand Down

0 comments on commit 06282c2

Please sign in to comment.