From 2ab7b106c493de44cb86a5471431bc2e996126e4 Mon Sep 17 00:00:00 2001 From: Gustav Grusell Date: Mon, 24 Jun 2024 22:15:33 +0200 Subject: [PATCH] feat: support s3 urls for input and output This commit add support for using s3 urls on the format s3:/// in both input and output. If ans s3 URL is used as input, a presigned URL is created and used as input to ffmpeg. The duration of the presigned URLs can be controlled with the 'remote-files.s3.presignDurationSeconds' config property. If an s3 URL is used for 'outputFolder', output will first be stored locally and then uploaded to s3 once transcoding is finished. Aws credentials are read with DefaultCredentialsProvider, meaning aws credentials can be provided in a number of ways, see https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/auth/credentials/DefaultCredentialsProvider.html; Not that when using s3 urls for input, the presigned URLs will be shown in the logs. If this is not desirable, setting logging.config (or env variable LOGGING_CONFIG) to 'classpath:logback-json-mask-s3-presign.xml' will use a log config that masks the presign query parameters. Signed-off-by: Gustav Grusell --- checks.gradle | 2 + encore-common/build.gradle.kts | 4 + .../oss/encore/S3RemoteFilesConfiguration.kt | 59 ++++++++++ .../se/svt/oss/encore/model/input/Input.kt | 13 ++- .../svt/oss/encore/service/EncoreService.kt | 8 +- .../service/localencode/LocalEncodeService.kt | 30 ++++- .../mediaanalyzer/MediaAnalyzerService.kt | 31 ++++-- .../service/remotefiles/RemoteFileHandler.kt | 7 ++ .../service/remotefiles/RemoteFileService.kt | 55 ++++++++++ .../service/remotefiles/s3/S3Properties.kt | 12 ++ .../remotefiles/s3/S3RemoteFileHandler.kt | 54 +++++++++ .../oss/encore/EncoreIntegrationTestBase.kt | 29 +++-- .../svt/oss/encore/EncoreS3IntegrationTest.kt | 103 ++++++++++++++++++ .../test/resources/application-test-s3.yml | 3 + .../se/svt/oss/encore/RedisExtension.kt | 35 ++---- .../se/svt/oss/encore/S3StorageExtension.kt | 28 +++++ .../se/svt/oss/encore/TestFixtureUtils.kt | 18 +++ .../logback-json-mask-s3-presign.xml | 18 +++ .../logback-json-mask-s3-presign.xml | 18 +++ 19 files changed, 474 insertions(+), 53 deletions(-) create mode 100644 encore-common/src/main/kotlin/se/svt/oss/encore/S3RemoteFilesConfiguration.kt create mode 100644 encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/RemoteFileHandler.kt create mode 100644 encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/RemoteFileService.kt create mode 100644 encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/s3/S3Properties.kt create mode 100644 encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/s3/S3RemoteFileHandler.kt create mode 100644 encore-common/src/test/kotlin/se/svt/oss/encore/EncoreS3IntegrationTest.kt create mode 100644 encore-common/src/test/resources/application-test-s3.yml create mode 100644 encore-common/src/testFixtures/kotlin/se/svt/oss/encore/S3StorageExtension.kt create mode 100644 encore-common/src/testFixtures/kotlin/se/svt/oss/encore/TestFixtureUtils.kt create mode 100644 encore-web/src/main/resources/logback-json-mask-s3-presign.xml create mode 100644 encore-worker/src/main/resources/logback-json-mask-s3-presign.xml diff --git a/checks.gradle b/checks.gradle index 1d1e2b1..60fcd68 100644 --- a/checks.gradle +++ b/checks.gradle @@ -11,6 +11,8 @@ jacocoTestCoverageVerification { '*.static {...}', '*.model.*.get*', '*.service.localencode.LocalEncodeService.moveFile*', + '*.S3Properties*.get*()', + '*RemoteFileService.DefaultHandler.*', ] limit { counter = 'LINE' diff --git a/encore-common/build.gradle.kts b/encore-common/build.gradle.kts index 8c6f3ca..3ed61d5 100644 --- a/encore-common/build.gradle.kts +++ b/encore-common/build.gradle.kts @@ -15,6 +15,8 @@ dependencies { implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-slf4j:1.7.3") + implementation(platform("software.amazon.awssdk:bom:2.29.2")) + implementation("software.amazon.awssdk:s3") testImplementation(project(":encore-web")) testImplementation("org.springframework.security:spring-security-test") @@ -22,9 +24,11 @@ dependencies { testImplementation("com.github.tomakehurst:wiremock-jre8-standalone:2.35.0") testImplementation("org.springframework.boot:spring-boot-starter-webflux") testImplementation("org.springframework.boot:spring-boot-starter-data-rest") + testFixturesImplementation(platform("org.springframework.boot:spring-boot-dependencies:3.1.3")) testFixturesImplementation("com.redis:testcontainers-redis:2.2.0") testFixturesImplementation("io.github.microutils:kotlin-logging:3.0.5") testFixturesImplementation("org.junit.jupiter:junit-jupiter-api") + testFixturesImplementation("org.testcontainers:localstack:1.20.3") } diff --git a/encore-common/src/main/kotlin/se/svt/oss/encore/S3RemoteFilesConfiguration.kt b/encore-common/src/main/kotlin/se/svt/oss/encore/S3RemoteFilesConfiguration.kt new file mode 100644 index 0000000..e961a2a --- /dev/null +++ b/encore-common/src/main/kotlin/se/svt/oss/encore/S3RemoteFilesConfiguration.kt @@ -0,0 +1,59 @@ +package se.svt.oss.encore + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import se.svt.oss.encore.service.remotefiles.s3.S3Properties +import se.svt.oss.encore.service.remotefiles.s3.S3RemoteFileHandler +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.services.s3.S3AsyncClient +import software.amazon.awssdk.services.s3.S3Configuration +import software.amazon.awssdk.services.s3.presigner.S3Presigner +import java.net.URI + +@ConditionalOnProperty("remote-files.s3.enabled", havingValue = "true") +@EnableConfigurationProperties(S3Properties::class) +@Configuration +class S3RemoteFilesConfiguration { + + @Bean + fun s3Region() = + Region.of(System.getProperty("aws.region") ?: System.getenv("AWS_REGION") ?: "us-east-1") + + @Bean + fun s3Client(s3Region: Region, s3Properties: S3Properties) = S3AsyncClient.builder() + .region(s3Region) + .crossRegionAccessEnabled(true) + .multipartEnabled(true) + .serviceConfiguration( + S3Configuration.builder() + .pathStyleAccessEnabled(true) + .build() + ) + .apply { + if (!s3Properties.endpoint.isNullOrBlank()) { + endpointOverride(URI.create(s3Properties.endpoint)) + } + } + .build() + + @Bean + fun s3Presigner(s3Region: Region, s3Properties: S3Properties) = S3Presigner.builder() + .region(s3Region) + .serviceConfiguration( + S3Configuration.builder() + .pathStyleAccessEnabled(true) + .build() + ) + .apply { + if (!s3Properties.endpoint.isNullOrBlank()) { + endpointOverride(URI.create(s3Properties.endpoint)) + } + } + .build() + + @Bean + fun s3RemoteFileHandler(s3Client: S3AsyncClient, s3Presigner: S3Presigner, s3Properties: S3Properties) = + S3RemoteFileHandler(s3Client, s3Presigner, s3Properties) +} diff --git a/encore-common/src/main/kotlin/se/svt/oss/encore/model/input/Input.kt b/encore-common/src/main/kotlin/se/svt/oss/encore/model/input/Input.kt index 721a616..06c2d5b 100644 --- a/encore-common/src/main/kotlin/se/svt/oss/encore/model/input/Input.kt +++ b/encore-common/src/main/kotlin/se/svt/oss/encore/model/input/Input.kt @@ -35,6 +35,8 @@ sealed interface Input { @get:Schema(description = "URI of input file", required = true, example = "/path/to/file.mp4") val uri: String + var accessUri: String + @get:Schema(description = "Input params required to properly decode input", example = """{ "ac": "2" }""") val params: LinkedHashMap @@ -167,6 +169,9 @@ data class AudioInput( override val type: String get() = TYPE_AUDIO + @JsonIgnore + override var accessUri: String = uri + override fun withSeekTo(seekTo: Double) = copy(seekTo = seekTo) val duration: Double @@ -188,6 +193,9 @@ data class VideoInput( override val seekTo: Double? = null, override val copyTs: Boolean = false ) : VideoIn { + @JsonIgnore + override var accessUri: String = uri + override val analyzedVideo: VideoFile @JsonIgnore get() = analyzed as? VideoFile ?: throw RuntimeException("Analyzed video for $uri is ${analyzed?.type}") @@ -220,6 +228,9 @@ data class AudioVideoInput( override val seekTo: Double? = null, override val copyTs: Boolean = false ) : VideoIn, AudioIn { + @JsonIgnore + override var accessUri: String = uri + override val analyzedVideo: VideoFile @JsonIgnore get() = analyzed as? VideoFile ?: throw RuntimeException("Analyzed audio/video for $uri is ${analyzed?.type}") @@ -244,7 +255,7 @@ fun List.inputParams(readDuration: Double?): List = (readDuration?.let { listOf("-t", "$it") } ?: emptyList()) + (input.seekTo?.let { listOf("-ss", "$it") } ?: emptyList()) + (if (input.copyTs) listOf("-copyts") else emptyList()) + - listOf("-i", input.uri) + listOf("-i", input.accessUri ?: input.uri) } fun List.maxDuration(): Double? = maxOfOrNull { diff --git a/encore-common/src/main/kotlin/se/svt/oss/encore/service/EncoreService.kt b/encore-common/src/main/kotlin/se/svt/oss/encore/service/EncoreService.kt index 00d4517..28b0443 100644 --- a/encore-common/src/main/kotlin/se/svt/oss/encore/service/EncoreService.kt +++ b/encore-common/src/main/kotlin/se/svt/oss/encore/service/EncoreService.kt @@ -42,6 +42,7 @@ import se.svt.oss.encore.service.callback.CallbackService import se.svt.oss.encore.service.localencode.LocalEncodeService import se.svt.oss.encore.service.mediaanalyzer.MediaAnalyzerService import se.svt.oss.encore.service.queue.QueueService +import se.svt.oss.encore.service.remotefiles.RemoteFileService import se.svt.oss.mediaanalyzer.file.MediaContainer import se.svt.oss.mediaanalyzer.file.MediaFile import java.io.File @@ -60,6 +61,7 @@ class EncoreService( private val localEncodeService: LocalEncodeService, private val encoreProperties: EncoreProperties, private val queueService: QueueService, + private val remoteFileService: RemoteFileService ) { private val log = KotlinLogging.logger {} @@ -225,7 +227,7 @@ class EncoreService( repository.save(encoreJob) cancelTopic?.removeListener(cancelListener) callbackService.sendProgressCallback(encoreJob) - localEncodeService.cleanup(outputFolder) + localEncodeService.cleanup(outputFolder, encoreJob) } } @@ -268,6 +270,10 @@ class EncoreService( } private fun initJob(encoreJob: EncoreJob) { + encoreJob.inputs.forEach { input -> + input.accessUri = remoteFileService.getAccessUri(input.uri) + } + encoreJob.inputs.forEach { input -> mediaAnalyzerService.analyzeInput(input) } diff --git a/encore-common/src/main/kotlin/se/svt/oss/encore/service/localencode/LocalEncodeService.kt b/encore-common/src/main/kotlin/se/svt/oss/encore/service/localencode/LocalEncodeService.kt index b9c3ae9..fb2c2a0 100644 --- a/encore-common/src/main/kotlin/se/svt/oss/encore/service/localencode/LocalEncodeService.kt +++ b/encore-common/src/main/kotlin/se/svt/oss/encore/service/localencode/LocalEncodeService.kt @@ -9,18 +9,21 @@ import org.springframework.stereotype.Service import se.svt.oss.encore.config.EncoreProperties import se.svt.oss.encore.model.EncoreJob import se.svt.oss.encore.process.createTempDir +import se.svt.oss.encore.service.remotefiles.RemoteFileService import se.svt.oss.mediaanalyzer.file.AudioFile import se.svt.oss.mediaanalyzer.file.ImageFile import se.svt.oss.mediaanalyzer.file.MediaFile import se.svt.oss.mediaanalyzer.file.VideoFile import java.io.File +import java.net.URI import java.nio.file.Files import java.nio.file.Path import java.nio.file.StandardCopyOption @Service class LocalEncodeService( - private val encoreProperties: EncoreProperties + private val encoreProperties: EncoreProperties, + private val remoteFileService: RemoteFileService ) { private val log = KotlinLogging.logger {} @@ -28,7 +31,7 @@ class LocalEncodeService( fun outputFolder( encoreJob: EncoreJob ): String { - return if (encoreProperties.localTemporaryEncode) { + return if (encoreProperties.localTemporaryEncode || remoteFileService.isRemoteFile(encoreJob.outputFolder)) { createTempDir("job_${encoreJob.id}").toString() } else { encoreJob.outputFolder @@ -40,6 +43,23 @@ class LocalEncodeService( output: List, encoreJob: EncoreJob ): List { + if (remoteFileService.isRemoteFile(encoreJob.outputFolder)) { + log.debug { "Moving files to output destination ${encoreJob.outputFolder}, from local temp $outputFolder" } + File(outputFolder).listFiles()?.forEach { localFile -> + val remoteFile = URI.create(encoreJob.outputFolder).resolve(localFile.name).toString() + remoteFileService.upload(localFile.toString(), remoteFile) + } + val files = output.map { + val resolvedPath = URI.create(encoreJob.outputFolder).resolve(Path.of(it.file).fileName.toString()).toString() + when (it) { + is VideoFile -> it.copy(file = resolvedPath) + is AudioFile -> it.copy(file = resolvedPath) + is ImageFile -> it.copy(file = resolvedPath) + else -> throw Exception("Invalid conversion") + } + } + return files + } if (encoreProperties.localTemporaryEncode) { val destination = File(encoreJob.outputFolder) log.debug { "Moving files to correct outputFolder ${encoreJob.outputFolder}, from local temp $outputFolder" } @@ -52,8 +72,10 @@ class LocalEncodeService( return output } - fun cleanup(tempDirectory: String?) { - if (tempDirectory != null && encoreProperties.localTemporaryEncode) { + fun cleanup(tempDirectory: String?, encoreJob: EncoreJob) { + if (tempDirectory != null && + (encoreProperties.localTemporaryEncode || remoteFileService.isRemoteFile(encoreJob.outputFolder)) + ) { File(tempDirectory).deleteRecursively() } } diff --git a/encore-common/src/main/kotlin/se/svt/oss/encore/service/mediaanalyzer/MediaAnalyzerService.kt b/encore-common/src/main/kotlin/se/svt/oss/encore/service/mediaanalyzer/MediaAnalyzerService.kt index 74f60b3..61d78da 100644 --- a/encore-common/src/main/kotlin/se/svt/oss/encore/service/mediaanalyzer/MediaAnalyzerService.kt +++ b/encore-common/src/main/kotlin/se/svt/oss/encore/service/mediaanalyzer/MediaAnalyzerService.kt @@ -19,6 +19,8 @@ import se.svt.oss.mediaanalyzer.ffprobe.FfVideoStream import se.svt.oss.mediaanalyzer.ffprobe.ProbeResult import se.svt.oss.mediaanalyzer.ffprobe.UnknownStream import se.svt.oss.mediaanalyzer.file.AudioFile +import se.svt.oss.mediaanalyzer.file.ImageFile +import se.svt.oss.mediaanalyzer.file.SubtitleFile import se.svt.oss.mediaanalyzer.file.VideoFile import se.svt.oss.mediaanalyzer.mediainfo.AudioTrack import se.svt.oss.mediaanalyzer.mediainfo.GeneralTrack @@ -52,20 +54,25 @@ class MediaAnalyzerService(private val mediaAnalyzer: MediaAnalyzer) { val useFirstAudioStreams = (input as? AudioIn)?.channelLayout?.channels?.size input.analyzed = mediaAnalyzer.analyze( - file = input.uri, + file = input.accessUri, probeInterlaced = probeInterlaced, ffprobeInputParams = input.params - ).let { - val selectedVideoStream = (input as? VideoIn)?.videoStream - val selectedAudioStream = (input as? AudioIn)?.audioStream - when (it) { - is VideoFile -> it.selectVideoStream(selectedVideoStream) - .selectAudioStream(selectedAudioStream) - .trimAudio(useFirstAudioStreams) - is AudioFile -> it.selectAudioStream(selectedAudioStream) - .trimAudio(useFirstAudioStreams) - else -> it + ) + .let { + val selectedVideoStream = (input as? VideoIn)?.videoStream + val selectedAudioStream = (input as? AudioIn)?.audioStream + when (it) { + is VideoFile -> it.selectVideoStream(selectedVideoStream) + .selectAudioStream(selectedAudioStream) + .trimAudio(useFirstAudioStreams) + .copy(file = input.uri) + is AudioFile -> it.selectAudioStream(selectedAudioStream) + .trimAudio(useFirstAudioStreams) + .copy(file = input.uri) + is ImageFile -> it.copy(file = input.uri) + is SubtitleFile -> it.copy(file = input.uri) + else -> it + } } - } } } diff --git a/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/RemoteFileHandler.kt b/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/RemoteFileHandler.kt new file mode 100644 index 0000000..645f07c --- /dev/null +++ b/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/RemoteFileHandler.kt @@ -0,0 +1,7 @@ +package se.svt.oss.encore.service.remotefiles + +interface RemoteFileHandler { + fun getAccessUri(uri: String): String + fun upload(localFile: String, remoteFile: String) + val protocols: List +} diff --git a/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/RemoteFileService.kt b/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/RemoteFileService.kt new file mode 100644 index 0000000..45dbb68 --- /dev/null +++ b/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/RemoteFileService.kt @@ -0,0 +1,55 @@ +package se.svt.oss.encore.service.remotefiles + +import mu.KotlinLogging +import org.springframework.stereotype.Service +import java.net.URI + +@Service +class RemoteFileService(private val remoteFileHandlers: List) { + + private val log = KotlinLogging.logger {} + + private val defaultHandler = DefaultHandler() + + fun isRemoteFile(uriOrPath: String): Boolean { + val uri = URI.create(uriOrPath) + return !(uri.scheme.isNullOrEmpty() || uri.scheme.lowercase() == "file") + } + + fun getAccessUri(uriOrPath: String): String { + val uri = URI.create(uriOrPath) + return getHandler(uri).getAccessUri(uriOrPath) + } + + fun upload(localFile: String, remoteFile: String) { + val uri = URI.create(remoteFile) + getHandler(uri).upload(localFile, remoteFile) + } + + private fun getHandler(uri: URI): RemoteFileHandler { + log.info { "Getting handler for uri $uri. Available protocols: ${remoteFileHandlers.flatMap {it.protocols} }" } + if (uri.scheme.isNullOrEmpty() || uri.scheme.lowercase() == "file") { + return defaultHandler + } + val handler = remoteFileHandlers.firstOrNull { it.protocols.contains(uri.scheme) } + if (handler != null) { + return handler + } + log.info { "No remote file handler found for protocol ${uri.scheme}. Using default handler." } + return defaultHandler + } + + /** Handler user for protocols where no specific handler is defined. Works for local files and + * any protocols that ffmpeg supports natively */ + private class DefaultHandler : RemoteFileHandler { + override fun getAccessUri(uri: String): String { + return uri + } + + override fun upload(localFile: String, remoteFile: String) { + // Do nothing + } + + override val protocols: List = emptyList() + } +} diff --git a/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/s3/S3Properties.kt b/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/s3/S3Properties.kt new file mode 100644 index 0000000..79c4a82 --- /dev/null +++ b/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/s3/S3Properties.kt @@ -0,0 +1,12 @@ +package se.svt.oss.encore.service.remotefiles.s3 + +import org.springframework.boot.context.properties.ConfigurationProperties +import java.time.Duration + +@ConfigurationProperties("remote-files.s3") +data class S3Properties( + val enabled: Boolean = false, + val endpoint: String = "", + val presignDurationSeconds: Long = Duration.ofHours(12).seconds, + val uploadTimeoutSeconds: Long = Duration.ofHours(1).seconds +) diff --git a/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/s3/S3RemoteFileHandler.kt b/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/s3/S3RemoteFileHandler.kt new file mode 100644 index 0000000..e8f6f29 --- /dev/null +++ b/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/s3/S3RemoteFileHandler.kt @@ -0,0 +1,54 @@ +package se.svt.oss.encore.service.remotefiles.s3 + +import se.svt.oss.encore.service.remotefiles.RemoteFileHandler +import software.amazon.awssdk.services.s3.S3AsyncClient +import software.amazon.awssdk.services.s3.model.GetObjectRequest +import software.amazon.awssdk.services.s3.model.PutObjectRequest +import software.amazon.awssdk.services.s3.presigner.S3Presigner +import software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignRequest +import java.net.URI +import java.nio.file.Paths +import java.util.concurrent.TimeUnit + +class S3RemoteFileHandler( + private val client: S3AsyncClient, + private val presigner: S3Presigner, + private val s3Properties: S3Properties +) : RemoteFileHandler { + + private val log = mu.KotlinLogging.logger {} + + override fun getAccessUri(uri: String): String { + val s3Uri = URI.create(uri) + + val objectRequest: GetObjectRequest = GetObjectRequest.builder() + .bucket(s3Uri.host) + .key(s3Uri.path.stripLeadingSlash()) + .build() + val presignRequest: GetObjectPresignRequest = GetObjectPresignRequest.builder() + .signatureDuration(java.time.Duration.ofSeconds(s3Properties.presignDurationSeconds)) + .getObjectRequest(objectRequest) + .build() + + val presignedRequest = presigner.presignGetObject(presignRequest) + val url = presignedRequest.url().toExternalForm() + return url + } + + override fun upload(localFile: String, remoteFile: String) { + log.info { "Uploading $localFile to $remoteFile" } + val s3Uri = URI.create(remoteFile) + val bucket = s3Uri.host + val objectName = s3Uri.path.stripLeadingSlash() + val putObjectRequest: PutObjectRequest = PutObjectRequest.builder() + .bucket(bucket) + .key(objectName) + .build() + val res = client.putObject(putObjectRequest, Paths.get(localFile)).get(s3Properties.presignDurationSeconds, TimeUnit.SECONDS) + log.info { "Upload result: $res" } + } + + private fun String.stripLeadingSlash() = if (startsWith("/")) substring(1) else this + + override val protocols = listOf("s3") +} diff --git a/encore-common/src/test/kotlin/se/svt/oss/encore/EncoreIntegrationTestBase.kt b/encore-common/src/test/kotlin/se/svt/oss/encore/EncoreIntegrationTestBase.kt index 15155a5..1e2ba68 100644 --- a/encore-common/src/test/kotlin/se/svt/oss/encore/EncoreIntegrationTestBase.kt +++ b/encore-common/src/test/kotlin/se/svt/oss/encore/EncoreIntegrationTestBase.kt @@ -157,19 +157,24 @@ class EncoreIntegrationTestBase { logContext = mapOf("FlowId" to UUID.randomUUID().toString()) ) - fun defaultExpectedOutputFiles(outputDir: File, testFile: Resource): List { - return listOf( - expectedFile(outputDir, testFile, "x264_3100.mp4"), - expectedFile(outputDir, testFile, "x264_2069.mp4"), - expectedFile(outputDir, testFile, "x264_1312.mp4"), - expectedFile(outputDir, testFile, "x264_806.mp4"), - expectedFile(outputDir, testFile, "x264_324.mp4"), - expectedFile(outputDir, testFile, "STEREO.mp4"), - expectedFile(outputDir, testFile, "thumb01.jpg"), - expectedFile(outputDir, testFile, "thumb02.jpg"), - expectedFile(outputDir, testFile, "thumb03.jpg"), - expectedFile(outputDir, testFile, "12x20_160x90_thumbnail_map.jpg") + fun defaultExpectedOutputFileSuffixes() = + listOf( + "x264_3100.mp4", + "x264_2069.mp4", + "x264_1312.mp4", + "x264_806.mp4", + "x264_324.mp4", + "STEREO.mp4", + "thumb01.jpg", + "thumb02.jpg", + "thumb03.jpg", + "12x20_160x90_thumbnail_map.jpg" ) + + fun defaultExpectedOutputFiles(outputDir: File, testFile: Resource): List { + return defaultExpectedOutputFileSuffixes().map { + expectedFile(outputDir, testFile, it) + } } fun expectedFile(outputDir: File, baseName: String, suffix: String) = diff --git a/encore-common/src/test/kotlin/se/svt/oss/encore/EncoreS3IntegrationTest.kt b/encore-common/src/test/kotlin/se/svt/oss/encore/EncoreS3IntegrationTest.kt new file mode 100644 index 0000000..e2faab5 --- /dev/null +++ b/encore-common/src/test/kotlin/se/svt/oss/encore/EncoreS3IntegrationTest.kt @@ -0,0 +1,103 @@ +// SPDX-FileCopyrightText: 2020 Sveriges Television AB +// +// SPDX-License-Identifier: EUPL-1.2 + +package se.svt.oss.encore + +import com.fasterxml.jackson.module.kotlin.readValue +import mu.KotlinLogging +import org.awaitility.Durations +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.api.io.TempDir +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.test.context.ActiveProfiles +import se.svt.oss.encore.Assertions.assertThat +import se.svt.oss.encore.model.Status +import se.svt.oss.encore.model.callback.JobProgress +import se.svt.oss.encore.model.input.AudioVideoInput +import software.amazon.awssdk.services.s3.S3AsyncClient +import java.io.File +import java.nio.file.Paths + +@ExtendWith(S3StorageExtension::class) +@ActiveProfiles(profiles = ["test-local", "test-s3"]) +class EncoreS3IntegrationTest : EncoreIntegrationTestBase() { + private val log = KotlinLogging.logger {} + + @Autowired + lateinit var s3Client: S3AsyncClient + + val inputBucket = "input-bucket" + val outputBucket = "output-bucket" + + @BeforeEach + override fun setUp() { + super.setUp() + + listOf(inputBucket, outputBucket).forEach { bucket -> + s3Client.createBucket { it.bucket(bucket) } + .get() + } + } + + @AfterEach + override fun tearDown() { + listOf(inputBucket, outputBucket).forEach { bucket -> + s3Client.listObjects { it.bucket(bucket) } + .get() + .contents() + .forEach { obj -> + s3Client.deleteObject { it.bucket(bucket).key(obj.key()) } + .get() + } + s3Client.deleteBucket { it.bucket(bucket) } + .get() + } + super.tearDown() + } + + @Test + fun jobWiths3InputAndOutputIsSuccessful(@TempDir outputDir: File) { + val filename = "test.mp4" + val remoteInput = uploadInputfile(testFileSurround.file.absolutePath, filename) + + val job = job(outputDir = outputDir, file = testFileSurround) + .copy( + outputFolder = "s3://$outputBucket/output/", + inputs = listOf(AudioVideoInput(uri = remoteInput)) + ) + + val createdJob = createAndAwaitJob( + job = job, + timeout = Durations.FIVE_MINUTES + ) { it.status.isCompleted } + + assertThat(createdJob).hasStatus(Status.SUCCESSFUL) + + val progressCalls = wireMockServer.allServeEvents.map { objectMapper.readValue(it.request.bodyAsString) } + assertThat(progressCalls.first()) + .hasStatus(Status.SUCCESSFUL) + + val expectedFiles = (defaultExpectedOutputFileSuffixes() + listOf("SURROUND.mp4")) + .map { "output/${createdJob.baseName}_$it" } + + val actualFiles = s3Client.listObjectsV2 { + it.bucket(outputBucket) + .prefix("output/") + } + .get() + .contents() + .map { it.key() ?: "" } + assertThat(actualFiles).containsExactlyInAnyOrder(*expectedFiles.toTypedArray()) + // expectedFiles.forEach { minioClient.statObject(StatObjectArgs.builder().bucket(outputBucket).`object`(it).build()) } + } + + private fun uploadInputfile(localPath: String, key: String): String { + s3Client.putObject({ it.bucket(inputBucket).key(key).build() }, Paths.get(localPath)) + + return "s3://$inputBucket/$key" + } +} diff --git a/encore-common/src/test/resources/application-test-s3.yml b/encore-common/src/test/resources/application-test-s3.yml new file mode 100644 index 0000000..983b8e7 --- /dev/null +++ b/encore-common/src/test/resources/application-test-s3.yml @@ -0,0 +1,3 @@ +remote-files: + s3: + enabled: true \ No newline at end of file diff --git a/encore-common/src/testFixtures/kotlin/se/svt/oss/encore/RedisExtension.kt b/encore-common/src/testFixtures/kotlin/se/svt/oss/encore/RedisExtension.kt index d5b5cad..cc654e4 100644 --- a/encore-common/src/testFixtures/kotlin/se/svt/oss/encore/RedisExtension.kt +++ b/encore-common/src/testFixtures/kotlin/se/svt/oss/encore/RedisExtension.kt @@ -4,7 +4,6 @@ import com.redis.testcontainers.RedisContainer import mu.KotlinLogging import org.junit.jupiter.api.extension.BeforeAllCallback import org.junit.jupiter.api.extension.ExtensionContext -import org.testcontainers.DockerClientFactory import org.testcontainers.utility.DockerImageName private const val DEFAULT_REDIS_DOCKER_IMAGE = "redis:6.2.13" @@ -12,29 +11,19 @@ private const val DEFAULT_REDIS_DOCKER_IMAGE = "redis:6.2.13" class RedisExtension : BeforeAllCallback { private val log = KotlinLogging.logger { } override fun beforeAll(context: ExtensionContext?) { - if (isDockerAvailable()) { - val dockerImageName = System.getenv("ENCORE_REDIS_DOCKER_IMAGE") ?: DEFAULT_REDIS_DOCKER_IMAGE - val redisContainer = RedisContainer(DockerImageName.parse(dockerImageName)) - .withKeyspaceNotifications() - redisContainer.start() - val host = redisContainer.redisHost - val port = redisContainer.redisPort.toString() - log.info { "Setting spring.data.redis.host=$host" } - log.info { "Setting spring.data.redis.port=$port" } - System.setProperty("spring.data.redis.host", host) - System.setProperty("spring.data.redis.port", port) - } - } - - private fun isDockerAvailable(): Boolean { - return try { - log.info { "Checking for docker..." } - DockerClientFactory.instance().client() - log.info { "Docker is available" } - true - } catch (ex: Throwable) { + if (!isDockerAvailable()) { log.warn { "Docker is not available! Make sure redis is available as configured by spring.data.redis (default localhost:6379)" } - false + return } + val dockerImageName = System.getenv("ENCORE_REDIS_DOCKER_IMAGE") ?: DEFAULT_REDIS_DOCKER_IMAGE + val redisContainer = RedisContainer(DockerImageName.parse(dockerImageName)) + .withKeyspaceNotifications() + redisContainer.start() + val host = redisContainer.redisHost + val port = redisContainer.redisPort.toString() + log.info { "Setting spring.data.redis.host=$host" } + log.info { "Setting spring.data.redis.port=$port" } + System.setProperty("spring.data.redis.host", host) + System.setProperty("spring.data.redis.port", port) } } diff --git a/encore-common/src/testFixtures/kotlin/se/svt/oss/encore/S3StorageExtension.kt b/encore-common/src/testFixtures/kotlin/se/svt/oss/encore/S3StorageExtension.kt new file mode 100644 index 0000000..2f5f63d --- /dev/null +++ b/encore-common/src/testFixtures/kotlin/se/svt/oss/encore/S3StorageExtension.kt @@ -0,0 +1,28 @@ +package se.svt.oss.encore + +import mu.KotlinLogging +import org.junit.jupiter.api.extension.BeforeAllCallback +import org.junit.jupiter.api.extension.ExtensionContext +import org.testcontainers.containers.localstack.LocalStackContainer +import org.testcontainers.utility.DockerImageName + +class S3StorageExtension : BeforeAllCallback { + private val log = KotlinLogging.logger { } + override fun beforeAll(context: ExtensionContext?) { + if (!isDockerAvailable()) { + log.warn { "Docker is not available! Make sure minio is available as configured by remote-files.s3.*" } + return + } + val localstackImage = DockerImageName.parse("localstack/localstack:3.5.0") + + val localstack: LocalStackContainer = LocalStackContainer(localstackImage) + .withServices(LocalStackContainer.Service.S3) + localstack.start() + + log.info { "localstack endpoint: ${localstack.endpoint}" } + + System.setProperty("aws.accessKeyId", localstack.accessKey) + System.setProperty("aws.secretAccessKey", localstack.secretKey) + System.setProperty("remote-files.s3.endpoint", localstack.endpoint.toString()) + } +} diff --git a/encore-common/src/testFixtures/kotlin/se/svt/oss/encore/TestFixtureUtils.kt b/encore-common/src/testFixtures/kotlin/se/svt/oss/encore/TestFixtureUtils.kt new file mode 100644 index 0000000..1ac0b2f --- /dev/null +++ b/encore-common/src/testFixtures/kotlin/se/svt/oss/encore/TestFixtureUtils.kt @@ -0,0 +1,18 @@ +package se.svt.oss.encore + +import mu.KotlinLogging +import org.testcontainers.DockerClientFactory + +private val log = KotlinLogging.logger { } + +fun isDockerAvailable(): Boolean { + return try { + log.info { "Checking for docker..." } + DockerClientFactory.instance().client() + log.info { "Docker is available" } + true + } catch (ex: Throwable) { + log.warn { "Docker is not available! Make sure redis is available as configured by spring.data.redis (default localhost:6379)" } + false + } +} diff --git a/encore-web/src/main/resources/logback-json-mask-s3-presign.xml b/encore-web/src/main/resources/logback-json-mask-s3-presign.xml new file mode 100644 index 0000000..6f1292c --- /dev/null +++ b/encore-web/src/main/resources/logback-json-mask-s3-presign.xml @@ -0,0 +1,18 @@ + + + + + + + (X-Amz-[^=]+)=[^&]* + $1=*** + + + + + + + + + + diff --git a/encore-worker/src/main/resources/logback-json-mask-s3-presign.xml b/encore-worker/src/main/resources/logback-json-mask-s3-presign.xml new file mode 100644 index 0000000..6f1292c --- /dev/null +++ b/encore-worker/src/main/resources/logback-json-mask-s3-presign.xml @@ -0,0 +1,18 @@ + + + + + + + (X-Amz-[^=]+)=[^&]* + $1=*** + + + + + + + + + +