From d7d8aa7dadf6280badbb0c6dff9fe155f2977edf Mon Sep 17 00:00:00 2001 From: pax-k Date: Thu, 18 Jul 2024 23:39:30 +0300 Subject: [PATCH] fix: use reactive channels instead of looping over playback queue --- .../audiostream/ExpoAudioStreamModule.kt | 95 ++++++++++++++----- package.json | 2 +- 2 files changed, 74 insertions(+), 23 deletions(-) diff --git a/android/src/main/java/expo/modules/audiostream/ExpoAudioStreamModule.kt b/android/src/main/java/expo/modules/audiostream/ExpoAudioStreamModule.kt index f7bf406..4236a38 100644 --- a/android/src/main/java/expo/modules/audiostream/ExpoAudioStreamModule.kt +++ b/android/src/main/java/expo/modules/audiostream/ExpoAudioStreamModule.kt @@ -9,7 +9,6 @@ import expo.modules.kotlin.modules.Module import expo.modules.kotlin.modules.ModuleDefinition import java.nio.ByteBuffer import java.nio.ByteOrder -import java.util.concurrent.ConcurrentLinkedQueue import kotlin.math.max import kotlin.math.min import kotlinx.coroutines.CoroutineScope @@ -18,39 +17,50 @@ import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay -import kotlinx.coroutines.isActive +import kotlinx.coroutines.flow.consumeAsFlow import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withContext class ExpoAudioStreamModule : Module() { - data class ChunkData(val chunk: String, val promise: Promise) + data class ChunkData(val chunk: String, val promise: Promise) // contains the base64 chunk + data class AudioChunk( + val audioData: FloatArray, + val promise: Promise, + var isPromiseSettled: Boolean = false + ) // contains the decoded base64 chunk - data class AudioChunk(val audioData: FloatArray, val promise: Promise) + private lateinit var processingChannel: Channel + private lateinit var playbackChannel: Channel private val coroutineScope = CoroutineScope(Dispatchers.Default + SupervisorJob()) - private lateinit var audioTrack: AudioTrack - private val playbackQueue = ConcurrentLinkedQueue() - private val processingChannel = Channel(Channel.UNLIMITED) + private var processingJob: Job? = null + private var currentPlaybackJob: Job? = null + private lateinit var audioTrack: AudioTrack private var isPlaying = false - private var currentPlaybackJob: Job? = null override fun definition() = ModuleDefinition { Name("ExpoAudioStream") - OnCreate { initializeAudioTrack() } + OnCreate { + initializeAudioTrack() + initializeChannels() + } OnDestroy { - stopProcessingLoop() stopPlayback() + processingChannel.close() + stopProcessingLoop() coroutineScope.cancel() } AsyncFunction("streamRiff16Khz16BitMonoPcmChunk") { chunk: String, promise: Promise -> coroutineScope.launch { + if (processingChannel.isClosedForSend || playbackChannel.isClosedForSend) { + initializeChannels() + } processingChannel.send(ChunkData(chunk, promise)) ensureProcessingLoopStarted() } @@ -65,6 +75,11 @@ class ExpoAudioStreamModule : Module() { AsyncFunction("stop") { promise: Promise -> stopPlayback(promise) } } + private fun initializeChannels() { + processingChannel = Channel(Channel.UNLIMITED) + playbackChannel = Channel(Channel.UNLIMITED) + } + private fun ensureProcessingLoopStarted() { if (processingJob == null || processingJob?.isActive != true) { startProcessingLoop() @@ -76,7 +91,7 @@ class ExpoAudioStreamModule : Module() { coroutineScope.launch { for (chunkData in processingChannel) { processAndEnqueueChunk(chunkData) - if (processingChannel.isEmpty && !isPlaying && playbackQueue.isEmpty()) { + if (processingChannel.isEmpty && !isPlaying && playbackChannel.isEmpty) { break // Stop the loop if there's no more work to do } } @@ -95,7 +110,7 @@ class ExpoAudioStreamModule : Module() { val audioDataWithoutRIFF = removeRIFFHeaderIfNeeded(decodedBytes) val audioData = convertPCMDataToFloatArray(audioDataWithoutRIFF) - playbackQueue.offer(AudioChunk(audioData, chunkData.promise)) + playbackChannel.send(AudioChunk(audioData, chunkData.promise)) if (!isPlaying) { startPlayback() @@ -132,6 +147,7 @@ class ExpoAudioStreamModule : Module() { audioTrack.play() isPlaying = true startPlaybackLoop() + ensureProcessingLoopStarted() } promise?.resolve(null) } catch (e: Exception) { @@ -145,8 +161,24 @@ class ExpoAudioStreamModule : Module() { audioTrack.flush() isPlaying = false currentPlaybackJob?.cancel() - playbackQueue.clear() - stopProcessingLoop() + currentPlaybackJob = null + + // Resolve promises for any remaining chunks in the playback channel + coroutineScope.launch { + for (chunk in playbackChannel) { + if (!chunk.isPromiseSettled) { + chunk.isPromiseSettled = true + chunk.promise.resolve(null) + } + } + } + + // Cancel the processing job and close the channels + processingJob?.cancel() + processingJob = null + processingChannel.close() + playbackChannel.close() + promise?.resolve(null) } catch (e: Exception) { promise?.reject("ERR_STOP_PLAYBACK", e.message, e) @@ -185,12 +217,12 @@ class ExpoAudioStreamModule : Module() { private fun startPlaybackLoop() { currentPlaybackJob = coroutineScope.launch { - while (isActive && isPlaying) { - val chunk = playbackQueue.poll() - if (chunk != null) { + playbackChannel.consumeAsFlow().collect { chunk -> + if (isPlaying) { playChunk(chunk) } else { - delay(10) + // If not playing, we should resolve the promise to avoid leaks + chunk.promise.resolve(null) } } } @@ -206,7 +238,10 @@ class ExpoAudioStreamModule : Module() { object : AudioTrack.OnPlaybackPositionUpdateListener { override fun onMarkerReached(track: AudioTrack) { audioTrack.setPlaybackPositionUpdateListener(null) - chunk.promise.resolve(null) + if (!chunk.isPromiseSettled) { + chunk.isPromiseSettled = true + chunk.promise.resolve(null) + } continuation.resumeWith(Result.success(Unit)) } @@ -226,16 +261,32 @@ class ExpoAudioStreamModule : Module() { if (written != chunkSize) { audioTrack.setPlaybackPositionUpdateListener(null) val error = Exception("Failed to write entire audio chunk") - chunk.promise.reject("ERR_PLAYBACK", error.message, error) + if (!chunk.isPromiseSettled) { + chunk.isPromiseSettled = true + // chunk.promise.reject("ERR_PLAYBACK", + // error.message, error) + chunk.promise.resolve(null) + } continuation.resumeWith(Result.failure(error)) } continuation.invokeOnCancellation { audioTrack.setPlaybackPositionUpdateListener(null) + if (!chunk.isPromiseSettled) { + chunk.isPromiseSettled = true + chunk.promise.reject( + "ERR_PLAYBACK_CANCELLED", + "Playback was cancelled", + null + ) + } } } } catch (e: Exception) { - chunk.promise.reject("ERR_PLAYBACK", e.message, e) + if (!chunk.isPromiseSettled) { + chunk.isPromiseSettled = true + chunk.promise.reject("ERR_PLAYBACK", e.message, e) + } } } } diff --git a/package.json b/package.json index 24f6401..667eb09 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@mykin-ai/expo-audio-stream", - "version": "0.1.20", + "version": "0.1.21", "description": "Expo Audio Stream module", "main": "build/index.js", "types": "build/index.d.ts",