diff --git a/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/metrics/ArtifactMetricsConfiguration.kt b/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/metrics/ArtifactMetricsConfiguration.kt index aaecb0676c..564b02972c 100644 --- a/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/metrics/ArtifactMetricsConfiguration.kt +++ b/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/metrics/ArtifactMetricsConfiguration.kt @@ -71,8 +71,9 @@ class ArtifactMetricsConfiguration { @Bean fun artifactMetricsExporter( - customMetricsExporter: CustomMetricsExporter? = null + customMetricsExporter: CustomMetricsExporter? = null, + artifactMetricsProperties: ArtifactMetricsProperties, ): ArtifactMetricsExporter { - return ArtifactMetricsExporter(customMetricsExporter) + return ArtifactMetricsExporter(customMetricsExporter, artifactMetricsProperties.allowUnknownProjectExport) } } diff --git a/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/metrics/ArtifactMetricsProperties.kt b/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/metrics/ArtifactMetricsProperties.kt index bde39d9d15..40fa5cafff 100644 --- a/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/metrics/ArtifactMetricsProperties.kt +++ b/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/metrics/ArtifactMetricsProperties.kt @@ -76,5 +76,9 @@ data class ArtifactMetricsProperties( /** * 超过该大小的文件未命中缓存时将计入大文件缓存未命中监控数据 */ - var largeFileThreshold: DataSize = DataSize.ofGigabytes(3L) + var largeFileThreshold: DataSize = DataSize.ofGigabytes(3L), + /** + * 允许上报未知项目信息 + * */ + var allowUnknownProjectExport: Boolean = false, ) diff --git a/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/metrics/export/ArtifactMetricsExporter.kt b/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/metrics/export/ArtifactMetricsExporter.kt index fc2d6a08ea..dc601acdfb 100644 --- a/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/metrics/export/ArtifactMetricsExporter.kt +++ b/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/metrics/export/ArtifactMetricsExporter.kt @@ -41,7 +41,8 @@ import org.slf4j.LoggerFactory import java.util.Queue class ArtifactMetricsExporter( - private val customMetricsExporter: CustomMetricsExporter? = null + private val customMetricsExporter: CustomMetricsExporter? = null, + private val allowUnknownProjectExport: Boolean, ) { fun export(queue: Queue) { @@ -51,7 +52,11 @@ class ArtifactMetricsExporter( val count: Int = queue.size for (i in 0 until count) { val item = queue.poll() - if (item.project == StringPool.UNKNOWN || item.fullPath == StringPool.UNKNOWN) continue + if ((item.project == StringPool.UNKNOWN || item.fullPath == StringPool.UNKNOWN) && + !allowUnknownProjectExport + ) { + continue + } val labels = convertRecordToMap(item) val metrics = TypeOfMetricsItem.ARTIFACT_TRANSFER_RATE val metricItem = MetricsItem( diff --git a/src/backend/media/biz-media/src/main/kotlin/com/tencent/bkrepo/media/service/StreamService.kt b/src/backend/media/biz-media/src/main/kotlin/com/tencent/bkrepo/media/service/StreamService.kt index 74425c05f7..8fd0a2a0c4 100644 --- a/src/backend/media/biz-media/src/main/kotlin/com/tencent/bkrepo/media/service/StreamService.kt +++ b/src/backend/media/biz-media/src/main/kotlin/com/tencent/bkrepo/media/service/StreamService.kt @@ -98,6 +98,7 @@ class StreamService( name: String, mode: StreamMode, userId: String, + author: String, remux: Boolean = false, saveType: MediaType = MediaType.RAW, transcodeExtraParams: String? = null, @@ -112,6 +113,7 @@ class StreamService( transcodeService, repo, userId, + author, STREAM_PATH, transcodeConfig, ) @@ -129,7 +131,7 @@ class StreamService( stream.saveAs() } stream.startPublish() - logger.info("User[$userId] publish stream $streamId") + logger.info("User[$author] publish stream $streamId") return stream } diff --git a/src/backend/media/biz-media/src/main/kotlin/com/tencent/bkrepo/media/stream/MediaArtifactFileConsumer.kt b/src/backend/media/biz-media/src/main/kotlin/com/tencent/bkrepo/media/stream/MediaArtifactFileConsumer.kt index b8b43b9505..e93904ebf7 100644 --- a/src/backend/media/biz-media/src/main/kotlin/com/tencent/bkrepo/media/stream/MediaArtifactFileConsumer.kt +++ b/src/backend/media/biz-media/src/main/kotlin/com/tencent/bkrepo/media/stream/MediaArtifactFileConsumer.kt @@ -18,6 +18,7 @@ class MediaArtifactFileConsumer( private val transcodeService: TranscodeService, private val repo: RepositoryDetail, private val userId: String, + private val author: String, private val path: String, private val transcodeConfig: TranscodeConfig? = null, ) : FileConsumer { @@ -34,7 +35,7 @@ class MediaArtifactFileConsumer( override fun accept(file: ArtifactFile, name: String) { val filePath = "$path/$name" val artifactInfo = ArtifactInfo(repo.projectId, repo.name, filePath) - val nodeCreateRequest = buildNodeCreateRequest(artifactInfo, file, userId) + val nodeCreateRequest = buildNodeCreateRequest(artifactInfo, file, userId, author) storageManager.storeArtifactFile(nodeCreateRequest, file, repo.storageCredentials) if (transcodeConfig != null) { transcodeService.transcode(artifactInfo, transcodeConfig, userId) @@ -45,6 +46,7 @@ class MediaArtifactFileConsumer( artifactInfo: ArtifactInfo, file: ArtifactFile, userId: String, + author: String, ): NodeCreateRequest { with(artifactInfo) { val endTime = System.currentTimeMillis() @@ -60,6 +62,7 @@ class MediaArtifactFileConsumer( nodeMetadata = listOf( MetadataModel(key = METADATA_KEY_MEDIA_START_TIME, value = startTime, system = true), MetadataModel(key = METADATA_KEY_MEDIA_STOP_TIME, value = endTime, system = true), + MetadataModel(key = METADATA_KEY_MEDIA_AUTHOR, value = author, system = true), ), ) } @@ -68,5 +71,6 @@ class MediaArtifactFileConsumer( companion object { private const val METADATA_KEY_MEDIA_START_TIME = "media.startTime" private const val METADATA_KEY_MEDIA_STOP_TIME = "media.stopTime" + private const val METADATA_KEY_MEDIA_AUTHOR = "media.author" } } diff --git a/src/backend/media/biz-media/src/main/kotlin/com/tencent/bkrepo/media/stream/Mux.kt b/src/backend/media/biz-media/src/main/kotlin/com/tencent/bkrepo/media/stream/Mux.kt index 5c0f36fdb3..61393d6fca 100644 --- a/src/backend/media/biz-media/src/main/kotlin/com/tencent/bkrepo/media/stream/Mux.kt +++ b/src/backend/media/biz-media/src/main/kotlin/com/tencent/bkrepo/media/stream/Mux.kt @@ -36,10 +36,10 @@ class Mux { * @param inputStream 视频输入流 * @param output 封装后的文件 * */ - constructor(inputStream: InputStream, output: File) : this() { + constructor(inputStream: InputStream, output: File, name: String) : this() { this.inputStream = inputStream this.outputFile = output - this.fileName = inputStream.toString() + this.fileName = name } constructor() @@ -53,6 +53,10 @@ class Mux { private var avio: AVIOContext? = null private var running: AtomicBoolean = AtomicBoolean(false) private var stopFlag: AtomicBoolean = AtomicBoolean(false) + private var writeHeader = false + + @Volatile + var packetCount = 0 fun start() { if (!running.compareAndSet(false, true)) { @@ -78,17 +82,16 @@ class Mux { ) ifmtCtx!!.pb(avio) } - check(avformat.avformat_open_input(ifmtCtx, fileName, null, null) >= 0) { "open failed" } - check(avformat.avformat_find_stream_info(ifmtCtx, null as? PointerPointer<*>) >= 0) { - "can't find stream info" - } + var ret = avformat.avformat_open_input(ifmtCtx, fileName, null, null) + check(ret >= 0) { "open failed [$ret]" } + ret = avformat.avformat_find_stream_info(ifmtCtx, null as? PointerPointer<*>) + check(ret >= 0) { "can't find stream info [$ret]" } if (logger.isDebugEnabled) { avformat.av_dump_format(ifmtCtx, 0, fileName, 0) } val outputFilePath = outputFile!!.absolutePath - check(avformat.avformat_alloc_output_context2(ofmtCtx, null, null, outputFilePath) >= 0) { - "create output ctx error" - } + ret = avformat.avformat_alloc_output_context2(ofmtCtx, null, null, outputFilePath) + check(ret >= 0) { "create output ctx error [$ret]" } val streamMapping = mutableMapOf() var streamIndex = 0 @@ -126,9 +129,8 @@ class Mux { check(avformat.avformat_write_header(ofmtCtx, null as? PointerPointer<*>) >= 0) { "Error occurred when opening output file" } - + writeHeader = true var dts = 0L - var count = 0L while (!stopFlag.get()) { if (avformat.av_read_frame(ifmtCtx, pkt) < 0) { break @@ -150,9 +152,9 @@ class Mux { pkt!!.pos(-1) logPacket(ofmtCtx, pkt!!, "out") check(avformat.av_interleaved_write_frame(ofmtCtx, pkt) >= 0) { "write frame error" } - count++ + packetCount++ } - logger.info("Complete remux $fileName,size ${outputFile!!.length()} B,$count packet.") + logger.info("Complete remux $fileName,size ${outputFile!!.length()} B,$packetCount packet.") } catch (e: Exception) { logger.error("Remux error:", e) throw e @@ -160,6 +162,7 @@ class Mux { release() scope.close() running.set(false) + logger.info("Finish remux $fileName to ${outputFile!!.absolutePath}") } } @@ -196,10 +199,14 @@ class Mux { } if (ofmtCtx != null) { - avformat.av_write_trailer(ofmtCtx) - logger.info("Finish remux $fileName to ${outputFile!!.absolutePath}") - if (ofmtCtx!!.oformat().flags() and avformat.AVFMT_NOFILE == 0) { - avformat.avio_closep(ofmtCtx!!.pb()) + if (writeHeader) { + avformat.av_write_trailer(ofmtCtx) + } + val oformat = ofmtCtx!!.oformat() + if (oformat != null) { + if (oformat.flags() and avformat.AVFMT_NOFILE == 0 && !ofmtCtx!!.pb().isNull) { + avformat.avio_closep(ofmtCtx!!.pb()) + } } avformat.avformat_free_context(ofmtCtx) ofmtCtx = null @@ -263,6 +270,7 @@ class Mux { val b = ByteArray(buf_size) val size = inputStream.read(b) return if (size < 0) { + logger.info("input end") avutil.AVERROR_EOF } else { buf.put(b, 0, size) diff --git a/src/backend/media/biz-media/src/main/kotlin/com/tencent/bkrepo/media/stream/RemuxRecordingListener.kt b/src/backend/media/biz-media/src/main/kotlin/com/tencent/bkrepo/media/stream/RemuxRecordingListener.kt index 21f435dcc3..e68e9b4a71 100644 --- a/src/backend/media/biz-media/src/main/kotlin/com/tencent/bkrepo/media/stream/RemuxRecordingListener.kt +++ b/src/backend/media/biz-media/src/main/kotlin/com/tencent/bkrepo/media/stream/RemuxRecordingListener.kt @@ -52,7 +52,12 @@ class RemuxRecordingListener( * */ private var fileName: String? = null + private var startFailed = AtomicBoolean(false) + override fun handler(packet: StreamPacket) { + if (startFailed.get()) { + return + } pipeOut.write(packet.getData()) } @@ -63,9 +68,14 @@ class RemuxRecordingListener( fileName = "$name.$fileType" val tempFileName = StringPool.randomStringByLongValue(REMUX_PREFIX, ".$fileType") tempFilePath = Paths.get(path, tempFileName) - mux = Mux(pipeIn, tempFilePath!!.toFile()) + mux = Mux(pipeIn, tempFilePath!!.toFile(), name) val remuxFuture = threadPool.submit { - mux!!.start() + try { + mux!!.start() + } catch (e: Exception) { + logger.error("Mux start failed", e) + startFailed.set(true) + } } if (remuxFuture.isDone) { throw IllegalStateException("Remux start error") @@ -80,7 +90,11 @@ class RemuxRecordingListener( pipeOut.close() mux!!.stop() pipeIn.close() - fileConsumer.accept(tempFilePath!!.toFile(), fileName!!) + if (mux!!.packetCount > 0) { + fileConsumer.accept(tempFilePath!!.toFile(), fileName!!) + } else { + logger.warn("empty stream $fileName") + } } } finally { Files.deleteIfExists(tempFilePath)