Skip to content

Commit

Permalink
fix: use reactive channels instead of looping over playback queue
Browse files Browse the repository at this point in the history
  • Loading branch information
pax-k committed Jul 18, 2024
1 parent 37ba080 commit d7d8aa7
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import expo.modules.kotlin.modules.Module
import expo.modules.kotlin.modules.ModuleDefinition
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.math.max
import kotlin.math.min
import kotlinx.coroutines.CoroutineScope
Expand All @@ -18,39 +17,50 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withContext

class ExpoAudioStreamModule : Module() {
data class ChunkData(val chunk: String, val promise: Promise)
data class ChunkData(val chunk: String, val promise: Promise) // contains the base64 chunk
data class AudioChunk(
val audioData: FloatArray,
val promise: Promise,
var isPromiseSettled: Boolean = false
) // contains the decoded base64 chunk

data class AudioChunk(val audioData: FloatArray, val promise: Promise)
private lateinit var processingChannel: Channel<ChunkData>
private lateinit var playbackChannel: Channel<AudioChunk>

private val coroutineScope = CoroutineScope(Dispatchers.Default + SupervisorJob())
private lateinit var audioTrack: AudioTrack
private val playbackQueue = ConcurrentLinkedQueue<AudioChunk>()
private val processingChannel = Channel<ChunkData>(Channel.UNLIMITED)

private var processingJob: Job? = null
private var currentPlaybackJob: Job? = null

private lateinit var audioTrack: AudioTrack
private var isPlaying = false
private var currentPlaybackJob: Job? = null

override fun definition() = ModuleDefinition {
Name("ExpoAudioStream")

OnCreate { initializeAudioTrack() }
OnCreate {
initializeAudioTrack()
initializeChannels()
}

OnDestroy {
stopProcessingLoop()
stopPlayback()
processingChannel.close()
stopProcessingLoop()
coroutineScope.cancel()
}

AsyncFunction("streamRiff16Khz16BitMonoPcmChunk") { chunk: String, promise: Promise ->
coroutineScope.launch {
if (processingChannel.isClosedForSend || playbackChannel.isClosedForSend) {
initializeChannels()
}
processingChannel.send(ChunkData(chunk, promise))
ensureProcessingLoopStarted()
}
Expand All @@ -65,6 +75,11 @@ class ExpoAudioStreamModule : Module() {
AsyncFunction("stop") { promise: Promise -> stopPlayback(promise) }
}

private fun initializeChannels() {
processingChannel = Channel<ChunkData>(Channel.UNLIMITED)
playbackChannel = Channel<AudioChunk>(Channel.UNLIMITED)
}

private fun ensureProcessingLoopStarted() {
if (processingJob == null || processingJob?.isActive != true) {
startProcessingLoop()
Expand All @@ -76,7 +91,7 @@ class ExpoAudioStreamModule : Module() {
coroutineScope.launch {
for (chunkData in processingChannel) {
processAndEnqueueChunk(chunkData)
if (processingChannel.isEmpty && !isPlaying && playbackQueue.isEmpty()) {
if (processingChannel.isEmpty && !isPlaying && playbackChannel.isEmpty) {
break // Stop the loop if there's no more work to do
}
}
Expand All @@ -95,7 +110,7 @@ class ExpoAudioStreamModule : Module() {
val audioDataWithoutRIFF = removeRIFFHeaderIfNeeded(decodedBytes)
val audioData = convertPCMDataToFloatArray(audioDataWithoutRIFF)

playbackQueue.offer(AudioChunk(audioData, chunkData.promise))
playbackChannel.send(AudioChunk(audioData, chunkData.promise))

if (!isPlaying) {
startPlayback()
Expand Down Expand Up @@ -132,6 +147,7 @@ class ExpoAudioStreamModule : Module() {
audioTrack.play()
isPlaying = true
startPlaybackLoop()
ensureProcessingLoopStarted()
}
promise?.resolve(null)
} catch (e: Exception) {
Expand All @@ -145,8 +161,24 @@ class ExpoAudioStreamModule : Module() {
audioTrack.flush()
isPlaying = false
currentPlaybackJob?.cancel()
playbackQueue.clear()
stopProcessingLoop()
currentPlaybackJob = null

// Resolve promises for any remaining chunks in the playback channel
coroutineScope.launch {
for (chunk in playbackChannel) {
if (!chunk.isPromiseSettled) {
chunk.isPromiseSettled = true
chunk.promise.resolve(null)
}
}
}

// Cancel the processing job and close the channels
processingJob?.cancel()
processingJob = null
processingChannel.close()
playbackChannel.close()

promise?.resolve(null)
} catch (e: Exception) {
promise?.reject("ERR_STOP_PLAYBACK", e.message, e)
Expand Down Expand Up @@ -185,12 +217,12 @@ class ExpoAudioStreamModule : Module() {
private fun startPlaybackLoop() {
currentPlaybackJob =
coroutineScope.launch {
while (isActive && isPlaying) {
val chunk = playbackQueue.poll()
if (chunk != null) {
playbackChannel.consumeAsFlow().collect { chunk ->
if (isPlaying) {
playChunk(chunk)
} else {
delay(10)
// If not playing, we should resolve the promise to avoid leaks
chunk.promise.resolve(null)
}
}
}
Expand All @@ -206,7 +238,10 @@ class ExpoAudioStreamModule : Module() {
object : AudioTrack.OnPlaybackPositionUpdateListener {
override fun onMarkerReached(track: AudioTrack) {
audioTrack.setPlaybackPositionUpdateListener(null)
chunk.promise.resolve(null)
if (!chunk.isPromiseSettled) {
chunk.isPromiseSettled = true
chunk.promise.resolve(null)
}
continuation.resumeWith(Result.success(Unit))
}

Expand All @@ -226,16 +261,32 @@ class ExpoAudioStreamModule : Module() {
if (written != chunkSize) {
audioTrack.setPlaybackPositionUpdateListener(null)
val error = Exception("Failed to write entire audio chunk")
chunk.promise.reject("ERR_PLAYBACK", error.message, error)
if (!chunk.isPromiseSettled) {
chunk.isPromiseSettled = true
// chunk.promise.reject("ERR_PLAYBACK",
// error.message, error)
chunk.promise.resolve(null)
}
continuation.resumeWith(Result.failure(error))
}

continuation.invokeOnCancellation {
audioTrack.setPlaybackPositionUpdateListener(null)
if (!chunk.isPromiseSettled) {
chunk.isPromiseSettled = true
chunk.promise.reject(
"ERR_PLAYBACK_CANCELLED",
"Playback was cancelled",
null
)
}
}
}
} catch (e: Exception) {
chunk.promise.reject("ERR_PLAYBACK", e.message, e)
if (!chunk.isPromiseSettled) {
chunk.isPromiseSettled = true
chunk.promise.reject("ERR_PLAYBACK", e.message, e)
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@mykin-ai/expo-audio-stream",
"version": "0.1.20",
"version": "0.1.21",
"description": "Expo Audio Stream module",
"main": "build/index.js",
"types": "build/index.d.ts",
Expand Down

0 comments on commit d7d8aa7

Please sign in to comment.