diff --git a/lib/src/androidTest/java/com/otaliastudios/transcoder/integration/IssuesTests.kt b/lib/src/androidTest/java/com/otaliastudios/transcoder/integration/IssuesTests.kt index f2dc4f0c..fe667eec 100644 --- a/lib/src/androidTest/java/com/otaliastudios/transcoder/integration/IssuesTests.kt +++ b/lib/src/androidTest/java/com/otaliastudios/transcoder/integration/IssuesTests.kt @@ -1,5 +1,6 @@ package com.otaliastudios.transcoder.integration +import android.media.MediaFormat import android.media.MediaMetadataRetriever.METADATA_KEY_DURATION import android.media.MediaMetadataRetriever import androidx.test.ext.junit.runners.AndroidJUnit4 @@ -66,7 +67,7 @@ class IssuesTests { } - @Test(timeout = 3000) + @Test(timeout = 5000) fun issue137() = with(Helper(137)) { transcode { // addDataSource(ClipDataSource(input("main.mp3"), 0L, 200_000L)) @@ -93,7 +94,7 @@ class IssuesTests { Unit } - @Test(timeout = 3000) + @Test(timeout = 5000) fun issue184() = with(Helper(184)) { transcode { addDataSource(TrackType.VIDEO, input("transcode.3gp")) diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/Codecs.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/Codecs.kt index 9a138acb..fe1c9d01 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/Codecs.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/Codecs.kt @@ -1,16 +1,19 @@ package com.otaliastudios.transcoder.internal import android.media.MediaCodec +import android.media.MediaCodecInfo +import android.media.MediaCodecList import android.media.MediaFormat +import android.opengl.EGL14 import android.view.Surface +import com.otaliastudios.opengl.core.EglCore +import com.otaliastudios.opengl.surface.EglOffscreenSurface +import com.otaliastudios.opengl.surface.EglWindowSurface import com.otaliastudios.transcoder.common.TrackStatus import com.otaliastudios.transcoder.common.TrackType -import com.otaliastudios.transcoder.internal.media.MediaFormatProvider +import com.otaliastudios.transcoder.internal.media.MediaFormatConstants import com.otaliastudios.transcoder.internal.utils.Logger import com.otaliastudios.transcoder.internal.utils.TrackMap -import com.otaliastudios.transcoder.internal.utils.trackMapOf -import com.otaliastudios.transcoder.source.DataSource -import com.otaliastudios.transcoder.strategy.TrackStrategy /** * Encoders are shared between segments. This is not strictly needed but it is more efficient @@ -25,6 +28,16 @@ internal class Codecs( private val current: TrackMap ) { + internal class Surface( + val context: EglCore, + val window: EglWindowSurface, + ) { + fun release() { + window.release() + context.release() + } + } + private val log = Logger("Codecs") val encoders = object : TrackMap> { @@ -40,9 +53,28 @@ internal class Codecs( private val lazyVideo by lazy { val format = tracks.outputFormats.video + val width = format.getInteger(MediaFormat.KEY_WIDTH) + val height = format.getInteger(MediaFormat.KEY_HEIGHT) + log.i("Destination video surface size: ${width}x${height} @ ${format.getInteger(MediaFormatConstants.KEY_ROTATION_DEGREES)}") + log.i("Destination video format: $format") + + val allCodecs = MediaCodecList(MediaCodecList.REGULAR_CODECS) + val videoEncoders = allCodecs.codecInfos.filter { it.isEncoder && it.supportedTypes.any { it.startsWith("video/") } } + log.i("Available encoders: ${videoEncoders.joinToString { "${it.name} (${it.supportedTypes.joinToString()})" }}") + + // Could consider MediaCodecList(MediaCodecList.REGULAR_CODECS).findEncoderForFormat(format) + // But it's trickier, for example, format should not include frame rate on API 21 and maybe other quirks. val codec = MediaCodec.createEncoderByType(format.getString(MediaFormat.KEY_MIME)!!) codec.configure(format, null, null, MediaCodec.CONFIGURE_FLAG_ENCODE) - codec to codec.createInputSurface() + log.i("Selected encoder ${codec.name}") + val surface = codec.createInputSurface() + + log.i("Creating OpenGL context on ${Thread.currentThread()} (${surface.isValid})") + val eglContext = EglCore(EGL14.EGL_NO_CONTEXT, EglCore.FLAG_RECORDABLE) + val eglWindow = EglWindowSurface(eglContext, surface, true) + eglWindow.makeCurrent() + + codec to Surface(eglContext, eglWindow) } override fun get(type: TrackType) = when (type) { diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/Segment.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/Segment.kt index e1b0f761..74adba3e 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/Segment.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/Segment.kt @@ -3,7 +3,6 @@ package com.otaliastudios.transcoder.internal import com.otaliastudios.transcoder.common.TrackType import com.otaliastudios.transcoder.internal.pipeline.Pipeline import com.otaliastudios.transcoder.internal.pipeline.State -import com.otaliastudios.transcoder.internal.utils.Logger internal class Segment( val type: TrackType, @@ -27,7 +26,7 @@ internal class Segment( fun needsSleep(): Boolean { when(val s = state ?: return false) { is State.Ok -> return false - is State.Wait -> return s.sleep + is State.Failure -> return s.sleep } } diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/Segments.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/Segments.kt index bdcc31a1..9c73ea01 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/Segments.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/Segments.kt @@ -21,7 +21,7 @@ internal class Segments( fun hasNext(type: TrackType): Boolean { if (!sources.has(type)) return false - log.v("hasNext($type): segment=${current.getOrNull(type)} lastIndex=${sources.getOrNull(type)?.lastIndex} canAdvance=${current.getOrNull(type)?.canAdvance()}") + // log.v("hasNext($type): segment=${current.getOrNull(type)} lastIndex=${sources.getOrNull(type)?.lastIndex} canAdvance=${current.getOrNull(type)?.canAdvance()}") val segment = current.getOrNull(type) ?: return true // not started val lastIndex = sources.getOrNull(type)?.lastIndex ?: return false // no track! return segment.canAdvance() || segment.index < lastIndex diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/audio/AudioEngine.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/audio/AudioEngine.kt index a080d50b..a8321c90 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/audio/AudioEngine.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/audio/AudioEngine.kt @@ -6,11 +6,8 @@ import android.view.Surface import com.otaliastudios.transcoder.internal.audio.remix.AudioRemixer import com.otaliastudios.transcoder.internal.codec.* import com.otaliastudios.transcoder.internal.pipeline.* -import com.otaliastudios.transcoder.internal.utils.Logger -import com.otaliastudios.transcoder.internal.utils.trackMapOf import com.otaliastudios.transcoder.resample.AudioResampler import com.otaliastudios.transcoder.stretch.AudioStretcher -import java.util.concurrent.atomic.AtomicInteger import kotlin.math.ceil import kotlin.math.floor @@ -30,18 +27,18 @@ internal class AudioEngine( private val MediaFormat.sampleRate get() = getInteger(KEY_SAMPLE_RATE) private val MediaFormat.channels get() = getInteger(KEY_CHANNEL_COUNT) + private val chunks = ChunkQueue(log) + private var readyToDrain = false private lateinit var rawFormat: MediaFormat - private lateinit var chunks: ChunkQueue private lateinit var remixer: AudioRemixer override fun handleSourceFormat(sourceFormat: MediaFormat): Surface? = null override fun handleRawFormat(rawFormat: MediaFormat) { log.i("handleRawFormat($rawFormat)") - check(!::rawFormat.isInitialized) { "handleRawFormat called twice: ${this.rawFormat} => $rawFormat"} this.rawFormat = rawFormat - remixer = AudioRemixer[rawFormat.channels, targetFormat.channels] - chunks = ChunkQueue(log, rawFormat.sampleRate, rawFormat.channels) + this.remixer = AudioRemixer[rawFormat.channels, targetFormat.channels] + this.readyToDrain = true } override fun enqueueEos(data: DecoderData) { @@ -59,19 +56,24 @@ internal class AudioEngine( } override fun drain(): State { + if (!readyToDrain) { + log.i("drain(): not ready, waiting...") + return State.Retry(false) + } if (chunks.isEmpty()) { // nothing was enqueued log.i("drain(): no chunks, waiting...") - return State.Wait(false) + return State.Retry(false) } val (outBytes, outId) = next.buffer() ?: return run { // dequeueInputBuffer failed log.i("drain(): no next buffer, waiting...") - State.Wait(true) + State.Retry(true) } val outBuffer = outBytes.asShortBuffer() return chunks.drain( - eos = State.Eos(EncoderData(outBytes, outId, 0)) + eos = State.Eos(EncoderData(outBytes, outId, 0)), + format = rawFormat ) { inBuffer, timeUs, stretch -> val outSize = outBuffer.remaining() val inSize = inBuffer.remaining() diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/audio/chunks.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/audio/chunks.kt index 2990ff3d..5ad3ba2e 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/audio/chunks.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/audio/chunks.kt @@ -1,6 +1,10 @@ package com.otaliastudios.transcoder.internal.audio +import android.media.MediaFormat +import android.media.MediaFormat.KEY_CHANNEL_COUNT +import android.media.MediaFormat.KEY_SAMPLE_RATE import com.otaliastudios.transcoder.internal.utils.Logger +import java.nio.ByteBuffer import java.nio.ShortBuffer private data class Chunk( @@ -20,12 +24,9 @@ private data class Chunk( * big enough to contain the full processed size, in which case we want to consume only * part of the input buffer and keep it available for the next cycle. */ -internal class ChunkQueue( - private val log: Logger, - private val sampleRate: Int, - private val channels: Int -) { +internal class ChunkQueue(private val log: Logger) { private val queue = ArrayDeque() + private val pool = ShortBufferPool() fun isEmpty() = queue.isEmpty() @@ -44,7 +45,7 @@ internal class ChunkQueue( queue.addLast(Chunk.Eos) } - fun drain(eos: T, action: (buffer: ShortBuffer, timeUs: Long, timeStretch: Double) -> T): T { + fun drain(format: MediaFormat, eos: T, action: (buffer: ShortBuffer, timeUs: Long, timeStretch: Double) -> T): T { val head = queue.removeFirst() if (head === Chunk.Eos) return eos @@ -54,9 +55,17 @@ internal class ChunkQueue( // Action can reduce the limit for any reason. Restore it before comparing sizes. head.buffer.limit(limit) if (head.buffer.hasRemaining()) { + // We could technically hold onto the same chunk, but in practice it's better to + // release input buffers back to the decoder otherwise it can get stuck val consumed = size - head.buffer.remaining() + val sampleRate = format.getInteger(KEY_SAMPLE_RATE) + val channelCount = format.getInteger(KEY_CHANNEL_COUNT) + val buffer = pool.take(head.buffer) + head.release() queue.addFirst(head.copy( - timeUs = shortsToUs(consumed, sampleRate, channels) + timeUs = shortsToUs(consumed, sampleRate, channelCount), + release = { pool.give(buffer) }, + buffer = buffer )) log.v("[ChunkQueue] partially handled chunk at ${head.timeUs}us, ${head.buffer.remaining()} bytes left (${queue.size})") } else { @@ -67,3 +76,27 @@ internal class ChunkQueue( return result } } + + +class ShortBufferPool { + private val pool = mutableListOf() + + fun take(original: ShortBuffer): ShortBuffer { + val needed = original.remaining() + val index = pool.indexOfFirst { it.capacity() >= needed } + val memory = when { + index >= 0 -> pool.removeAt(index) + else -> ByteBuffer.allocateDirect(needed.coerceAtLeast(1024)) + .order(original.order()) + .asShortBuffer() + } + memory.put(original) + memory.flip() + return memory + } + + fun give(buffer: ShortBuffer) { + buffer.clear() + pool.add(buffer) + } +} \ No newline at end of file diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/codec/Decoder.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/codec/Decoder.kt index 44e5d052..9ce6778c 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/codec/Decoder.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/codec/Decoder.kt @@ -10,11 +10,7 @@ import com.otaliastudios.transcoder.internal.data.ReaderData import com.otaliastudios.transcoder.internal.pipeline.Channel import com.otaliastudios.transcoder.internal.pipeline.QueuedStep import com.otaliastudios.transcoder.internal.pipeline.State -import com.otaliastudios.transcoder.internal.utils.Logger -import com.otaliastudios.transcoder.internal.utils.trackMapOf import java.nio.ByteBuffer -import java.util.concurrent.atomic.AtomicInteger -import kotlin.properties.Delegates import kotlin.properties.Delegates.observable @@ -70,7 +66,7 @@ internal class Decoder( val buf = checkNotNull(codec.getInputBuffer(id)) { "inputBuffer($id) should not be null." } buf to id } else { - log.i("buffer() failed. dequeuedInputs=$dequeuedInputs dequeuedOutputs=$dequeuedOutputs") + log.i("buffer() failed with $id. dequeuedInputs=$dequeuedInputs dequeuedOutputs=$dequeuedOutputs") null } } @@ -96,7 +92,7 @@ internal class Decoder( return when (result) { INFO_TRY_AGAIN_LATER -> { log.i("drain(): got INFO_TRY_AGAIN_LATER, waiting.") - State.Wait(true) + State.Retry(true) } INFO_OUTPUT_FORMAT_CHANGED -> { log.i("drain(): got INFO_OUTPUT_FORMAT_CHANGED, handling format and retrying. format=${codec.outputFormat}") @@ -126,7 +122,7 @@ internal class Decoder( } else { // frame was dropped, no need to sleep codec.releaseOutputBuffer(result, false) - State.Wait(false) + State.Retry(false) }.also { log.v("drain(): returning $it") } diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/codec/Encoder.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/codec/Encoder.kt index 4cce0d87..bdfabf50 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/codec/Encoder.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/codec/Encoder.kt @@ -3,19 +3,15 @@ package com.otaliastudios.transcoder.internal.codec import android.media.MediaCodec import android.media.MediaCodec.* import android.view.Surface +import com.otaliastudios.opengl.surface.EglWindowSurface import com.otaliastudios.transcoder.common.TrackType -import com.otaliastudios.transcoder.common.trackType import com.otaliastudios.transcoder.internal.Codecs import com.otaliastudios.transcoder.internal.data.WriterChannel import com.otaliastudios.transcoder.internal.data.WriterData import com.otaliastudios.transcoder.internal.pipeline.Channel import com.otaliastudios.transcoder.internal.pipeline.QueuedStep import com.otaliastudios.transcoder.internal.pipeline.State -import com.otaliastudios.transcoder.internal.utils.Logger -import com.otaliastudios.transcoder.internal.utils.trackMapOf import java.nio.ByteBuffer -import java.util.concurrent.atomic.AtomicInteger -import kotlin.properties.Delegates import kotlin.properties.Delegates.observable internal data class EncoderData( @@ -27,15 +23,15 @@ internal data class EncoderData( } internal interface EncoderChannel : Channel { - val surface: Surface? + val surface: Codecs.Surface? fun buffer(): Pair? } internal class Encoder( - private val codec: MediaCodec, - override val surface: Surface?, - ownsCodecStart: Boolean, - private val ownsCodecStop: Boolean, + private val codec: MediaCodec, + override val surface: Codecs.Surface?, + ownsCodecStart: Boolean, + private val ownsCodecStop: Boolean, ) : QueuedStep( when (surface) { null -> "AudioEncoder" @@ -44,13 +40,12 @@ internal class Encoder( ), EncoderChannel { constructor(codecs: Codecs, type: TrackType) : this( - codecs.encoders[type].first, - codecs.encoders[type].second, - codecs.ownsEncoderStart[type], - codecs.ownsEncoderStop[type] + codecs.encoders[type].first, + codecs.encoders[type].second, + codecs.ownsEncoderStart[type], + codecs.ownsEncoderStop[type] ) - private val type = if (surface != null) TrackType.VIDEO else TrackType.AUDIO private var dequeuedInputs by observable(0) { _, _, _ -> printDequeued() } private var dequeuedOutputs by observable(0) { _, _, _ -> printDequeued() } private fun printDequeued() { @@ -61,9 +56,8 @@ internal class Encoder( private var info = BufferInfo() - init { - log.i("Encoder: ownsStart=$ownsCodecStart ownsStop=$ownsCodecStop") + log.i("ownsStart=$ownsCodecStart ownsStop=$ownsCodecStop") if (ownsCodecStart) { codec.start() } @@ -116,7 +110,7 @@ internal class Encoder( State.Eos(WriterData(buffer, 0L, 0) {}) } else { log.i("Can't dequeue output buffer: INFO_TRY_AGAIN_LATER") - State.Wait(true) + State.Retry(true) } } INFO_OUTPUT_FORMAT_CHANGED -> { diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/data/Reader.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/data/Reader.kt index 8da1f21e..c663f6b7 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/data/Reader.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/data/Reader.kt @@ -4,7 +4,6 @@ import com.otaliastudios.transcoder.common.TrackType import com.otaliastudios.transcoder.internal.pipeline.BaseStep import com.otaliastudios.transcoder.internal.pipeline.Channel import com.otaliastudios.transcoder.internal.pipeline.State -import com.otaliastudios.transcoder.internal.utils.Logger import com.otaliastudios.transcoder.source.DataSource import java.nio.ByteBuffer @@ -28,7 +27,7 @@ internal class Reader( if (buffer == null) { // dequeueInputBuffer failed log.v("Returning State.Wait because buffer is null.") - return State.Wait(true) + return State.Retry(true) } else { return action(buffer.first, buffer.second) } @@ -46,7 +45,7 @@ internal class Reader( } } else if (!source.canReadTrack(track)) { log.i("Returning State.Wait because source can't read $track right now.") - State.Wait(false) + State.Retry(false) } else { nextBufferOrWait { byteBuffer, id -> chunk.buffer = byteBuffer diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/Pipeline.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/Pipeline.kt index d24f997b..0dea0917 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/Pipeline.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/Pipeline.kt @@ -6,13 +6,60 @@ import com.otaliastudios.transcoder.internal.utils.Logger private class PipelineItem( val step: Step, + val name: String, ) { - var success: State.Ok? = null - var failure: State.Wait? = null + // var success: State.Ok? = null + // var failure: State.Retry? = null + val unhandled = ArrayDeque>() + var done = false + var advanced = false + var packets = 0 + private var nextUnhandled: ArrayDeque>? = null - fun set(output: State?) { - success = output as? State.Ok - failure = output as? State.Wait + fun attachToNext(next: PipelineItem) { + nextUnhandled = next.unhandled + step.initialize(next = next.step.channel) + } + + fun canHandle(first: Boolean): Boolean { + if (done) return false + if (first) { + unhandled.clear() + unhandled.addLast(State.Ok(Unit)) + } + return unhandled.isNotEmpty() || step is QueuedStep + } + + fun handle(): State.Failure? { + advanced = false + while (unhandled.isNotEmpty() && !done) { + val input = unhandled.removeFirst() + when (val result = step.advance(input)) { + is State.Ok -> { + packets++ + advanced = true + done = result is State.Eos + nextUnhandled?.addLast(result) + } + is State.Retry -> { + unhandled.addFirst(input) + return result + } + is State.Consume -> return result + } + } + if (!advanced && !done && step is QueuedStep) { + when (val result = step.tryAdvance()) { + is State.Ok -> { + packets++ + advanced = true + done = result is State.Eos + nextUnhandled?.addLast(result) + } + is State.Failure -> return result + } + } + return null } } @@ -21,13 +68,40 @@ internal class Pipeline private constructor(name: String, private val items: Lis private val log = Logger("${name}Pipeline") init { - items.zipWithNext().reversed().forEach { (first, next) -> - first.step.initialize(next = next.step.channel) - } + items.zipWithNext().reversed().forEach { (first, next) -> first.attachToNext(next) } } - // Returns Eos, Ok or Wait fun execute(): State { + log.v("LOOP") + var advanced = false + var sleeps = false + + for (i in items.indices) { + val item = items[i] + + if (item.canHandle(i == 0)) { + log.v("${item.name} START ${item.unhandled.size}") + val failure = item.handle() + if (failure != null) { + sleeps = sleeps || failure.sleep + log.v("${item.name} FAILED +${item.packets}") + } else { + log.v("${item.name} SUCCESS +${item.packets} ${if (item.done) "(eos)" else ""}") + } + advanced = advanced || item.advanced + } else { + log.v("${item.name} SKIP +${item.packets} ${if (item.done) "(eos)" else ""}") + } + } + return when { + items.isEmpty() -> State.Eos(Unit) + items.last().done -> State.Eos(Unit) + advanced -> State.Ok(Unit) + else -> State.Retry(sleeps) + } + } + + /*fun execute_OLD(): State { var headState: State.Ok = State.Ok(Unit) var headFresh = true // In case of failure in the previous run, we should re-run all items before the failed one @@ -37,21 +111,25 @@ internal class Pipeline private constructor(name: String, private val items: Lis for (i in items.indices) { val item = items[i] val cached = item.success - if (cached != null && (!headFresh || cached is State.Eos)) { - log.v("${i+1}/${items.size} '${item.step.name}' SKIP ${if (item.success is State.Eos) "(eos)" else "(handled)"}") - headState = cached + val skip = cached is State.Eos || (!headFresh && cached != null) + if (skip) { + // Note: we could consider a retry() on queued steps here but it's risky + // because the current 'cached' value may have never been handled by the next item + log.v("${i+1}/${items.size} '${item.step.name}' SKIP ${if (cached is State.Eos) "(eos)" else "(handled)"}") + headState = cached!! headFresh = false continue } - // This item either: - // - never run (cached == null, fresh = true) - // - caused failure in the previous run (i == previouslyFailedIndex, failure != null) - // - run (with failure or success) then one of the items following it failed (i < previouslyFailedIndex, cached != null) + // This item did not succeed at the last loop, or we have fresh input data. log.v("${i+1}/${items.size} '${item.step.name}' START (${if (headFresh) "fresh" else "stale"})") - // item.set(item.step.step(headState, headFresh)) - if (item.success != null) { - log.v("${i+1}/${items.size} '${item.step.name}' SUCCESS ${if (item.success is State.Eos) "(eos)" else ""}") - headState = item.success!! + val result = when { + !headFresh && item.step is QueuedStep -> item.step.retry() // queued steps should never get stale data + else -> item.step.advance(headState) + } + item.set(result) + if (result is State.Ok) { + log.v("${i+1}/${items.size} '${item.step.name}' SUCCESS ${if (result is State.Eos) "(eos)" else ""}") + headState = result headFresh = true if (i == items.lastIndex) items.forEach { if (it.success !is State.Eos) it.set(null) @@ -80,7 +158,7 @@ internal class Pipeline private constructor(name: String, private val items: Lis headState is State.Eos -> State.Eos(Unit) else -> State.Ok(Unit) } - } + } */ fun release() { items.forEach { it.step.release() } @@ -88,9 +166,13 @@ internal class Pipeline private constructor(name: String, private val items: Lis companion object { internal fun build(name: String, builder: () -> Builder<*, Channel> = { Builder() }): Pipeline { - val items = builder().steps.map { + val steps = builder().steps + val items = steps.mapIndexed { index, step -> @Suppress("UNCHECKED_CAST") - PipelineItem(it as Step) + PipelineItem( + step = step as Step, + name = "${index+1}/${steps.size} '${step.name}'" + ) } return Pipeline(name, items) } diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/State.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/State.kt index acf6247b..2ffc6c51 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/State.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/State.kt @@ -1,9 +1,9 @@ package com.otaliastudios.transcoder.internal.pipeline -internal sealed class State { +internal sealed interface State { // Running - open class Ok(val value: T) : State() { + open class Ok(val value: T) : State { override fun toString() = "State.Ok($value)" } @@ -12,8 +12,16 @@ internal sealed class State { override fun toString() = "State.Eos($value)" } - // couldn't run, but might in the future - class Wait(val sleep: Boolean) : State() { - override fun toString() = "State.Wait($sleep)" + // Failed to produce output, try again later + sealed interface Failure : State { + val sleep: Boolean + } + + class Retry(override val sleep: Boolean) : Failure { + override fun toString() = "State.Retry($sleep)" + } + + class Consume(override val sleep: Boolean = false) : Failure { + override fun toString() = "State.Consume($sleep)" } } diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/steps.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/steps.kt index 58a2760a..8b39aa7f 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/steps.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/steps.kt @@ -43,10 +43,15 @@ internal abstract class QueuedStep< final override fun advance(state: State.Ok): State { if (state is State.Eos) enqueueEos(state.value) else enqueue(state.value) - return drain() + // Disallow State.Retry because the input was already handled. + return when (val result = drain()) { + is State.Retry -> State.Consume(result.sleep) + else -> result + } } - fun retry(): State { + fun tryAdvance(): State { return drain() } + } \ No newline at end of file diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/video/VideoPublisher.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/video/VideoPublisher.kt index 03b8776a..a7773a22 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/video/VideoPublisher.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/video/VideoPublisher.kt @@ -15,27 +15,19 @@ internal class VideoPublisher: BaseStep): State { if (state is State.Eos) { return State.Eos(EncoderData.Empty) } else { - surface.setPresentationTime(state.value * 1000) - surface.swapBuffers() + val surface = next.surface!! + surface.window.setPresentationTime(state.value * 1000) + surface.window.swapBuffers() + /* val s = EGL14.eglGetCurrentSurface(EGL14.EGL_DRAW) + val ss = IntArray(2) + EGL14.eglQuerySurface(EGL14.eglGetCurrentDisplay(), s, EGL14.EGL_WIDTH, ss, 0) + EGL14.eglQuerySurface(EGL14.eglGetCurrentDisplay(), s, EGL14.EGL_HEIGHT, ss, 1) + log.e("XXX VideoPublisher.surfaceSize: ${ss[0]}x${ss[1]}") */ return State.Ok(EncoderData.Empty) } } - - override fun release() { - surface.release() - core.release() - } } \ No newline at end of file diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/video/VideoRenderer.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/video/VideoRenderer.kt index 056b7daf..237eb4f6 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/video/VideoRenderer.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/video/VideoRenderer.kt @@ -9,8 +9,6 @@ import com.otaliastudios.transcoder.internal.media.MediaFormatConstants.KEY_ROTA import com.otaliastudios.transcoder.internal.pipeline.BaseStep import com.otaliastudios.transcoder.internal.pipeline.Channel import com.otaliastudios.transcoder.internal.pipeline.State -import com.otaliastudios.transcoder.internal.pipeline.Step -import com.otaliastudios.transcoder.internal.utils.Logger internal class VideoRenderer( @@ -39,14 +37,17 @@ internal class VideoRenderer( val width = targetFormat.getInteger(KEY_WIDTH) val height = targetFormat.getInteger(KEY_HEIGHT) val flip = extraRotation % 180 != 0 - log.e("FrameDrawerEncoder: size=$width-$height, flipping=$flip") - targetFormat.setInteger(KEY_WIDTH, if (flip) height else width) - targetFormat.setInteger(KEY_HEIGHT, if (flip) width else height) + val flippedWidth = if (flip) height else width + val flippedHeight = if (flip) width else height + targetFormat.setInteger(KEY_WIDTH, flippedWidth) + targetFormat.setInteger(KEY_HEIGHT, flippedHeight) + log.i("encoded output format: $targetFormat") + log.i("output size=${flippedWidth}x${flippedHeight}, flipped=$flip") } // VideoTrackTranscoder.onConfigureDecoder override fun handleSourceFormat(sourceFormat: MediaFormat): Surface { - log.i("handleSourceFormat($sourceFormat)") + log.i("encoded input format: $sourceFormat") // Just a sanity check that the rotation coming from DataSource is not different from // the one found in the DataSource's MediaFormat for video. @@ -88,7 +89,9 @@ internal class VideoRenderer( return frameDrawer.surface } - override fun handleRawFormat(rawFormat: MediaFormat) = Unit + override fun handleRawFormat(rawFormat: MediaFormat) { + log.i("decoded input format: $rawFormat") + } override fun advance(state: State.Ok): State { return if (state is State.Eos) { @@ -101,7 +104,7 @@ internal class VideoRenderer( State.Ok(state.value.timeUs) } else { state.value.release(false) - State.Wait(false) + State.Consume() } } }