Skip to content

Commit

Permalink
TW-84904: Fix the long-running Slack health report. The cause of it i…
Browse files Browse the repository at this point in the history
…s that only successful aggregating requests responses were cached, so that unavailable Slack may cause 5s timeout waiting for each eligible buildtype. This change adds caching to failed AggregatedSlackApi queries, so that timeout calls are not repeated (actually, this logic is already present in CachingWebSlackApi)
  • Loading branch information
Sagolbah authored and qodana-bot committed Nov 23, 2023
1 parent e603190 commit ad97ba1
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,22 @@

package jetbrains.buildServer.notification.slackNotifier.slack

import jetbrains.buildServer.BaseTestCase
import jetbrains.buildServer.serverSide.impl.BaseServerTestCase
import jetbrains.buildServer.util.TestFor
import jetbrains.buildServer.util.ssl.SSLTrustStoreProvider
import org.testng.annotations.Test
import java.util.concurrent.TimeoutException

class SlackWebApiImplTest : BaseTestCase() {
class SlackWebApiImplTest : BaseServerTestCase() {
private val standardResponse = "{" +
"\"ok\": true," +
"\"channel\": \"C1H9RESGL\"," +
"\"ts\": \"1503435956.000247\"" +
"}"

private lateinit var slackApi: SlackWebApiImpl
private lateinit var slackResponse: MaybeMessage
private lateinit var slackApi: SlackWebApi
private lateinit var slackResponse: MaybeError
private lateinit var aggregatedSlackApi: AggregatedSlackApi

@Test
fun `should post message`() {
Expand All @@ -51,31 +54,85 @@ class SlackWebApiImplTest : BaseTestCase() {
`then result should be error`()
}

@Test(timeOut = 10_000)
@TestFor(issues = ["TW-84904"])
fun `should not repeat timeout calls`() {
`given slack api is caching and times out`()
`when userinfo is called multiple times`()
`then result should be error`()
}

@Test(timeOut = 10_000)
@TestFor(issues = ["TW-84904"])
fun `should not repeat timeout aggregating calls`() {
`given slack api is aggregating and times out`()
`then multiple same aggregating requests are not hanging`()
}


private fun `given slack is responding correctly`() {
slackApi = SlackWebApiImpl(
RequestHandlerStub(standardResponse),
SSLTrustStoreProvider { null }
RequestHandlerStub(standardResponse),
SSLTrustStoreProvider { null }
)
}

private fun `given slack fails on first request`() {
slackApi = SlackWebApiImpl(
FailingFirstRequestHandler(standardResponse),
FailingFirstRequestHandler(standardResponse),
SSLTrustStoreProvider { null }
)
}

private fun `given slack api is caching and times out`() {
slackApi = CachingSlackWebApi(
SlackWebApiImpl(
TimeoutRequestHandler(standardResponse),
SSLTrustStoreProvider { null }
),
myFixture.executorServices
)
}

private fun `given slack api is aggregating and times out`() {
aggregatedSlackApi = AggregatedSlackApi(
object : SlackWebApiFactory {
override fun createSlackWebApi(): SlackWebApi {
return SlackWebApiImpl(
TimeoutRequestHandler(standardResponse),
SSLTrustStoreProvider { null }
)
}
},
myFixture.executorServices
)
}

private fun `given slack always fails`() {
slackApi = SlackWebApiImpl(
AlwaysFailingRequestHandler(),
SSLTrustStoreProvider { null }
AlwaysFailingRequestHandler(),
SSLTrustStoreProvider { null }
)
}

private fun `when message is sent`() {
slackResponse = slackApi.postMessage(slackToken, Message("#test_channel", "Test message"))
}

private fun `when userinfo is called multiple times`() {
repeat(10) { slackResponse = slackApi.usersInfo(slackToken, "TestUserId") }
}

private fun `then multiple same aggregating requests are not hanging`() {
repeat(10) {
try {
aggregatedSlackApi.getChannelsList(slackToken)
} catch (e: SlackResponseError) {
// timeout expected
}
}
}

private fun `then result should be successful`() {
assertTrue(slackResponse.ok)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2000-2023 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package jetbrains.buildServer.notification.slackNotifier.slack

import jetbrains.buildServer.util.HTTPRequestBuilder
import jetbrains.buildServer.util.http.AsyncRequest
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeoutException

class TimeoutRequestHandler(
private val response: String
) : HTTPRequestBuilder.RequestHandler {
@Deprecated("Deprecated in Java")
override fun doRequest(request: HTTPRequestBuilder.Request) {
Thread.sleep(3_000)
request.onException.accept(TimeoutException("Request to Slack timeout"), request)
}

override fun doAsyncRequest(p0: AsyncRequest): CompletableFuture<HTTPRequestBuilder.Response> {
TODO("Not yet implemented")
}

override fun doSyncRequest(p0: HTTPRequestBuilder.Request): HTTPRequestBuilder.Response {
TODO("Not yet implemented")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ class AggregatedSlackApi(
)
.executor(executorServices.lowPriorityExecutorService)
.maximumWeight(TeamCityProperties.getLong(SlackNotifierProperties.maximumChannelsToCache, 50_000))
.weigher { _: String, channels: List<Channel> -> channels.size }
.buildAsync<String, List<Channel>>()
.weigher { _: String, channels: AggregatedSlackList<Channel> -> channels.data.size }
.buildAsync<String, AggregatedSlackList<Channel>>()

private val myUsersCache = Caffeine.newBuilder()
.expireAfterWrite(
Expand All @@ -50,8 +50,8 @@ class AggregatedSlackApi(
)
.executor(executorServices.lowPriorityExecutorService)
.maximumWeight(TeamCityProperties.getLong(SlackNotifierProperties.maximumUsersToCache, 50_000))
.weigher { _: String, users: List<User> -> users.size }
.buildAsync<String, List<User>>()
.weigher { _: String, users: AggregatedSlackList<User> -> users.data.size }
.buildAsync<String, AggregatedSlackList<User>>()

private val myConversationMembersCache = Caffeine.newBuilder()
.expireAfterWrite(
Expand All @@ -60,37 +60,47 @@ class AggregatedSlackApi(
)
.executor(executorServices.lowPriorityExecutorService)
.maximumWeight(TeamCityProperties.getLong(SlackNotifierProperties.maximumConversationMembersToCache, 50_000))
.weigher { _: String, members: List<String> -> members.size }
.buildAsync<String, List<String>>()
.weigher { _: String, members: AggregatedSlackList<String> -> members.data.size }
.buildAsync<String, AggregatedSlackList<String>>()

// Main bot info (like id or team id) doesn't change over time, so it's safe to cache them indefinitely
// If some changing info (like display name) should be cached, Guava expirable cache should be used instead
private val myBotCache: MutableMap<String, AggregatedBot> = Collections.synchronizedMap(WeakHashMap())

private val readTimeoutMs = 5_000L

// TW-84904
// Error responses should also be cached. Without it, consecutive failures may drain the rate limits.
// In worst case, it can result into consecutive timeouts,
// so the Slack health reports may take extremely long time to calculate.
fun getChannelsList(token: String): List<Channel> {
return myChannelsCache.getAsync(token, readTimeoutMs) {
val res = myChannelsCache.getAsync(token, readTimeoutMs) {
getList { cursor ->
slackApi.conversationsList(token, cursor)
}
}
if (!res.ok) throw SlackResponseError(res.error)
return res.data
}

fun getUsersList(token: String): List<User> {
return myUsersCache.getAsync(token, readTimeoutMs) {
val res = myUsersCache.getAsync(token, readTimeoutMs) {
getList { cursor ->
slackApi.usersList(token, cursor)
}
}
if (!res.ok) throw SlackResponseError(res.error)
return res.data
}

fun getConversationMembers(token: String, channelId: String): List<String> {
return myConversationMembersCache.getAsync(token, readTimeoutMs) {
val res = myConversationMembersCache.getAsync(token, readTimeoutMs) {
getList { cursor ->
slackApi.conversationsMembers(token, channelId, cursor)
}
}
if (!res.ok) throw SlackResponseError(res.error)
return res.data
}

fun getBot(token: String): AggregatedBot {
Expand All @@ -106,22 +116,22 @@ class AggregatedSlackApi(
}
}

private fun <T, D> getList(dataProvider: (String?) -> D): List<T> where D : SlackList<T>, D : MaybeError {
private fun <T, D> getList(dataProvider: (String?) -> D): AggregatedSlackList<T> where D : SlackList<T>, D : MaybeError {
val result = mutableListOf<T>()
var cursor: String? = null
var prevCursor: String?

do {
val data = dataProvider(cursor)
if (!data.ok) {
throw SlackResponseError(data.error)
return AggregatedSlackList(ok = false, error = data.error, needed = data.needed)
}
prevCursor = cursor
cursor = data.nextCursor
result.addAll(data.data)
} while (cursor != "" && cursor != prevCursor)

return result
return AggregatedSlackList(ok = true, data = result)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,14 @@ interface SlackList<T> {
val data: List<T>
}

// completed list, gained with cursor pagination
data class AggregatedSlackList<T>(
override val ok: Boolean = false,
override val error: String = "",
override val needed: String = "",
val data: List<T> = emptyList()
) : MaybeError

// https://api.slack.com/methods/channels.list
data class ChannelsList(
override val ok: Boolean = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import jetbrains.buildServer.notification.slackNotifier.SlackNotifierProperties
import jetbrains.buildServer.serverSide.IOGuard
import jetbrains.buildServer.serverSide.TeamCityProperties
import jetbrains.buildServer.util.HTTPRequestBuilder
import jetbrains.buildServer.util.NamedThreadFactory
import jetbrains.buildServer.util.ssl.SSLTrustStoreProvider
import java.nio.charset.Charset
import java.util.*
Expand Down Expand Up @@ -229,14 +230,17 @@ class SlackWebApiImpl(
response = it.bodyAsString
}
.onException(Consumer<Exception> {
// TODO better handle connection timeouts. In the code it's treated as unknown error, and the exact cause is available only in logs.
logger.warn(
"Exception occurred when sending request to Slack: ${it.message}"
)
exceptions.add(it)
})
.build()

requestHandler.doRequest(post)
NamedThreadFactory.executeWithNewThreadName("Performing Slack API request at $baseUrl/${path}") {
requestHandler.doRequest(post)
}

val isException = exceptions.size > maxNumberOfRetries

Expand Down

0 comments on commit ad97ba1

Please sign in to comment.