Skip to content

Commit

Permalink
feat: support s3 urls for input and output
Browse files Browse the repository at this point in the history
This commit add support for using s3 urls on the format
s3://<BUCKET>/<KEY> in both input and output.

If ans s3 URL is used as input, a presigned URL is created
and used as input to ffmpeg. The duration of the presigned URLs can be
controlled with the 'remote-files.s3.presignDurationSeconds' config
property.

If an s3 URL is used for 'outputFolder', output will first be stored
locally and then uploaded to s3 once transcoding is finished.

Aws credentials are read with DefaultCredentialsProvider, meaning
aws credentials can be provided in a number of ways, see
https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/auth/credentials/DefaultCredentialsProvider.html;

Not that when using s3 urls for input, the presigned URLs will be
shown in the logs. If this is not desirable, setting
logging.config (or env variable LOGGING_CONFIG) to
'classpath:logback-json-mask-s3-presign.xml'
will use a log config that masks the presign query parameters.

Signed-off-by: Gustav Grusell <[email protected]>
  • Loading branch information
grusell committed Nov 9, 2024
1 parent 97e2899 commit 2ab7b10
Show file tree
Hide file tree
Showing 19 changed files with 474 additions and 53 deletions.
2 changes: 2 additions & 0 deletions checks.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ jacocoTestCoverageVerification {
'*.static {...}',
'*.model.*.get*',
'*.service.localencode.LocalEncodeService.moveFile*',
'*.S3Properties*.get*()',
'*RemoteFileService.DefaultHandler.*',
]
limit {
counter = 'LINE'
Expand Down
4 changes: 4 additions & 0 deletions encore-common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,20 @@ dependencies {

implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-slf4j:1.7.3")
implementation(platform("software.amazon.awssdk:bom:2.29.2"))
implementation("software.amazon.awssdk:s3")

testImplementation(project(":encore-web"))
testImplementation("org.springframework.security:spring-security-test")
testImplementation("org.awaitility:awaitility")
testImplementation("com.github.tomakehurst:wiremock-jre8-standalone:2.35.0")
testImplementation("org.springframework.boot:spring-boot-starter-webflux")
testImplementation("org.springframework.boot:spring-boot-starter-data-rest")

testFixturesImplementation(platform("org.springframework.boot:spring-boot-dependencies:3.1.3"))
testFixturesImplementation("com.redis:testcontainers-redis:2.2.0")
testFixturesImplementation("io.github.microutils:kotlin-logging:3.0.5")
testFixturesImplementation("org.junit.jupiter:junit-jupiter-api")
testFixturesImplementation("org.testcontainers:localstack:1.20.3")
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package se.svt.oss.encore

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import se.svt.oss.encore.service.remotefiles.s3.S3Properties
import se.svt.oss.encore.service.remotefiles.s3.S3RemoteFileHandler
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.S3Configuration
import software.amazon.awssdk.services.s3.presigner.S3Presigner
import java.net.URI

@ConditionalOnProperty("remote-files.s3.enabled", havingValue = "true")
@EnableConfigurationProperties(S3Properties::class)
@Configuration
class S3RemoteFilesConfiguration {

@Bean
fun s3Region() =
Region.of(System.getProperty("aws.region") ?: System.getenv("AWS_REGION") ?: "us-east-1")

@Bean
fun s3Client(s3Region: Region, s3Properties: S3Properties) = S3AsyncClient.builder()
.region(s3Region)
.crossRegionAccessEnabled(true)
.multipartEnabled(true)
.serviceConfiguration(
S3Configuration.builder()
.pathStyleAccessEnabled(true)
.build()
)
.apply {
if (!s3Properties.endpoint.isNullOrBlank()) {
endpointOverride(URI.create(s3Properties.endpoint))
}
}
.build()

@Bean
fun s3Presigner(s3Region: Region, s3Properties: S3Properties) = S3Presigner.builder()
.region(s3Region)
.serviceConfiguration(
S3Configuration.builder()
.pathStyleAccessEnabled(true)
.build()
)
.apply {
if (!s3Properties.endpoint.isNullOrBlank()) {
endpointOverride(URI.create(s3Properties.endpoint))
}
}
.build()

@Bean
fun s3RemoteFileHandler(s3Client: S3AsyncClient, s3Presigner: S3Presigner, s3Properties: S3Properties) =
S3RemoteFileHandler(s3Client, s3Presigner, s3Properties)
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ sealed interface Input {
@get:Schema(description = "URI of input file", required = true, example = "/path/to/file.mp4")
val uri: String

var accessUri: String

@get:Schema(description = "Input params required to properly decode input", example = """{ "ac": "2" }""")
val params: LinkedHashMap<String, String?>

Expand Down Expand Up @@ -167,6 +169,9 @@ data class AudioInput(
override val type: String
get() = TYPE_AUDIO

@JsonIgnore
override var accessUri: String = uri

override fun withSeekTo(seekTo: Double) = copy(seekTo = seekTo)

val duration: Double
Expand All @@ -188,6 +193,9 @@ data class VideoInput(
override val seekTo: Double? = null,
override val copyTs: Boolean = false
) : VideoIn {
@JsonIgnore
override var accessUri: String = uri

override val analyzedVideo: VideoFile
@JsonIgnore
get() = analyzed as? VideoFile ?: throw RuntimeException("Analyzed video for $uri is ${analyzed?.type}")
Expand Down Expand Up @@ -220,6 +228,9 @@ data class AudioVideoInput(
override val seekTo: Double? = null,
override val copyTs: Boolean = false
) : VideoIn, AudioIn {
@JsonIgnore
override var accessUri: String = uri

override val analyzedVideo: VideoFile
@JsonIgnore
get() = analyzed as? VideoFile ?: throw RuntimeException("Analyzed audio/video for $uri is ${analyzed?.type}")
Expand All @@ -244,7 +255,7 @@ fun List<Input>.inputParams(readDuration: Double?): List<String> =
(readDuration?.let { listOf("-t", "$it") } ?: emptyList()) +
(input.seekTo?.let { listOf("-ss", "$it") } ?: emptyList()) +
(if (input.copyTs) listOf("-copyts") else emptyList()) +
listOf("-i", input.uri)
listOf("-i", input.accessUri ?: input.uri)
}

fun List<Input>.maxDuration(): Double? = maxOfOrNull {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import se.svt.oss.encore.service.callback.CallbackService
import se.svt.oss.encore.service.localencode.LocalEncodeService
import se.svt.oss.encore.service.mediaanalyzer.MediaAnalyzerService
import se.svt.oss.encore.service.queue.QueueService
import se.svt.oss.encore.service.remotefiles.RemoteFileService
import se.svt.oss.mediaanalyzer.file.MediaContainer
import se.svt.oss.mediaanalyzer.file.MediaFile
import java.io.File
Expand All @@ -60,6 +61,7 @@ class EncoreService(
private val localEncodeService: LocalEncodeService,
private val encoreProperties: EncoreProperties,
private val queueService: QueueService,
private val remoteFileService: RemoteFileService
) {

private val log = KotlinLogging.logger {}
Expand Down Expand Up @@ -225,7 +227,7 @@ class EncoreService(
repository.save(encoreJob)
cancelTopic?.removeListener(cancelListener)
callbackService.sendProgressCallback(encoreJob)
localEncodeService.cleanup(outputFolder)
localEncodeService.cleanup(outputFolder, encoreJob)
}
}

Expand Down Expand Up @@ -268,6 +270,10 @@ class EncoreService(
}

private fun initJob(encoreJob: EncoreJob) {
encoreJob.inputs.forEach { input ->
input.accessUri = remoteFileService.getAccessUri(input.uri)
}

encoreJob.inputs.forEach { input ->
mediaAnalyzerService.analyzeInput(input)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,29 @@ import org.springframework.stereotype.Service
import se.svt.oss.encore.config.EncoreProperties
import se.svt.oss.encore.model.EncoreJob
import se.svt.oss.encore.process.createTempDir
import se.svt.oss.encore.service.remotefiles.RemoteFileService
import se.svt.oss.mediaanalyzer.file.AudioFile
import se.svt.oss.mediaanalyzer.file.ImageFile
import se.svt.oss.mediaanalyzer.file.MediaFile
import se.svt.oss.mediaanalyzer.file.VideoFile
import java.io.File
import java.net.URI
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption

@Service
class LocalEncodeService(
private val encoreProperties: EncoreProperties
private val encoreProperties: EncoreProperties,
private val remoteFileService: RemoteFileService
) {

private val log = KotlinLogging.logger {}

fun outputFolder(
encoreJob: EncoreJob
): String {
return if (encoreProperties.localTemporaryEncode) {
return if (encoreProperties.localTemporaryEncode || remoteFileService.isRemoteFile(encoreJob.outputFolder)) {
createTempDir("job_${encoreJob.id}").toString()
} else {
encoreJob.outputFolder
Expand All @@ -40,6 +43,23 @@ class LocalEncodeService(
output: List<MediaFile>,
encoreJob: EncoreJob
): List<MediaFile> {
if (remoteFileService.isRemoteFile(encoreJob.outputFolder)) {
log.debug { "Moving files to output destination ${encoreJob.outputFolder}, from local temp $outputFolder" }
File(outputFolder).listFiles()?.forEach { localFile ->
val remoteFile = URI.create(encoreJob.outputFolder).resolve(localFile.name).toString()
remoteFileService.upload(localFile.toString(), remoteFile)
}
val files = output.map {
val resolvedPath = URI.create(encoreJob.outputFolder).resolve(Path.of(it.file).fileName.toString()).toString()
when (it) {
is VideoFile -> it.copy(file = resolvedPath)
is AudioFile -> it.copy(file = resolvedPath)
is ImageFile -> it.copy(file = resolvedPath)
else -> throw Exception("Invalid conversion")
}
}
return files
}
if (encoreProperties.localTemporaryEncode) {
val destination = File(encoreJob.outputFolder)
log.debug { "Moving files to correct outputFolder ${encoreJob.outputFolder}, from local temp $outputFolder" }
Expand All @@ -52,8 +72,10 @@ class LocalEncodeService(
return output
}

fun cleanup(tempDirectory: String?) {
if (tempDirectory != null && encoreProperties.localTemporaryEncode) {
fun cleanup(tempDirectory: String?, encoreJob: EncoreJob) {
if (tempDirectory != null &&
(encoreProperties.localTemporaryEncode || remoteFileService.isRemoteFile(encoreJob.outputFolder))
) {
File(tempDirectory).deleteRecursively()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import se.svt.oss.mediaanalyzer.ffprobe.FfVideoStream
import se.svt.oss.mediaanalyzer.ffprobe.ProbeResult
import se.svt.oss.mediaanalyzer.ffprobe.UnknownStream
import se.svt.oss.mediaanalyzer.file.AudioFile
import se.svt.oss.mediaanalyzer.file.ImageFile
import se.svt.oss.mediaanalyzer.file.SubtitleFile
import se.svt.oss.mediaanalyzer.file.VideoFile
import se.svt.oss.mediaanalyzer.mediainfo.AudioTrack
import se.svt.oss.mediaanalyzer.mediainfo.GeneralTrack
Expand Down Expand Up @@ -52,20 +54,25 @@ class MediaAnalyzerService(private val mediaAnalyzer: MediaAnalyzer) {
val useFirstAudioStreams = (input as? AudioIn)?.channelLayout?.channels?.size

input.analyzed = mediaAnalyzer.analyze(
file = input.uri,
file = input.accessUri,
probeInterlaced = probeInterlaced,
ffprobeInputParams = input.params
).let {
val selectedVideoStream = (input as? VideoIn)?.videoStream
val selectedAudioStream = (input as? AudioIn)?.audioStream
when (it) {
is VideoFile -> it.selectVideoStream(selectedVideoStream)
.selectAudioStream(selectedAudioStream)
.trimAudio(useFirstAudioStreams)
is AudioFile -> it.selectAudioStream(selectedAudioStream)
.trimAudio(useFirstAudioStreams)
else -> it
)
.let {
val selectedVideoStream = (input as? VideoIn)?.videoStream
val selectedAudioStream = (input as? AudioIn)?.audioStream
when (it) {
is VideoFile -> it.selectVideoStream(selectedVideoStream)
.selectAudioStream(selectedAudioStream)
.trimAudio(useFirstAudioStreams)
.copy(file = input.uri)
is AudioFile -> it.selectAudioStream(selectedAudioStream)
.trimAudio(useFirstAudioStreams)
.copy(file = input.uri)
is ImageFile -> it.copy(file = input.uri)
is SubtitleFile -> it.copy(file = input.uri)
else -> it
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package se.svt.oss.encore.service.remotefiles

interface RemoteFileHandler {
fun getAccessUri(uri: String): String
fun upload(localFile: String, remoteFile: String)
val protocols: List<String>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package se.svt.oss.encore.service.remotefiles

import mu.KotlinLogging
import org.springframework.stereotype.Service
import java.net.URI

@Service
class RemoteFileService(private val remoteFileHandlers: List<RemoteFileHandler>) {

private val log = KotlinLogging.logger {}

private val defaultHandler = DefaultHandler()

fun isRemoteFile(uriOrPath: String): Boolean {
val uri = URI.create(uriOrPath)
return !(uri.scheme.isNullOrEmpty() || uri.scheme.lowercase() == "file")
}

fun getAccessUri(uriOrPath: String): String {
val uri = URI.create(uriOrPath)
return getHandler(uri).getAccessUri(uriOrPath)
}

fun upload(localFile: String, remoteFile: String) {
val uri = URI.create(remoteFile)
getHandler(uri).upload(localFile, remoteFile)
}

private fun getHandler(uri: URI): RemoteFileHandler {
log.info { "Getting handler for uri $uri. Available protocols: ${remoteFileHandlers.flatMap {it.protocols} }" }
if (uri.scheme.isNullOrEmpty() || uri.scheme.lowercase() == "file") {
return defaultHandler
}
val handler = remoteFileHandlers.firstOrNull { it.protocols.contains(uri.scheme) }
if (handler != null) {
return handler
}
log.info { "No remote file handler found for protocol ${uri.scheme}. Using default handler." }
return defaultHandler
}

/** Handler user for protocols where no specific handler is defined. Works for local files and
* any protocols that ffmpeg supports natively */
private class DefaultHandler : RemoteFileHandler {
override fun getAccessUri(uri: String): String {
return uri
}

override fun upload(localFile: String, remoteFile: String) {
// Do nothing
}

override val protocols: List<String> = emptyList()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package se.svt.oss.encore.service.remotefiles.s3

import org.springframework.boot.context.properties.ConfigurationProperties
import java.time.Duration

@ConfigurationProperties("remote-files.s3")
data class S3Properties(
val enabled: Boolean = false,
val endpoint: String = "",
val presignDurationSeconds: Long = Duration.ofHours(12).seconds,
val uploadTimeoutSeconds: Long = Duration.ofHours(1).seconds
)
Loading

0 comments on commit 2ab7b10

Please sign in to comment.