Skip to content

Commit

Permalink
music: throw on deadlocks
Browse files Browse the repository at this point in the history
Attempt to throw an exception when any part of the loading routine
times out.
  • Loading branch information
OxygenCobalt committed Dec 19, 2023
1 parent 4cb309f commit d3de34e
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 11 deletions.
9 changes: 7 additions & 2 deletions app/src/main/java/org/oxycblt/auxio/music/MusicRepository.kt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import org.oxycblt.auxio.music.metadata.Separators
import org.oxycblt.auxio.music.metadata.TagExtractor
import org.oxycblt.auxio.music.user.MutableUserLibrary
import org.oxycblt.auxio.music.user.UserLibrary
import org.oxycblt.auxio.util.forEachWithTimeout
import org.oxycblt.auxio.util.logD
import org.oxycblt.auxio.util.logE
import org.oxycblt.auxio.util.logW
Expand Down Expand Up @@ -448,6 +449,7 @@ constructor(
try {
tagExtractor.consume(incompleteSongs, completeSongs)
} catch (e: Exception) {
logD("Tag extraction failed: $e")
completeSongs.close(e)
return@async
}
Expand All @@ -464,6 +466,7 @@ constructor(
deviceLibraryFactory.create(
completeSongs, processedSongs, separators, nameFactory)
} catch (e: Exception) {
logD("DeviceLibrary creation failed: $e")
processedSongs.close(e)
return@async Result.failure(e)
}
Expand All @@ -474,8 +477,10 @@ constructor(
// We could keep track of a total here, but we also need to collate this RawSong information
// for when we write the cache later on in the finalization step.
val rawSongs = LinkedList<RawSong>()
for (rawSong in processedSongs) {
rawSongs.add(rawSong)
// Use a longer timeout so that dependent components can timeout and throw errors that
// provide more context than if we timed out here.
processedSongs.forEachWithTimeout(20000) {
rawSongs.add(it)
// Since discovery takes up the bulk of the music loading process, we switch to
// indicating a defined amount of loaded songs in comparison to the projected amount
// of songs that were queried.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ import org.oxycblt.auxio.music.fs.contentResolverSafe
import org.oxycblt.auxio.music.fs.useQuery
import org.oxycblt.auxio.music.info.Name
import org.oxycblt.auxio.music.metadata.Separators
import org.oxycblt.auxio.util.forEachWithTimeout
import org.oxycblt.auxio.util.logW
import org.oxycblt.auxio.util.sendWithTimeout
import org.oxycblt.auxio.util.unlikelyToBeNull

/**
Expand Down Expand Up @@ -130,7 +132,7 @@ class DeviceLibraryFactoryImpl @Inject constructor() : DeviceLibrary.Factory {
// TODO: Use comparators here

// All music information is grouped as it is indexed by other components.
for (rawSong in rawSongs) {
rawSongs.forEachWithTimeout { rawSong ->
val song = SongImpl(rawSong, nameFactory, separators)
// At times the indexer produces duplicate songs, try to filter these. Comparing by
// UID is sufficient for something like this, and also prevents collisions from
Expand All @@ -142,8 +144,8 @@ class DeviceLibraryFactoryImpl @Inject constructor() : DeviceLibrary.Factory {
// We still want to say that we "processed" the song so that the user doesn't
// get confused at why the bar was only partly filled by the end of the loading
// process.
processedSongs.send(rawSong)
continue
processedSongs.sendWithTimeout(rawSong)
return@forEachWithTimeout
}
songGrouping[song.uid] = song

Expand Down Expand Up @@ -206,7 +208,7 @@ class DeviceLibraryFactoryImpl @Inject constructor() : DeviceLibrary.Factory {
}
}

processedSongs.send(rawSong)
processedSongs.sendWithTimeout(rawSong)
}

// Now that all songs are processed, also process albums and group them into their
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.oxycblt.auxio.music.metadata.parseId3v2PositionField
import org.oxycblt.auxio.music.metadata.transformPositionField
import org.oxycblt.auxio.util.getSystemServiceCompat
import org.oxycblt.auxio.util.logD
import org.oxycblt.auxio.util.sendWithTimeout

/**
* The layer that loads music from the [MediaStore] database. This is an intermediate step in the
Expand Down Expand Up @@ -205,10 +206,10 @@ private abstract class BaseMediaStoreExtractor(protected val context: Context) :
val rawSong = RawSong()
query.populateFileInfo(rawSong)
if (cache?.populate(rawSong) == true) {
completeSongs.send(rawSong)
completeSongs.sendWithTimeout(rawSong)
} else {
query.populateTags(rawSong)
incompleteSongs.send(rawSong)
incompleteSongs.sendWithTimeout(rawSong)
}
yield()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import javax.inject.Inject
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.yield
import org.oxycblt.auxio.music.device.RawSong
import org.oxycblt.auxio.util.forEachWithTimeout
import org.oxycblt.auxio.util.logD
import org.oxycblt.auxio.util.sendWithTimeout

/**
* The extractor that leverages ExoPlayer's [MetadataRetriever] API to parse metadata. This is the
Expand Down Expand Up @@ -55,14 +57,14 @@ class TagExtractorImpl @Inject constructor(private val tagWorkerFactory: TagWork

logD("Beginning primary extraction loop")

for (incompleteRawSong in incompleteSongs) {
incompleteSongs.forEachWithTimeout { incompleteRawSong ->
spin@ while (true) {
for (i in tagWorkerPool.indices) {
val worker = tagWorkerPool[i]
if (worker != null) {
val completeRawSong = worker.poll()
if (completeRawSong != null) {
completeSongs.send(completeRawSong)
completeSongs.sendWithTimeout(completeRawSong)
yield()
} else {
continue
Expand All @@ -83,7 +85,7 @@ class TagExtractorImpl @Inject constructor(private val tagWorkerFactory: TagWork
if (task != null) {
val completeRawSong = task.poll()
if (completeRawSong != null) {
completeSongs.send(completeRawSong)
completeSongs.sendWithTimeout(completeRawSong)
tagWorkerPool[i] = null
yield()
} else {
Expand Down
59 changes: 59 additions & 0 deletions app/src/main/java/org/oxycblt/auxio/util/StateUtil.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,16 @@ import androidx.fragment.app.Fragment
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.lifecycleScope
import androidx.lifecycle.repeatOnLifecycle
import java.util.concurrent.TimeoutException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeout

/**
* A wrapper around [StateFlow] exposing a one-time consumable event.
Expand Down Expand Up @@ -146,3 +151,57 @@ private fun Fragment.launch(
) {
viewLifecycleOwner.lifecycleScope.launch { viewLifecycleOwner.repeatOnLifecycle(state, block) }
}

/**
* Wraps [SendChannel.send] with a specified timeout.
*
* @param element The element to send.
* @param timeout The timeout in milliseconds. Defaults to 10 seconds.
* @throws TimeoutException If the timeout is reached, provides context on what element
* specifically.
*/
suspend fun <E> SendChannel<E>.sendWithTimeout(element: E, timeout: Long = 10000) {
try {
withTimeout(timeout) { send(element) }
} catch (e: Exception) {
throw TimeoutException("Timed out sending element $element to channel: $e")
}
}

/**
* Wraps a [ReceiveChannel] consumption with a specified timeout. Note that the timeout will only
* start on the first element received, as to prevent initialization of dependent coroutines being
* interpreted as a timeout.
*
* @param action The action to perform on each element received.
* @param timeout The timeout in milliseconds. Defaults to 10 seconds.
* @throws TimeoutException If the timeout is reached, provides context on what element
* specifically.
*/
suspend fun <E> ReceiveChannel<E>.forEachWithTimeout(
timeout: Long = 10000,
action: suspend (E) -> Unit
) {
var exhausted = false
var subsequent = false
val handler: suspend () -> Unit = {
val value = receiveCatching()
if (value.isClosed) {
exhausted = true
} else {
action(value.getOrThrow())
}
}
while (!exhausted) {
try {
if (subsequent) {
withTimeout(timeout) { handler() }
} else {
handler()
subsequent = true
}
} catch (e: TimeoutCancellationException) {
throw TimeoutException("Timed out receiving element from channel: $e")
}
}
}

0 comments on commit d3de34e

Please sign in to comment.