Skip to content

Commit

Permalink
feat(srt): 开启字幕异步处理, 增加消息回显;
Browse files Browse the repository at this point in the history
  • Loading branch information
speauty committed Apr 2, 2023
1 parent 0a6bfab commit 5e543f8
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 59 deletions.
1 change: 1 addition & 0 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
)

func main() {
//var ctx = context.Background()

if err := cfg.GetInstance().Load(""); err != nil {
panic(err)
Expand Down
158 changes: 158 additions & 0 deletions cron/detector/srt_detector.go
Original file line number Diff line number Diff line change
@@ -1,2 +1,160 @@
// Package detector 检测器
package detector

import (
"context"
"fmt"
"github.com/golang-module/carbon"
"io/fs"
"path/filepath"
"runtime"
"sync"
"translator/cron/reader"
"translator/cron/translator"
"translator/tst/tt_log"
"translator/tst/tt_translator"
_type "translator/type"
"translator/util"
)

var (
apiSrtDetector *SrtDetector
onceSrtDetector sync.Once
)

func GetInstance() *SrtDetector {
onceSrtDetector.Do(func() {
apiSrtDetector = new(SrtDetector)
apiSrtDetector.init()
})
return apiSrtDetector
}

type StrDetectorData struct {
Translator tt_translator.ITranslator
FromLang string
ToLang string
TranslateMode _type.TranslateMode
MainTrackReport _type.LangDirection
SrtFile string
SrtDir string
}

func (customSDD StrDetectorData) toReaderData(filePath string) *reader.SrtReaderData {
return &reader.SrtReaderData{
FilePath: filePath,
PtrTranslatorOpts: &translator.SrtTranslatorOpts{
Translator: customSDD.Translator,
FromLang: customSDD.FromLang,
ToLang: customSDD.ToLang,
TranslateMode: customSDD.TranslateMode,
MainTrackReport: customSDD.MainTrackReport,
},
}
}

type SrtDetector struct {
ctx context.Context
chanDetector chan *StrDetectorData
chanMsgDetector chan string
chanMsgRedirect chan string
maxChanDetector int
}

func (customSD *SrtDetector) SetMsgRedirect(chanMsg chan string) {
customSD.chanMsgRedirect = chanMsg
}

func (customSD *SrtDetector) Push(data *StrDetectorData) {
customSD.chanDetector <- data
}

func (customSD *SrtDetector) Run(ctx context.Context) {
customSD.ctx = ctx
customSD.jobDetector()
customSD.jobMsg()

}

func (customSD *SrtDetector) jobDetector() {
if customSD.maxChanDetector <= 0 {
customSD.log().Warn(fmt.Sprintf("%s-%s通道的最大数量(%d)无效, 重置为5", customSD.getName(), "chanDetector", customSD.maxChanDetector))
customSD.maxChanDetector = 5
}

for idx := 0; idx < customSD.maxChanDetector; idx++ {
go func(ctx context.Context, chanDetector chan *StrDetectorData, chanMsg chan string, idx int) {
coroutineName := fmt.Sprintf("检测协程(%d)", idx)
chanName := "chanDetector"
for true {
select {
case <-ctx.Done():
customSD.log().Info(fmt.Sprintf("%s关闭(ctx.done), %s被迫退出", customSD.getName(), coroutineName))
runtime.Goexit()
case currentData, isOpen := <-chanDetector:
timeStart := carbon.Now()
if isOpen == false && currentData == nil {
customSD.log().Info(fmt.Sprintf("%s-%s通道关闭, %s被迫退出", customSD.getName(), chanName, coroutineName))
runtime.Goexit()
}
if currentData.SrtFile != "" {
if len(currentData.SrtFile) > 4 && currentData.SrtFile[len(currentData.SrtFile)-4:] == ".srt" {
reader.GetInstance().Push(currentData.toReaderData(currentData.SrtFile))
chanMsg <- fmt.Sprintf("检测到文件: %s, 耗时: %d", currentData.SrtFile, carbon.Now().DiffAbsInSeconds(timeStart))
}
}
if currentData.SrtDir != "" {
_ = filepath.Walk(currentData.SrtDir, func(path string, info fs.FileInfo, err error) error {
if info.IsDir() || !util.IsSrtFile(path) || info.Size() == 0 { // 过掉目录 或非srt文件
return nil
}
if len(path) > 4 && path[len(path)-4:] == ".srt" {
reader.GetInstance().Push(currentData.toReaderData(path))
chanMsg <- fmt.Sprintf("检测到文件: %s, 耗时: %d", path, carbon.Now().DiffAbsInSeconds(timeStart))
}
return nil
})
}
}
}
}(customSD.ctx, customSD.chanDetector, customSD.chanMsgDetector, idx)
}
}

func (customSD *SrtDetector) jobMsg() {
go func(ctx context.Context, chanMsgDetector, chanMsgRedirect chan string) {
coroutineName := "消息协程"
chanName := "chanMsgDetector"

for true {
select {
case <-ctx.Done():
customSD.log().Info(fmt.Sprintf("%s关闭(ctx.done), %s被迫退出", customSD.getName(), coroutineName))
runtime.Goexit()
case currentMsg, isOpen := <-chanMsgDetector:
if isOpen == false && currentMsg == "" {
customSD.log().Info(fmt.Sprintf("%s-%s通道关闭, %s被迫退出", customSD.getName(), chanName, coroutineName))
runtime.Goexit()
}
if chanMsgRedirect != nil {
chanMsgRedirect <- fmt.Sprintf("当前时间: %s, 来源: %s, 信息: [%s]", carbon.Now().Layout(carbon.ShortDateTimeLayout), customSD.getName(), currentMsg)
}
customSD.log().Info(fmt.Sprintf("来源: %s, 信息: %s", customSD.getName(), currentMsg))
}
}
}(customSD.ctx, customSD.chanMsgDetector, customSD.chanMsgRedirect)
}

func (customSD *SrtDetector) getName() string {
return "SRT检测程序"
}

func (customSD *SrtDetector) init() {
customSD.chanDetector = make(chan *StrDetectorData, 10)
customSD.chanMsgDetector = make(chan string, 20)
customSD.maxChanDetector = 10
}

func (customSD *SrtDetector) log() *tt_log.TTLog {
return tt_log.GetInstance()
}
34 changes: 20 additions & 14 deletions cron/reader/srt_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"fmt"
"github.com/golang-module/carbon"
"go.uber.org/zap"
"io"
"os"
"runtime"
Expand Down Expand Up @@ -42,14 +41,21 @@ func (customSRD *SrtReaderData) toTranslatorData() *translator.SrtTranslatorData
}

type SrtReader struct {
ctx context.Context
chanReader chan *SrtReaderData
chanMsgReader chan string
maxChanReader int
ctx context.Context
chanReader chan *SrtReaderData
chanMsgReader chan string
chanMsgRedirect chan string
maxChanReader int
}

func (customSR *SrtReader) SetMsgRedirect(chanMsg chan string) {
customSR.chanMsgRedirect = chanMsg
}

func (customSR *SrtReader) Run(ctx context.Context) {
customSR.ctx = ctx
customSR.jobReader()
customSR.jobMsg()
}

func (customSR *SrtReader) Push(data *SrtReaderData) {
Expand Down Expand Up @@ -107,7 +113,7 @@ func (customSR *SrtReader) jobReader() {
chanMsg <- fmt.Sprintf("解析文件(%s)异常, 错误: %s, 即将丢弃", currentData.FilePath, err)
continue
}

currentData.PrtSrt.CntBlock = len(currentData.PrtSrt.Blocks)
translator.GetInstance().Push(currentData.toTranslatorData())
chanMsg <- fmt.Sprintf(
"读取文件(%s)成功, 文件名: %s, 字幕块: %d, 文件大小: %d, 耗时: %d",
Expand All @@ -120,9 +126,9 @@ func (customSR *SrtReader) jobReader() {
}
}

func (customSR *SrtReader) RedirectMsgTo(targetChan chan string) {
go func(ctx context.Context, chanMsgReader, targetChan chan string) {
coroutineName := "消息定向协程"
func (customSR *SrtReader) jobMsg() {
go func(ctx context.Context, chanMsgReader, chanMsgRedirect chan string) {
coroutineName := "消息协程"
chanName := "chanMsgReader"

for true {
Expand All @@ -135,17 +141,17 @@ func (customSR *SrtReader) RedirectMsgTo(targetChan chan string) {
customSR.log().Info(fmt.Sprintf("%s-%s通道关闭, %s被迫退出", customSR.getName(), chanName, coroutineName))
runtime.Goexit()
}
if targetChan == nil {
customSR.log().Info(fmt.Sprintf("%s未设置通道(%s)接管, 定向输出到日志", customSR.getName(), chanName), zap.String("msg", currentMsg))
if chanMsgRedirect != nil {
chanMsgRedirect <- fmt.Sprintf("当前时间: %s, 来源: %s, 信息: [%s]", carbon.Now().Layout(carbon.ShortDateTimeLayout), customSR.getName(), currentMsg)
}
targetChan <- fmt.Sprintf("当前时间: %s, 来源: %s, 信息: [%s]", carbon.Now().Layout(carbon.ShortDateTimeLayout), customSR.getName(), currentMsg)
customSR.log().Info(fmt.Sprintf("来源: %s, 信息: %s", customSR.getName(), currentMsg))
}
}
}(customSR.ctx, customSR.chanMsgReader, targetChan)
}(customSR.ctx, customSR.chanMsgReader, customSR.chanMsgRedirect)
}

func (customSR *SrtReader) getName() string {
return "SRT写入程序"
return "SRT读取程序"
}

func (customSR *SrtReader) init() {
Expand Down
27 changes: 17 additions & 10 deletions cron/translator/srt_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"github.com/golang-module/carbon"
"go.uber.org/zap"
"runtime"
"sync"
"translator/cron/writer"
Expand Down Expand Up @@ -61,11 +60,18 @@ type SrtTranslator struct {
ctx context.Context
chanTranslator chan *SrtTranslatorData
chanMsgTranslator chan string
chanMsgRedirect chan string
maxChanTranslator int
}

func (customST *SrtTranslator) SetMsgRedirect(chanMsg chan string) {
customST.chanMsgRedirect = chanMsg
}

func (customST *SrtTranslator) Run(ctx context.Context) {
customST.ctx = ctx
customST.jobTranslator()
customST.jobMsg()
}

func (customST *SrtTranslator) Push(data *SrtTranslatorData) {
Expand Down Expand Up @@ -132,6 +138,7 @@ func (customST *SrtTranslator) jobTranslator() {
blockChunked = append(blockChunked, tmpBlockStr)
tmpBlockStr = ""
}

if len(blockChunked) == 0 {
chanMsg <- fmt.Sprintf("字幕文件(%s)未解析到需要翻译的字幕块, 疑似增量翻译模式", currentData.PrtSrt.FileName)
continue
Expand All @@ -158,7 +165,7 @@ func (customST *SrtTranslator) jobTranslator() {
}
}
}

currentData.PrtSrt.FlagTranslated = flagTranslated
if flagTranslated == false {
chanMsg <- fmt.Sprintf("字幕文件(%s)未进行翻译", currentData.PrtSrt.FileName)
continue
Expand All @@ -174,28 +181,28 @@ func (customST *SrtTranslator) jobTranslator() {
}
}

func (customST *SrtTranslator) RedirectMsgTo(targetChan chan string) {
go func(ctx context.Context, chanMsgWriter, targetChan chan string) {
coroutineName := "消息定向协程"
func (customST *SrtTranslator) jobMsg() {
go func(ctx context.Context, chanMsgTranslator, chanMsgRedirect chan string) {
coroutineName := "消息协程"
chanName := "chanMsgTranslator"

for true {
select {
case <-ctx.Done():
customST.log().Info(fmt.Sprintf("%s关闭(ctx.done), %s被迫退出", customST.getName(), coroutineName))
runtime.Goexit()
case currentMsg, isOpen := <-chanMsgWriter:
case currentMsg, isOpen := <-chanMsgTranslator:
if isOpen == false && currentMsg == "" {
customST.log().Info(fmt.Sprintf("%s-%s通道关闭, %s被迫退出", customST.getName(), chanName, coroutineName))
runtime.Goexit()
}
if targetChan == nil {
customST.log().Info(fmt.Sprintf("%s未设置通道(%s)接管, 定向输出到日志", customST.getName(), chanName), zap.String("msg", currentMsg))
if chanMsgRedirect != nil {
chanMsgRedirect <- fmt.Sprintf("当前时间: %s, 来源: %s, 信息: [%s]", carbon.Now().Layout(carbon.ShortDateTimeLayout), customST.getName(), currentMsg)
}
targetChan <- fmt.Sprintf("当前时间: %s, 来源: %s, 信息: [%s]", carbon.Now().Layout(carbon.ShortDateTimeLayout), customST.getName(), currentMsg)
customST.log().Info(fmt.Sprintf("来源: %s, 信息: %s", customST.getName(), currentMsg))
}
}
}(customST.ctx, customST.chanMsgTranslator, targetChan)
}(customST.ctx, customST.chanMsgTranslator, customST.chanMsgRedirect)
}

func (customST *SrtTranslator) getName() string {
Expand Down
32 changes: 19 additions & 13 deletions cron/writer/srt_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"github.com/golang-module/carbon"
"go.uber.org/zap"
"os"
"runtime"
"sync"
Expand Down Expand Up @@ -32,14 +31,21 @@ type SrtWriterData struct {
}

type SrtWriter struct {
ctx context.Context
chanWriter chan *SrtWriterData
chanMsgWriter chan string
maxChanWriter int
ctx context.Context
chanWriter chan *SrtWriterData
chanMsgWriter chan string
chanMsgRedirect chan string
maxChanWriter int
}

func (customSW *SrtWriter) SetMsgRedirect(chanMsg chan string) {
customSW.chanMsgRedirect = chanMsg
}

func (customSW *SrtWriter) Run(ctx context.Context) {
customSW.ctx = ctx
customSW.jobWriter()
customSW.jobMsg()
}

func (customSW *SrtWriter) Push(data *SrtWriterData) {
Expand Down Expand Up @@ -96,10 +102,10 @@ func (customSW *SrtWriter) jobWriter() {
}
}

func (customSW *SrtWriter) RedirectMsgTo(targetChan chan string) {
go func(ctx context.Context, chanMsgWriter, targetChan chan string) {
coroutineName := "消息定向协程"
chanName := "chanWriter"
func (customSW *SrtWriter) jobMsg() {
go func(ctx context.Context, chanMsgWriter, chanMsgRedirect chan string) {
coroutineName := "消息协程"
chanName := "chanMsgWriter"

for true {
select {
Expand All @@ -111,13 +117,13 @@ func (customSW *SrtWriter) RedirectMsgTo(targetChan chan string) {
customSW.log().Info(fmt.Sprintf("%s-%s通道关闭, %s被迫退出", customSW.getName(), chanName, coroutineName))
runtime.Goexit()
}
if targetChan == nil {
customSW.log().Info(fmt.Sprintf("%s未设置通道(%s)接管, 定向输出到日志", customSW.getName(), chanName), zap.String("msg", currentMsg))
if chanMsgRedirect != nil {
chanMsgRedirect <- fmt.Sprintf("当前时间: %s, 来源: %s, 信息: [%s]", carbon.Now().Layout(carbon.ShortDateTimeLayout), customSW.getName(), currentMsg)
}
targetChan <- fmt.Sprintf("当前时间: %s, 来源: %s, 信息: [%s]", carbon.Now().Layout(carbon.ShortDateTimeLayout), customSW.getName(), currentMsg)
customSW.log().Info(fmt.Sprintf("来源: %s, 信息: %s", customSW.getName(), currentMsg))
}
}
}(customSW.ctx, customSW.chanMsgWriter, targetChan)
}(customSW.ctx, customSW.chanMsgWriter, customSW.chanMsgRedirect)
}

func (customSW *SrtWriter) getName() string {
Expand Down
Loading

0 comments on commit 5e543f8

Please sign in to comment.