Skip to content

Commit

Permalink
Revisit Step interface
Browse files Browse the repository at this point in the history
  • Loading branch information
natario1 committed Aug 13, 2024
1 parent a2b8da1 commit f100111
Show file tree
Hide file tree
Showing 15 changed files with 89 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,11 @@ import kotlin.math.floor
* remixing, stretching. TODO: With some extra work this could be split in different steps.
*/
internal class AudioEngine(
private val stretcher: AudioStretcher,
private val resampler: AudioResampler,
private val targetFormat: MediaFormat
private val stretcher: AudioStretcher,
private val resampler: AudioResampler,
private val targetFormat: MediaFormat
): QueuedStep<DecoderData, DecoderChannel, EncoderData, EncoderChannel>("AudioEngine"), DecoderChannel {

companion object {
private val ID = AtomicInteger(0)
}
private val log = Logger("AudioEngine(${ID.getAndIncrement()})")

override val channel = this
private val buffers = ShortBuffers()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.otaliastudios.transcoder.internal.codec
import android.media.MediaCodec.*
import android.media.MediaFormat
import android.view.Surface
import com.otaliastudios.transcoder.common.TrackType
import com.otaliastudios.transcoder.common.trackType
import com.otaliastudios.transcoder.internal.data.ReaderChannel
import com.otaliastudios.transcoder.internal.data.ReaderData
Expand All @@ -29,15 +30,15 @@ internal interface DecoderChannel : Channel {
}

internal class Decoder(
private val format: MediaFormat, // source.getTrackFormat(track)
continuous: Boolean, // relevant if the source sends no-render chunks. should we compensate or not?
) : QueuedStep<ReaderData, ReaderChannel, DecoderData, DecoderChannel>("Decoder"), ReaderChannel {

companion object {
private val ID = trackMapOf(AtomicInteger(0), AtomicInteger(0))
private val format: MediaFormat, // source.getTrackFormat(track)
continuous: Boolean, // relevant if the source sends no-render chunks. should we compensate or not?
) : QueuedStep<ReaderData, ReaderChannel, DecoderData, DecoderChannel>(
when (format.trackType) {
TrackType.VIDEO -> "VideoDecoder"
TrackType.AUDIO -> "AudioDecoder"
}
), ReaderChannel {

private val log = Logger("Decoder(${format.trackType},${ID[format.trackType].getAndIncrement()})")
override val channel = this

private val codec = createDecoderByType(format.getString(MediaFormat.KEY_MIME)!!)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ internal class DecoderTimerData(
) : DecoderData(buffer, timeUs, release)

internal class DecoderTimer(
private val track: TrackType,
private val interpolator: TimeInterpolator,
private val track: TrackType,
private val interpolator: TimeInterpolator,
) : TransformStep<DecoderData, DecoderChannel>("DecoderTimer") {

private var lastTimeUs: Long? = null
private var lastRawTimeUs: Long? = null

override fun step(state: State.Ok<DecoderData>, fresh: Boolean): State<DecoderData> {
override fun advance(state: State.Ok<DecoderData>): State<DecoderData> {
if (state is State.Eos) return state
require(state.value !is DecoderTimerData) {
"Can't apply DecoderTimer twice."
Expand All @@ -42,11 +42,11 @@ internal class DecoderTimer(
lastRawTimeUs = rawTimeUs

return State.Ok(DecoderTimerData(
buffer = state.value.buffer,
rawTimeUs = rawTimeUs,
timeUs = timeUs,
timeStretch = timeStretch,
release = state.value.release
buffer = state.value.buffer,
rawTimeUs = rawTimeUs,
timeUs = timeUs,
timeStretch = timeStretch,
release = state.value.release
))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ internal class Encoder(
override val surface: Surface?,
ownsCodecStart: Boolean,
private val ownsCodecStop: Boolean,
) : QueuedStep<EncoderData, EncoderChannel, WriterData, WriterChannel>("Encoder"), EncoderChannel {
) : QueuedStep<EncoderData, EncoderChannel, WriterData, WriterChannel>(
when (surface) {
null -> "AudioEncoder"
else -> "VideoEncoder"
}
), EncoderChannel {

constructor(codecs: Codecs, type: TrackType) : this(
codecs.encoders[type].first,
Expand All @@ -45,12 +50,7 @@ internal class Encoder(
codecs.ownsEncoderStop[type]
)

companion object {
private val ID = trackMapOf(AtomicInteger(0), AtomicInteger(0))
}

private val type = if (surface != null) TrackType.VIDEO else TrackType.AUDIO
private val log = Logger("Encoder(${type},${ID[type].getAndIncrement()})")
private var dequeuedInputs by observable(0) { _, _, _ -> printDequeued() }
private var dequeuedOutputs by observable(0) { _, _, _ -> printDequeued() }
private fun printDequeued() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@ package com.otaliastudios.transcoder.internal.data

import android.media.MediaCodec
import android.media.MediaFormat
import com.otaliastudios.transcoder.internal.pipeline.BaseStep
import com.otaliastudios.transcoder.internal.pipeline.State
import com.otaliastudios.transcoder.internal.pipeline.Step
import com.otaliastudios.transcoder.internal.utils.Logger
import java.nio.ByteBuffer
import java.nio.ByteOrder

internal class Bridge(private val format: MediaFormat)
: Step<ReaderData, ReaderChannel, WriterData, WriterChannel>, ReaderChannel {
: BaseStep<ReaderData, ReaderChannel, WriterData, WriterChannel>("Bridge"), ReaderChannel {

override val name: String = "Bridge"
private val log = Logger("Bridge")
private val bufferSize = format.getInteger(MediaFormat.KEY_MAX_INPUT_SIZE)
private val buffer = ByteBuffer.allocateDirect(bufferSize).order(ByteOrder.nativeOrder())
override val channel = this
Expand All @@ -28,7 +27,7 @@ internal class Bridge(private val format: MediaFormat)
}

// Can't do much about chunk.render, since we don't even decode.
override fun step(state: State.Ok<ReaderData>, fresh: Boolean): State<WriterData> {
override fun advance(state: State.Ok<ReaderData>): State<WriterData> {
val (chunk, _) = state.value
val flags = if (chunk.keyframe) MediaCodec.BUFFER_FLAG_SYNC_FRAME else 0
val result = WriterData(chunk.buffer, chunk.timeUs, flags) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ internal interface ReaderChannel : Channel {
}

internal class Reader(
private val source: DataSource,
private val track: TrackType
private val source: DataSource,
private val track: TrackType
) : BaseStep<Unit, Channel, ReaderData, ReaderChannel>("Reader") {

private val log = Logger("Reader")
override val channel = Channel
private val chunk = DataSource.Chunk()

Expand All @@ -35,7 +34,7 @@ internal class Reader(
}
}

override fun step(state: State.Ok<Unit>, fresh: Boolean): State<ReaderData> {
override fun advance(state: State.Ok<Unit>): State<ReaderData> {
return if (source.isDrained) {
log.i("Source is drained! Returning Eos as soon as possible.")
nextBufferOrWait { byteBuffer, id ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ internal class ReaderTimer(
private val track: TrackType,
private val interpolator: TimeInterpolator
) : TransformStep<ReaderData, ReaderChannel>("ReaderTimer") {
override fun step(state: State.Ok<ReaderData>, fresh: Boolean): State<ReaderData> {
override fun advance(state: State.Ok<ReaderData>): State<ReaderData> {
if (state is State.Eos) return state
state.value.chunk.timeUs = interpolator.interpolate(track, state.value.chunk.timeUs)
return state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,16 @@ import com.otaliastudios.transcoder.source.DataSource
import java.nio.ByteBuffer

internal class Seeker(
private val source: DataSource,
positions: List<Long>,
private val seek: (Long) -> Boolean
private val source: DataSource,
positions: List<Long>,
private val seek: (Long) -> Boolean
) : BaseStep<Unit, Channel, Unit, Channel>("Seeker") {

private val log = Logger("Seeker")
override val channel = Channel
private val positions = positions.toMutableList()

override fun step(state: State.Ok<Unit>, fresh: Boolean): State<Unit> {
if (fresh && positions.isNotEmpty()) {
override fun advance(state: State.Ok<Unit>): State<Unit> {
if (positions.isNotEmpty()) {
if (seek(positions.first())) {
log.i("Seeking to next position ${positions.first()}")
val next = positions.removeFirst()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.otaliastudios.transcoder.internal.data
import android.media.MediaCodec
import android.media.MediaFormat
import com.otaliastudios.transcoder.common.TrackType
import com.otaliastudios.transcoder.internal.pipeline.BaseStep
import com.otaliastudios.transcoder.internal.pipeline.Channel
import com.otaliastudios.transcoder.internal.pipeline.State
import com.otaliastudios.transcoder.internal.pipeline.Step
Expand All @@ -22,22 +23,20 @@ internal interface WriterChannel : Channel {
}

internal class Writer(
private val sink: DataSink,
private val track: TrackType
) : Step<WriterData, WriterChannel, Unit, Channel>, WriterChannel {
private val sink: DataSink,
private val track: TrackType
) : BaseStep<WriterData, WriterChannel, Unit, Channel>("Writer"), WriterChannel {

override val channel = this
override val name: String = "Writer"

private val log = Logger("Writer")
private val info = MediaCodec.BufferInfo()

override fun handleFormat(format: MediaFormat) {
log.i("handleFormat($format)")
sink.setTrackFormat(track, format)
}

override fun step(state: State.Ok<WriterData>, fresh: Boolean): State<Unit> {
override fun advance(state: State.Ok<WriterData>): State<Unit> {
val (buffer, timestamp, flags) = state.value
// Note: flags does NOT include BUFFER_FLAG_END_OF_STREAM. That's passed via State.Eos.
val eos = state is State.Eos
Expand All @@ -49,10 +48,10 @@ internal class Writer(
info.set(0, 0, 0, flags or MediaCodec.BUFFER_FLAG_END_OF_STREAM)
} else {
info.set(
buffer.position(),
buffer.remaining(),
timestamp,
flags
buffer.position(),
buffer.remaining(),
timestamp,
flags
)
}
sink.writeTrack(track, buffer, info)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ internal class Pipeline private constructor(name: String, private val items: Lis
// - caused failure in the previous run (i == previouslyFailedIndex, failure != null)
// - run (with failure or success) then one of the items following it failed (i < previouslyFailedIndex, cached != null)
log.v("${i+1}/${items.size} '${item.step.name}' START (${if (headFresh) "fresh" else "stale"})")
item.set(item.step.step(headState, headFresh))
// item.set(item.step.step(headState, headFresh))
if (item.success != null) {
log.v("${i+1}/${items.size} '${item.step.name}' SUCCESS ${if (item.success is State.Eos) "(eos)" else ""}")
headState = item.success!!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ internal interface Channel {
}

internal interface Step<
Input: Any,
InputChannel: Channel,
Output: Any,
OutputChannel: Channel
Input: Any,
InputChannel: Channel,
Output: Any,
OutputChannel: Channel
> {
val name: String
val channel: InputChannel

fun initialize(next: OutputChannel) = Unit

fun step(state: State.Ok<Input>, fresh: Boolean): State<Output>
fun advance(state: State.Ok<Input>): State<Output>

fun release() = Unit
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package com.otaliastudios.transcoder.internal.pipeline

import com.otaliastudios.transcoder.internal.utils.Logger

internal abstract class BaseStep<
Input: Any,
InputChannel: Channel,
Output: Any,
OutputChannel: Channel
>(override val name: String) : Step<Input, InputChannel, Output, OutputChannel> {
Input: Any,
InputChannel: Channel,
Output: Any,
OutputChannel: Channel
>(final override val name: String) : Step<Input, InputChannel, Output, OutputChannel> {

protected val log = Logger(name)

protected lateinit var next: OutputChannel
private set

Expand All @@ -23,10 +28,10 @@ internal abstract class TransformStep<D: Any, C: Channel>(name: String) : BaseSt
}

internal abstract class QueuedStep<
Input: Any,
InputChannel: Channel,
Output: Any,
OutputChannel: Channel
Input: Any,
InputChannel: Channel,
Output: Any,
OutputChannel: Channel
>(name: String) : BaseStep<Input, InputChannel, Output, OutputChannel>(name) {

protected abstract fun enqueue(data: Input)
Expand All @@ -35,11 +40,13 @@ internal abstract class QueuedStep<

protected abstract fun drain(): State<Output>

final override fun step(state: State.Ok<Input>, fresh: Boolean): State<Output> {
if (fresh) {
if (state is State.Eos) enqueueEos(state.value)
else enqueue(state.value)
}
final override fun advance(state: State.Ok<Input>): State<Output> {
if (state is State.Eos) enqueueEos(state.value)
else enqueue(state.value)
return drain()
}

fun retry(): State<Output> {
return drain()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import com.otaliastudios.opengl.core.EglCore
import com.otaliastudios.opengl.surface.EglWindowSurface
import com.otaliastudios.transcoder.internal.codec.EncoderChannel
import com.otaliastudios.transcoder.internal.codec.EncoderData
import com.otaliastudios.transcoder.internal.pipeline.BaseStep
import com.otaliastudios.transcoder.internal.pipeline.Channel
import com.otaliastudios.transcoder.internal.pipeline.State
import com.otaliastudios.transcoder.internal.pipeline.Step


internal class VideoPublisher: Step<Long, Channel, EncoderData, EncoderChannel> {
internal class VideoPublisher: BaseStep<Long, Channel, EncoderData, EncoderChannel>("VideoPublisher") {

override val channel = Channel
override val name: String = "VideoPublisher"

private val core = EglCore(EGL14.EGL_NO_CONTEXT, EglCore.FLAG_RECORDABLE)
private lateinit var surface: EglWindowSurface
Expand All @@ -24,7 +24,7 @@ internal class VideoPublisher: Step<Long, Channel, EncoderData, EncoderChannel>
surface.makeCurrent()
}

override fun step(state: State.Ok<Long>, fresh: Boolean): State<EncoderData> {
override fun advance(state: State.Ok<Long>): State<EncoderData> {
if (state is State.Eos) {
return State.Eos(EncoderData.Empty)
} else {
Expand Down
Loading

0 comments on commit f100111

Please sign in to comment.