Skip to content

Commit

Permalink
fix: process chunks in order
Browse files Browse the repository at this point in the history
  • Loading branch information
pax-k committed Jul 18, 2024
1 parent 86f0c59 commit 37ba080
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 124 deletions.
285 changes: 163 additions & 122 deletions android/src/main/java/expo/modules/audiostream/ExpoAudioStreamModule.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,213 +12,254 @@ import java.nio.ByteOrder
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.math.max
import kotlin.math.min
import kotlinx.coroutines.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withContext

class ExpoAudioStreamModule : Module() {
data class AudioChunk(
val audioData: FloatArray,
val promise: Promise,
var isSettled: Boolean = false
)
data class ChunkData(val chunk: String, val promise: Promise)

private val coroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
data class AudioChunk(val audioData: FloatArray, val promise: Promise)

private val coroutineScope = CoroutineScope(Dispatchers.Default + SupervisorJob())
private lateinit var audioTrack: AudioTrack
private val playbackQueue: ConcurrentLinkedQueue<AudioChunk> = ConcurrentLinkedQueue()
private var playbackJob: Job? = null
private val playbackQueue = ConcurrentLinkedQueue<AudioChunk>()
private val processingChannel = Channel<ChunkData>(Channel.UNLIMITED)

This comment has been minimized.

Copy link
@Volland

Volland Jul 18, 2024

it is dangerous to use unlimited channel . it is out of memory candidate

private var processingJob: Job? = null

private var isPlaying = false
private var currentPlaybackJob: Job? = null

override fun definition() = ModuleDefinition {
Name("ExpoAudioStream")

OnCreate {
initializeAudioTrack()
startPlaybackLoop()
}
OnCreate { initializeAudioTrack() }

OnDestroy {
stopPlaybackLoop()
playbackQueue.clear()
audioTrack.stop()
audioTrack.release()
stopProcessingLoop()
stopPlayback()
coroutineScope.cancel()
}

AsyncFunction("streamRiff16Khz16BitMonoPcmChunk") { chunk: String, promise: Promise ->
enqueueChunkForPlayback(chunk, promise)
coroutineScope.launch {
processingChannel.send(ChunkData(chunk, promise))
ensureProcessingLoopStarted()
}
}

AsyncFunction("setVolume") { volume: Double, promise: Promise ->
setVolume(
volume,
promise
)
setVolume(volume, promise)
}

AsyncFunction("pause") { promise: Promise -> pausePlayback(promise) }

AsyncFunction("start") { promise: Promise -> startPlayback(promise) }

AsyncFunction("stop") { promise: Promise -> stopPlayback(promise) }
}

private fun ensureProcessingLoopStarted() {
if (processingJob == null || processingJob?.isActive != true) {
startProcessingLoop()
}
}

private fun startProcessingLoop() {
processingJob =
coroutineScope.launch {
for (chunkData in processingChannel) {
processAndEnqueueChunk(chunkData)
if (processingChannel.isEmpty && !isPlaying && playbackQueue.isEmpty()) {
break // Stop the loop if there's no more work to do
}
}
processingJob = null
}
}

private fun stopProcessingLoop() {
processingJob?.cancel()
processingJob = null
}

private suspend fun processAndEnqueueChunk(chunkData: ChunkData) {
try {
val decodedBytes = Base64.decode(chunkData.chunk, Base64.DEFAULT)
val audioDataWithoutRIFF = removeRIFFHeaderIfNeeded(decodedBytes)
val audioData = convertPCMDataToFloatArray(audioDataWithoutRIFF)

playbackQueue.offer(AudioChunk(audioData, chunkData.promise))

if (!isPlaying) {
startPlayback()
}
} catch (e: Exception) {
chunkData.promise.reject("ERR_PROCESSING_AUDIO", e.message, e)
}
}

private fun setVolume(volume: Double, promise: Promise) {
val clampedVolume = max(0.0, min(volume, 100.0)) / 100.0
try {
audioTrack.setVolume(clampedVolume.toFloat()) // Set volume method accepts a float value.
audioTrack.setVolume(clampedVolume.toFloat())
promise.resolve(null)
} catch (e: Exception) {
promise.reject("ERR_SET_VOLUME", e.toString(), e)
promise.reject("ERR_SET_VOLUME", e.message, e)
}
}

private fun pausePlayback(promise: Promise) {
private fun pausePlayback(promise: Promise? = null) {
try {
audioTrack.pause()
promise.resolve(null)
isPlaying = false
currentPlaybackJob?.cancel()
promise?.resolve(null)
} catch (e: Exception) {
promise.reject("ERR_PAUSE_PLAYBACK", e.toString(), e)
promise?.reject("ERR_PAUSE_PLAYBACK", e.message, e)
}
}

private fun startPlayback(promise: Promise) {
private fun startPlayback(promise: Promise? = null) {
try {
if (!audioTrack.playState.equals(AudioTrack.PLAYSTATE_PLAYING)) {
if (!isPlaying) {
audioTrack.play()
isPlaying = true
startPlaybackLoop()
}
promise.resolve(null)
promise?.resolve(null)
} catch (e: Exception) {
promise.reject("ERR_START_PLAYBACK", e.toString(), e)
promise?.reject("ERR_START_PLAYBACK", e.message, e)
}
}

private fun stopPlayback(promise: Promise) {
private fun stopPlayback(promise: Promise? = null) {
try {
audioTrack.stop()
audioTrack.flush() // Clear the buffer by flushing it.
playbackQueue.clear() // Clear any remaining data in the queue.
promise.resolve(null)
audioTrack.flush()
isPlaying = false
currentPlaybackJob?.cancel()
playbackQueue.clear()
stopProcessingLoop()
promise?.resolve(null)
} catch (e: Exception) {
promise.reject("ERR_STOP_PLAYBACK", e.toString(), e)
promise?.reject("ERR_STOP_PLAYBACK", e.message, e)
}
}

private fun initializeAudioTrack() {
val audioFormat =
AudioFormat.Builder()
.setSampleRate(16000)
.setEncoding(AudioFormat.ENCODING_PCM_FLOAT)
.setChannelMask(AudioFormat.CHANNEL_OUT_MONO)
.build()
AudioFormat.Builder()
.setSampleRate(16000)
.setEncoding(AudioFormat.ENCODING_PCM_FLOAT)
.setChannelMask(AudioFormat.CHANNEL_OUT_MONO)
.build()

val minBufferSize =
AudioTrack.getMinBufferSize(
16000,
AudioFormat.CHANNEL_OUT_MONO,
AudioFormat.ENCODING_PCM_FLOAT
)
AudioTrack.getMinBufferSize(
16000,
AudioFormat.CHANNEL_OUT_MONO,
AudioFormat.ENCODING_PCM_FLOAT
)

audioTrack =
AudioTrack.Builder()
.setAudioAttributes(
AudioAttributes.Builder()
.setUsage(AudioAttributes.USAGE_MEDIA)
.setContentType(AudioAttributes.CONTENT_TYPE_MUSIC)
AudioTrack.Builder()
.setAudioAttributes(
AudioAttributes.Builder()
.setUsage(AudioAttributes.USAGE_MEDIA)
.setContentType(AudioAttributes.CONTENT_TYPE_MUSIC)
.build()
)
.setAudioFormat(audioFormat)
.setBufferSizeInBytes(minBufferSize * 2)
.setTransferMode(AudioTrack.MODE_STREAM)
.build()
)
.setAudioFormat(audioFormat)
.setBufferSizeInBytes(minBufferSize * 2)
.setTransferMode(AudioTrack.MODE_STREAM)
.build()

// audioTrack.play()
}

private fun enqueueChunkForPlayback(chunk: String, promise: Promise) {
coroutineScope.launch {
try {
val decodedBytes = Base64.decode(chunk, Base64.DEFAULT)
val audioDataWithoutRIFF = removeRIFFHeaderIfNeeded(decodedBytes)
val audioData = convertPCMDataToFloatArray(audioDataWithoutRIFF)
playbackQueue.add(AudioChunk(audioData, promise))
} catch (e: Exception) {
promise.reject("ERR_PROCESSING_AUDIO", e.toString(), e)
}
}
}

private fun startPlaybackLoop() {
coroutineScope.launch {
while (isActive) { // isActive is now available within CoroutineScope
if (playbackQueue.isNotEmpty()) {
playNextChunk()
} else {
delay(10) // A short delay to prevent busy waiting
currentPlaybackJob =
coroutineScope.launch {
while (isActive && isPlaying) {

This comment has been minimized.

Copy link
@Volland

Volland Jul 18, 2024

this loop could drain battery and create a unnesesary CPU load . could we use events or more reactive patter here

val chunk = playbackQueue.poll()
if (chunk != null) {
playChunk(chunk)
} else {
delay(10)
}
}
}
}
}
}

private fun stopPlaybackLoop() {
playbackJob?.cancel()
audioTrack.stop()
audioTrack.flush()
playbackQueue.forEach {
if (!it.isSettled) it.promise.reject("ERR_STOPPED", "Playback was stopped", null)
}
playbackQueue.clear()
}
private suspend fun playChunk(chunk: AudioChunk) {
withContext(Dispatchers.IO) {
try {
val chunkSize = chunk.audioData.size

private suspend fun playNextChunk() {
val chunk = playbackQueue.poll()
chunk?.let {
setupPlaybackCompletionListener(it)
audioTrack.play()
audioTrack.write(it.audioData, 0, it.audioData.size, AudioTrack.WRITE_BLOCKING)
}
}
suspendCancellableCoroutine { continuation ->
val listener =
object : AudioTrack.OnPlaybackPositionUpdateListener {
override fun onMarkerReached(track: AudioTrack) {
audioTrack.setPlaybackPositionUpdateListener(null)
chunk.promise.resolve(null)
continuation.resumeWith(Result.success(Unit))
}

private fun setupPlaybackCompletionListener(chunk: AudioChunk) {
audioTrack.setPlaybackPositionUpdateListener(null) // Clear previous listener
audioTrack.setNotificationMarkerPosition(
chunk.audioData.size
) // Set the marker at the end of the current chunk
override fun onPeriodicNotification(track: AudioTrack) {}
}

audioTrack.setPlaybackPositionUpdateListener(
object : AudioTrack.OnPlaybackPositionUpdateListener {
override fun onMarkerReached(track: AudioTrack?) {
chunk.promise.resolve(null) // Resolve the promise when playback reaches the marker
}
audioTrack.setPlaybackPositionUpdateListener(listener)
audioTrack.setNotificationMarkerPosition(chunkSize)
val written =
audioTrack.write(
chunk.audioData,
0,
chunkSize,
AudioTrack.WRITE_BLOCKING
)

if (written != chunkSize) {
audioTrack.setPlaybackPositionUpdateListener(null)
val error = Exception("Failed to write entire audio chunk")
chunk.promise.reject("ERR_PLAYBACK", error.message, error)
continuation.resumeWith(Result.failure(error))
}

override fun onPeriodicNotification(track: AudioTrack?) {
// Not used in this implementation
continuation.invokeOnCancellation {
audioTrack.setPlaybackPositionUpdateListener(null)
}
}
} catch (e: Exception) {
chunk.promise.reject("ERR_PLAYBACK", e.message, e)
}
)
}
}

private fun convertPCMDataToFloatArray(pcmData: ByteArray): FloatArray {
val shortBuffer = ByteBuffer.wrap(pcmData).order(ByteOrder.LITTLE_ENDIAN).asShortBuffer()
val shortArray = ShortArray(shortBuffer.remaining())
shortBuffer.get(shortArray)
return FloatArray(shortArray.size) { index ->
shortArray[index] / 32768.0f // Convert to Float32
}
return FloatArray(shortArray.size) { index -> shortArray[index] / 32768.0f }
}

private fun removeRIFFHeaderIfNeeded(audioData: ByteArray): ByteArray {
val headerSize = 44
val riffHeader = "RIFF".toByteArray(Charsets.US_ASCII)

// Check if the data is large enough and starts with "RIFF"
if (audioData.size > headerSize && audioData.startsWith(riffHeader)) {
return audioData.copyOfRange(headerSize, audioData.size)
return if (audioData.size > headerSize && audioData.startsWith(riffHeader)) {
audioData.copyOfRange(headerSize, audioData.size)
} else {
audioData
}
return audioData
}

private fun ByteArray.startsWith(prefix: ByteArray): Boolean {
if (this.size < prefix.size) return false
for (i in prefix.indices) {
if (this[i] != prefix[i]) return false
}
return true
return prefix.contentEquals(this.sliceArray(prefix.indices))
}
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@mykin-ai/expo-audio-stream",
"version": "0.1.19",
"version": "0.1.20",
"description": "Expo Audio Stream module",
"main": "build/index.js",
"types": "build/index.d.ts",
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export class ExpoAudioStream {
base64Chunk: string
): Promise<void> {
try {
return await ExpoAudioStreamModule.streamRiff16Khz16BitMonoPcmChunk(
return ExpoAudioStreamModule.streamRiff16Khz16BitMonoPcmChunk(
base64Chunk
);
} catch (error) {
Expand Down

0 comments on commit 37ba080

Please sign in to comment.