From 34e829811e366cd1c8f61c9ee7a01eb37c6c0d64 Mon Sep 17 00:00:00 2001 From: wrench Date: Mon, 27 Nov 2023 13:49:32 +0530 Subject: [PATCH] perf(streamer): improved telegram reader --- routes/stream.go | 15 ++++--- utils/linear_reader.go | 65 ++++++++++++++-------------- utils/stream_helpers.go | 93 ----------------------------------------- 3 files changed, 43 insertions(+), 130 deletions(-) delete mode 100644 utils/stream_helpers.go diff --git a/routes/stream.go b/routes/stream.go index c0a6762b..85e0a985 100644 --- a/routes/stream.go +++ b/routes/stream.go @@ -9,12 +9,16 @@ import ( "strconv" range_parser "github.com/quantumsheep/range-parser" + "go.uber.org/zap" "github.com/gin-gonic/gin" ) +var log *zap.Logger + func (e *allRoutes) LoadHome(r *Route) { - defer e.log.Info("Loaded stream route") + log = e.log.Named("Stream") + defer log.Info("Loaded stream route") r.Engine.GET("/stream/:messageID", getStreamRoute) } @@ -57,6 +61,7 @@ func getStreamRoute(ctx *gin.Context) { start = ranges[0].Start end = ranges[0].End ctx.Header("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, file.FileSize)) + log.Info("Content-Range", zap.Int64("start", start), zap.Int64("end", end), zap.Int64("fileSize", file.FileSize)) w.WriteHeader(http.StatusPartialContent) } @@ -79,13 +84,13 @@ func getStreamRoute(ctx *gin.Context) { ctx.Header("Content-Disposition", fmt.Sprintf("%s; filename=\"%s\"", disposition, file.FileName)) if r.Method != "HEAD" { - parts, err := utils.GetParts(ctx, bot.Bot.Client, file) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - parts = utils.RangedParts(parts, start, end) - lr, _ := utils.NewLinearReader(ctx, bot.Bot.Client, parts, contentLength) - io.CopyN(w, lr, contentLength) + lr, _ := utils.NewTelegramReader(ctx, bot.Bot.Client, file.Location, start, end, contentLength) + if _, err := io.CopyN(w, lr, contentLength); err != nil { + log.WithOptions(zap.AddStacktrace(zap.DPanicLevel)).Error(err.Error()) + } } } diff --git a/utils/linear_reader.go b/utils/linear_reader.go index 3dd7ed35..9c722116 100644 --- a/utils/linear_reader.go +++ b/utils/linear_reader.go @@ -1,21 +1,22 @@ package utils import ( - "EverythingSuckz/fsb/types" "context" "fmt" "io" - "log" "github.com/gotd/td/telegram" "github.com/gotd/td/tg" + "go.uber.org/zap" ) -type linearReader struct { +type telegramReader struct { ctx context.Context - parts []types.Part - pos int + log *zap.Logger client *telegram.Client + location *tg.InputDocumentFileLocation + start int64 + end int64 next func() ([]byte, error) buffer []byte bytesread int64 @@ -24,69 +25,69 @@ type linearReader struct { contentLength int64 } -func (*linearReader) Close() error { +func (*telegramReader) Close() error { return nil } -func NewLinearReader(ctx context.Context, client *telegram.Client, parts []types.Part, contentLength int64) (io.ReadCloser, error) { +func NewTelegramReader( + ctx context.Context, + client *telegram.Client, + location *tg.InputDocumentFileLocation, + start int64, + end int64, + contentLength int64, +) (io.ReadCloser, error) { - r := &linearReader{ + r := &telegramReader{ ctx: ctx, - parts: parts, + log: Logger.Named("telegramReader"), + location: location, client: client, + start: start, + end: end, chunkSize: int64(1024 * 1024), contentLength: contentLength, } - + r.log.Sugar().Info("Linear Reader: Start") r.next = r.partStream() - return r, nil } -func (r *linearReader) Read(p []byte) (n int, err error) { +func (r *telegramReader) Read(p []byte) (n int, err error) { if r.bytesread == r.contentLength { - log.Println("Linear Reader: EOF (bytesread == contentLength)") + r.log.Sugar().Info("Linear Reader: EOF (bytesread == contentLength)") return 0, io.EOF } if r.i >= int64(len(r.buffer)) { r.buffer, err = r.next() + r.log.Sugar().Infof("Next buffer: %d", len(r.buffer)) if err != nil { return 0, err } if len(r.buffer) == 0 { - r.pos++ - if r.pos == len(r.parts) { - log.Println("Linear Reader: EOF (pos==n(parts))") - return 0, io.EOF - } else { - r.next = r.partStream() - r.buffer, err = r.next() - if err != nil { - return 0, err - } + r.next = r.partStream() + r.buffer, err = r.next() + if err != nil { + return 0, err } } r.i = 0 } - n = copy(p, r.buffer[r.i:]) - r.i += int64(n) - r.bytesread += int64(n) - return n, nil } -func (r *linearReader) chunk(offset int64, limit int64) ([]byte, error) { +func (r *telegramReader) chunk(offset int64, limit int64) ([]byte, error) { req := &tg.UploadGetFileRequest{ Offset: offset, Limit: int(limit), - Location: r.parts[r.pos].Location, + Location: r.location, } res, err := r.client.API().UploadGetFile(r.ctx, req) @@ -103,10 +104,10 @@ func (r *linearReader) chunk(offset int64, limit int64) ([]byte, error) { } } -func (r *linearReader) partStream() func() ([]byte, error) { +func (r *telegramReader) partStream() func() ([]byte, error) { - start := r.parts[r.pos].Start - end := r.parts[r.pos].End + start := r.start + end := r.end offset := start - (start % r.chunkSize) firstPartCut := start - offset diff --git a/utils/stream_helpers.go b/utils/stream_helpers.go deleted file mode 100644 index d17ad39e..00000000 --- a/utils/stream_helpers.go +++ /dev/null @@ -1,93 +0,0 @@ -package utils - -import ( - "EverythingSuckz/fsb/types" - "bytes" - "context" - "fmt" - - "github.com/gotd/td/telegram" - "github.com/gotd/td/tg" -) - -type Parts []types.Part - -func getChunk(ctx context.Context, tgClient *telegram.Client, location tg.InputFileLocationClass, offset int64, limit int64) ([]byte, error) { - req := &tg.UploadGetFileRequest{ - Offset: offset, - Limit: int(limit), - Location: location, - } - r, err := tgClient.API().UploadGetFile(ctx, req) - if err != nil { - return nil, err - } - switch result := r.(type) { - case *tg.UploadFile: - return result.Bytes, nil - default: - return nil, fmt.Errorf("unexpected type %T", r) - } -} - -func IterContent(ctx context.Context, tgClient *telegram.Client, location tg.InputFileLocationClass) (*bytes.Buffer, error) { - offset := int64(0) - limit := int64(1024 * 1024) - buff := &bytes.Buffer{} - for { - r, err := getChunk(ctx, tgClient, location, offset, limit) - if err != nil { - return buff, err - } - if len(r) == 0 { - break - } - buff.Write(r) - offset += int64(limit) - } - return buff, nil -} - -func GetParts(ctx context.Context, client *telegram.Client, file *types.File) ([]types.Part, error) { - parts := []types.Part{} - parts = append(parts, types.Part{Location: file.Location, Start: 0, End: file.FileSize - 1}) - return parts, nil -} - -func RangedParts(parts []types.Part, startByte, endByte int64) []types.Part { - chunkSize := parts[0].End + 1 - numParts := int64(len(parts)) - validParts := []types.Part{} - firstChunk := max(startByte/chunkSize, 0) - lastChunk := min(endByte/chunkSize, numParts) - startInFirstChunk := startByte % chunkSize - endInLastChunk := endByte % chunkSize - if firstChunk == lastChunk { - validParts = append(validParts, types.Part{ - Location: parts[firstChunk].Location, - Start: startInFirstChunk, - End: endInLastChunk, - }) - } else { - validParts = append(validParts, types.Part{ - Location: parts[firstChunk].Location, - Start: startInFirstChunk, - End: parts[firstChunk].End, - }) - // Add valid parts from any chunks in between. - for i := firstChunk + 1; i < lastChunk; i++ { - validParts = append(validParts, types.Part{ - Location: parts[i].Location, - Start: 0, - End: parts[i].End, - }) - } - // Add valid parts from the last chunk. - validParts = append(validParts, types.Part{ - Location: parts[lastChunk].Location, - Start: 0, - End: endInLastChunk, - }) - } - return validParts -}