From 37ba080469ebae8c6c6d9c66b04885b560ad5765 Mon Sep 17 00:00:00 2001 From: pax-k Date: Thu, 18 Jul 2024 14:17:52 +0300 Subject: [PATCH] fix: process chunks in order --- .../audiostream/ExpoAudioStreamModule.kt | 285 ++++++++++-------- package.json | 2 +- src/index.ts | 2 +- 3 files changed, 165 insertions(+), 124 deletions(-) diff --git a/android/src/main/java/expo/modules/audiostream/ExpoAudioStreamModule.kt b/android/src/main/java/expo/modules/audiostream/ExpoAudioStreamModule.kt index 81cb4d4..f7bf406 100644 --- a/android/src/main/java/expo/modules/audiostream/ExpoAudioStreamModule.kt +++ b/android/src/main/java/expo/modules/audiostream/ExpoAudioStreamModule.kt @@ -12,213 +12,254 @@ import java.nio.ByteOrder import java.util.concurrent.ConcurrentLinkedQueue import kotlin.math.max import kotlin.math.min -import kotlinx.coroutines.* +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlinx.coroutines.withContext class ExpoAudioStreamModule : Module() { - data class AudioChunk( - val audioData: FloatArray, - val promise: Promise, - var isSettled: Boolean = false - ) + data class ChunkData(val chunk: String, val promise: Promise) - private val coroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()) + data class AudioChunk(val audioData: FloatArray, val promise: Promise) + + private val coroutineScope = CoroutineScope(Dispatchers.Default + SupervisorJob()) private lateinit var audioTrack: AudioTrack - private val playbackQueue: ConcurrentLinkedQueue = ConcurrentLinkedQueue() - private var playbackJob: Job? = null + private val playbackQueue = ConcurrentLinkedQueue() + private val processingChannel = Channel(Channel.UNLIMITED) + private var processingJob: Job? = null + + private var isPlaying = false + private var currentPlaybackJob: Job? = null override fun definition() = ModuleDefinition { Name("ExpoAudioStream") - OnCreate { - initializeAudioTrack() - startPlaybackLoop() - } + OnCreate { initializeAudioTrack() } OnDestroy { - stopPlaybackLoop() - playbackQueue.clear() - audioTrack.stop() - audioTrack.release() + stopProcessingLoop() + stopPlayback() + coroutineScope.cancel() } AsyncFunction("streamRiff16Khz16BitMonoPcmChunk") { chunk: String, promise: Promise -> - enqueueChunkForPlayback(chunk, promise) + coroutineScope.launch { + processingChannel.send(ChunkData(chunk, promise)) + ensureProcessingLoopStarted() + } } AsyncFunction("setVolume") { volume: Double, promise: Promise -> - setVolume( - volume, - promise - ) + setVolume(volume, promise) } AsyncFunction("pause") { promise: Promise -> pausePlayback(promise) } - AsyncFunction("start") { promise: Promise -> startPlayback(promise) } - AsyncFunction("stop") { promise: Promise -> stopPlayback(promise) } } + private fun ensureProcessingLoopStarted() { + if (processingJob == null || processingJob?.isActive != true) { + startProcessingLoop() + } + } + + private fun startProcessingLoop() { + processingJob = + coroutineScope.launch { + for (chunkData in processingChannel) { + processAndEnqueueChunk(chunkData) + if (processingChannel.isEmpty && !isPlaying && playbackQueue.isEmpty()) { + break // Stop the loop if there's no more work to do + } + } + processingJob = null + } + } + + private fun stopProcessingLoop() { + processingJob?.cancel() + processingJob = null + } + + private suspend fun processAndEnqueueChunk(chunkData: ChunkData) { + try { + val decodedBytes = Base64.decode(chunkData.chunk, Base64.DEFAULT) + val audioDataWithoutRIFF = removeRIFFHeaderIfNeeded(decodedBytes) + val audioData = convertPCMDataToFloatArray(audioDataWithoutRIFF) + + playbackQueue.offer(AudioChunk(audioData, chunkData.promise)) + + if (!isPlaying) { + startPlayback() + } + } catch (e: Exception) { + chunkData.promise.reject("ERR_PROCESSING_AUDIO", e.message, e) + } + } + private fun setVolume(volume: Double, promise: Promise) { val clampedVolume = max(0.0, min(volume, 100.0)) / 100.0 try { - audioTrack.setVolume(clampedVolume.toFloat()) // Set volume method accepts a float value. + audioTrack.setVolume(clampedVolume.toFloat()) promise.resolve(null) } catch (e: Exception) { - promise.reject("ERR_SET_VOLUME", e.toString(), e) + promise.reject("ERR_SET_VOLUME", e.message, e) } } - private fun pausePlayback(promise: Promise) { + private fun pausePlayback(promise: Promise? = null) { try { audioTrack.pause() - promise.resolve(null) + isPlaying = false + currentPlaybackJob?.cancel() + promise?.resolve(null) } catch (e: Exception) { - promise.reject("ERR_PAUSE_PLAYBACK", e.toString(), e) + promise?.reject("ERR_PAUSE_PLAYBACK", e.message, e) } } - private fun startPlayback(promise: Promise) { + private fun startPlayback(promise: Promise? = null) { try { - if (!audioTrack.playState.equals(AudioTrack.PLAYSTATE_PLAYING)) { + if (!isPlaying) { audioTrack.play() + isPlaying = true + startPlaybackLoop() } - promise.resolve(null) + promise?.resolve(null) } catch (e: Exception) { - promise.reject("ERR_START_PLAYBACK", e.toString(), e) + promise?.reject("ERR_START_PLAYBACK", e.message, e) } } - private fun stopPlayback(promise: Promise) { + private fun stopPlayback(promise: Promise? = null) { try { audioTrack.stop() - audioTrack.flush() // Clear the buffer by flushing it. - playbackQueue.clear() // Clear any remaining data in the queue. - promise.resolve(null) + audioTrack.flush() + isPlaying = false + currentPlaybackJob?.cancel() + playbackQueue.clear() + stopProcessingLoop() + promise?.resolve(null) } catch (e: Exception) { - promise.reject("ERR_STOP_PLAYBACK", e.toString(), e) + promise?.reject("ERR_STOP_PLAYBACK", e.message, e) } } private fun initializeAudioTrack() { val audioFormat = - AudioFormat.Builder() - .setSampleRate(16000) - .setEncoding(AudioFormat.ENCODING_PCM_FLOAT) - .setChannelMask(AudioFormat.CHANNEL_OUT_MONO) - .build() + AudioFormat.Builder() + .setSampleRate(16000) + .setEncoding(AudioFormat.ENCODING_PCM_FLOAT) + .setChannelMask(AudioFormat.CHANNEL_OUT_MONO) + .build() val minBufferSize = - AudioTrack.getMinBufferSize( - 16000, - AudioFormat.CHANNEL_OUT_MONO, - AudioFormat.ENCODING_PCM_FLOAT - ) + AudioTrack.getMinBufferSize( + 16000, + AudioFormat.CHANNEL_OUT_MONO, + AudioFormat.ENCODING_PCM_FLOAT + ) audioTrack = - AudioTrack.Builder() - .setAudioAttributes( - AudioAttributes.Builder() - .setUsage(AudioAttributes.USAGE_MEDIA) - .setContentType(AudioAttributes.CONTENT_TYPE_MUSIC) + AudioTrack.Builder() + .setAudioAttributes( + AudioAttributes.Builder() + .setUsage(AudioAttributes.USAGE_MEDIA) + .setContentType(AudioAttributes.CONTENT_TYPE_MUSIC) + .build() + ) + .setAudioFormat(audioFormat) + .setBufferSizeInBytes(minBufferSize * 2) + .setTransferMode(AudioTrack.MODE_STREAM) .build() - ) - .setAudioFormat(audioFormat) - .setBufferSizeInBytes(minBufferSize * 2) - .setTransferMode(AudioTrack.MODE_STREAM) - .build() - - // audioTrack.play() - } - - private fun enqueueChunkForPlayback(chunk: String, promise: Promise) { - coroutineScope.launch { - try { - val decodedBytes = Base64.decode(chunk, Base64.DEFAULT) - val audioDataWithoutRIFF = removeRIFFHeaderIfNeeded(decodedBytes) - val audioData = convertPCMDataToFloatArray(audioDataWithoutRIFF) - playbackQueue.add(AudioChunk(audioData, promise)) - } catch (e: Exception) { - promise.reject("ERR_PROCESSING_AUDIO", e.toString(), e) - } - } } private fun startPlaybackLoop() { - coroutineScope.launch { - while (isActive) { // isActive is now available within CoroutineScope - if (playbackQueue.isNotEmpty()) { - playNextChunk() - } else { - delay(10) // A short delay to prevent busy waiting + currentPlaybackJob = + coroutineScope.launch { + while (isActive && isPlaying) { + val chunk = playbackQueue.poll() + if (chunk != null) { + playChunk(chunk) + } else { + delay(10) + } + } } - } - } } - private fun stopPlaybackLoop() { - playbackJob?.cancel() - audioTrack.stop() - audioTrack.flush() - playbackQueue.forEach { - if (!it.isSettled) it.promise.reject("ERR_STOPPED", "Playback was stopped", null) - } - playbackQueue.clear() - } + private suspend fun playChunk(chunk: AudioChunk) { + withContext(Dispatchers.IO) { + try { + val chunkSize = chunk.audioData.size - private suspend fun playNextChunk() { - val chunk = playbackQueue.poll() - chunk?.let { - setupPlaybackCompletionListener(it) - audioTrack.play() - audioTrack.write(it.audioData, 0, it.audioData.size, AudioTrack.WRITE_BLOCKING) - } - } + suspendCancellableCoroutine { continuation -> + val listener = + object : AudioTrack.OnPlaybackPositionUpdateListener { + override fun onMarkerReached(track: AudioTrack) { + audioTrack.setPlaybackPositionUpdateListener(null) + chunk.promise.resolve(null) + continuation.resumeWith(Result.success(Unit)) + } - private fun setupPlaybackCompletionListener(chunk: AudioChunk) { - audioTrack.setPlaybackPositionUpdateListener(null) // Clear previous listener - audioTrack.setNotificationMarkerPosition( - chunk.audioData.size - ) // Set the marker at the end of the current chunk + override fun onPeriodicNotification(track: AudioTrack) {} + } - audioTrack.setPlaybackPositionUpdateListener( - object : AudioTrack.OnPlaybackPositionUpdateListener { - override fun onMarkerReached(track: AudioTrack?) { - chunk.promise.resolve(null) // Resolve the promise when playback reaches the marker - } + audioTrack.setPlaybackPositionUpdateListener(listener) + audioTrack.setNotificationMarkerPosition(chunkSize) + val written = + audioTrack.write( + chunk.audioData, + 0, + chunkSize, + AudioTrack.WRITE_BLOCKING + ) + + if (written != chunkSize) { + audioTrack.setPlaybackPositionUpdateListener(null) + val error = Exception("Failed to write entire audio chunk") + chunk.promise.reject("ERR_PLAYBACK", error.message, error) + continuation.resumeWith(Result.failure(error)) + } - override fun onPeriodicNotification(track: AudioTrack?) { - // Not used in this implementation + continuation.invokeOnCancellation { + audioTrack.setPlaybackPositionUpdateListener(null) + } } + } catch (e: Exception) { + chunk.promise.reject("ERR_PLAYBACK", e.message, e) } - ) + } } private fun convertPCMDataToFloatArray(pcmData: ByteArray): FloatArray { val shortBuffer = ByteBuffer.wrap(pcmData).order(ByteOrder.LITTLE_ENDIAN).asShortBuffer() val shortArray = ShortArray(shortBuffer.remaining()) shortBuffer.get(shortArray) - return FloatArray(shortArray.size) { index -> - shortArray[index] / 32768.0f // Convert to Float32 - } + return FloatArray(shortArray.size) { index -> shortArray[index] / 32768.0f } } private fun removeRIFFHeaderIfNeeded(audioData: ByteArray): ByteArray { val headerSize = 44 val riffHeader = "RIFF".toByteArray(Charsets.US_ASCII) - // Check if the data is large enough and starts with "RIFF" - if (audioData.size > headerSize && audioData.startsWith(riffHeader)) { - return audioData.copyOfRange(headerSize, audioData.size) + return if (audioData.size > headerSize && audioData.startsWith(riffHeader)) { + audioData.copyOfRange(headerSize, audioData.size) + } else { + audioData } - return audioData } private fun ByteArray.startsWith(prefix: ByteArray): Boolean { if (this.size < prefix.size) return false - for (i in prefix.indices) { - if (this[i] != prefix[i]) return false - } - return true + return prefix.contentEquals(this.sliceArray(prefix.indices)) } } diff --git a/package.json b/package.json index a8bc425..24f6401 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@mykin-ai/expo-audio-stream", - "version": "0.1.19", + "version": "0.1.20", "description": "Expo Audio Stream module", "main": "build/index.js", "types": "build/index.d.ts", diff --git a/src/index.ts b/src/index.ts index c01de7d..42fa4fb 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,7 +5,7 @@ export class ExpoAudioStream { base64Chunk: string ): Promise { try { - return await ExpoAudioStreamModule.streamRiff16Khz16BitMonoPcmChunk( + return ExpoAudioStreamModule.streamRiff16Khz16BitMonoPcmChunk( base64Chunk ); } catch (error) {