Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enh/rtmp proxy #1387

Merged
merged 6 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 105 additions & 3 deletions api/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ import (
"github.com/TUM-Dev/gocast/dao"
"github.com/TUM-Dev/gocast/model"
"github.com/TUM-Dev/gocast/tools"
"github.com/TUM-Dev/gocast/tools/tum"
"github.com/gin-gonic/gin"
uuid "github.com/satori/go.uuid"
)

func configTokenRouter(r *gin.Engine, daoWrapper dao.DaoWrapper) {
routes := tokenRoutes{daoWrapper}
g := r.Group("/api/token")
g.Use(tools.Admin)
g.POST("/proxy/:token", routes.fetchStreamKey)
g.Use(tools.AtLeastLecturer)
g.POST("/create", routes.createToken)
g.DELETE("/:id", routes.deleteToken)
}
Expand All @@ -26,7 +28,34 @@ type tokenRoutes struct {

func (r tokenRoutes) deleteToken(c *gin.Context) {
id := c.Param("id")
err := r.TokenDao.DeleteToken(id)

foundContext, exists := c.Get("TUMLiveContext")
if !exists {
return
}
tumLiveContext := foundContext.(tools.TUMLiveContext)

token, err := r.TokenDao.GetTokenByID(id)
if err != nil {
logger.Error("can not get token", "err", err)
_ = c.Error(tools.RequestError{
Status: http.StatusInternalServerError,
CustomMessage: "can not get token",
Err: err,
})
return
}

// only the user who created the token or an admin can delete it
if token.UserID != tumLiveContext.User.ID && tumLiveContext.User.Role != model.AdminType {
_ = c.Error(tools.RequestError{
Status: http.StatusForbidden,
CustomMessage: "not allowed to delete token",
})
return
}

err = r.TokenDao.DeleteToken(id)
if err != nil {
logger.Error("can not delete token", "err", err)
_ = c.Error(tools.RequestError{
Expand Down Expand Up @@ -58,13 +87,22 @@ func (r tokenRoutes) createToken(c *gin.Context) {
})
return
}
if req.Scope != model.TokenScopeAdmin {
if req.Scope == model.TokenScopeAdmin && tumLiveContext.User.Role != model.AdminType {
_ = c.Error(tools.RequestError{
Status: http.StatusBadRequest,
CustomMessage: "not an admin",
})
return
}

if req.Scope != model.TokenScopeAdmin && req.Scope != model.TokenScopeLecturer {
_ = c.Error(tools.RequestError{
Status: http.StatusBadRequest,
CustomMessage: "invalid scope",
})
return
}

tokenStr := uuid.NewV4().String()
expires := sql.NullTime{Valid: req.Expires != nil}
if req.Expires != nil {
Expand All @@ -90,3 +128,67 @@ func (r tokenRoutes) createToken(c *gin.Context) {
"token": tokenStr,
})
}

// This is used by the proxy to get the stream key of the next stream of the lecturer given a lecturer token
//
// Proxy receives: rtmp://proxy.example.com/<lecturer-token>
// or: rtmp://proxy.example.com/<lecturer-token>?slug=ABC-123 <-- optional slug parameter in case the lecturer is streaming multiple courses simultaneously
//
// Proxy returns: rtmp://ingest.example.com/ABC-123?secret=610f609e4a2c43ac8a6d648177472b17
func (r *tokenRoutes) fetchStreamKey(c *gin.Context) {
// Optional slug parameter to get the stream key of a specific course (in case the lecturer is streaming multiple courses simultaneously)
slug := c.Query("slug")
t := c.Param("token")

// Get user from token
token, err := r.TokenDao.GetToken(t)
if err != nil {
_ = c.Error(tools.RequestError{
Status: http.StatusBadRequest,
CustomMessage: "invalid token",
})
return
}

// Only tokens of type lecturer are allowed to start streaming
if token.Scope != model.TokenScopeLecturer {
_ = c.Error(tools.RequestError{
Status: http.StatusUnauthorized,
CustomMessage: "invalid scope",
})
return
}

// Get user and check if he has the right to start a stream
user, err := r.UsersDao.GetUserByID(c, token.UserID)
if err != nil {
_ = c.Error(tools.RequestError{
Status: http.StatusInternalServerError,
CustomMessage: "could not get user",
Err: err,
})
return

}
if user.Role != model.LecturerType && user.Role != model.AdminType {
_ = c.Error(tools.RequestError{
Status: http.StatusUnauthorized,
CustomMessage: "user is not a lecturer or admin",
})
return
}

// Find current/next stream and course of which the user is a lecturer
year, term := tum.GetCurrentSemester()
streamKey, courseSlug, err := r.StreamsDao.GetSoonStartingStreamInfo(&user, slug, year, term)
if err != nil || streamKey == "" || courseSlug == "" {
_ = c.Error(tools.RequestError{
Status: http.StatusNotFound,
CustomMessage: "no stream found",
Err: err,
})
return
}

c.JSON(http.StatusOK, gin.H{"url": "" + tools.Cfg.IngestBase + "/" + courseSlug + "?secret=" + streamKey + "/" + courseSlug})
}
2 changes: 2 additions & 0 deletions api/token_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func TestToken(t *testing.T) {
wrapper := dao.DaoWrapper{
TokenDao: func() dao.TokenDao {
tokenMock := mock_dao.NewMockTokenDao(gomock.NewController(t))
tokenMock.EXPECT().GetTokenByID("1").Return(model.Token{}, nil).AnyTimes()
tokenMock.EXPECT().DeleteToken("1").Return(errors.New("")).AnyTimes()
return tokenMock
}(),
Expand All @@ -102,6 +103,7 @@ func TestToken(t *testing.T) {
wrapper := dao.DaoWrapper{
TokenDao: func() dao.TokenDao {
tokenMock := mock_dao.NewMockTokenDao(gomock.NewController(t))
tokenMock.EXPECT().GetTokenByID("1").Return(model.Token{}, nil).AnyTimes()
tokenMock.EXPECT().DeleteToken("1").Return(nil).AnyTimes()
return tokenMock
}(),
Expand Down
1 change: 1 addition & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,4 @@ meili:
apiKey: MASTER_KEY
vodURLTemplate: https://stream.lrz.de/vod/_definst_/mp4:tum/RBG/%s.mp4/playlist.m3u8
canonicalURL: https://tum.live
rtmpProxyURL: https://proxy.example.com
2 changes: 1 addition & 1 deletion dao/courses.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (d coursesDao) CreateCourse(ctx context.Context, course *model.Course, keep

func (d coursesDao) AddAdminToCourse(userID uint, courseID uint) error {
defer Cache.Clear()
return DB.Exec("insert into course_admins (user_id, course_id) values (?, ?)", userID, courseID).Error
return DB.Exec("insert into course_admins (user_id, course_id) values (?, ?) on duplicate key update user_id = user_id", userID, courseID).Error
}

// GetCurrentOrNextLectureForCourse Gets the next lecture for a course or the lecture that is currently live. Error otherwise.
Expand Down
105 changes: 105 additions & 0 deletions dao/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package dao
import (
"context"
"fmt"
"log/slog"
"strconv"
"strings"
"time"

"gorm.io/gorm/clause"

"github.com/TUM-Dev/gocast/model"
uuid "github.com/satori/go.uuid"
"gorm.io/gorm"
)

Expand All @@ -32,6 +35,7 @@ type StreamsDao interface {
GetCurrentLiveNonHidden(ctx context.Context) (currentLive []model.Stream, err error)
GetLiveStreamsInLectureHall(lectureHallId uint) ([]model.Stream, error)
GetStreamsWithWatchState(courseID uint, userID uint) (streams []model.Stream, err error)
GetSoonStartingStreamInfo(user *model.User, slug string, year int, term string) (string, string, error)

SetLectureHall(streamIDs []uint, lectureHallID uint) error
UnsetLectureHall(streamIDs []uint) error
Expand Down Expand Up @@ -305,6 +309,107 @@ func (d streamsDao) GetStreamsWithWatchState(courseID uint, userID uint) (stream
return
}

// GetSoonStartingStreamInfo returns the stream key, course slug and course name of an upcoming stream.
func (d streamsDao) GetSoonStartingStreamInfo(user *model.User, slug string, year int, term string) (string, string, error) {
var result struct {
CourseID uint
StreamKey string
ID string
Slug string
}
now := time.Now()
query := DB.Table("streams").
Select("streams.course_id, streams.stream_key, streams.id, courses.slug").
Joins("JOIN course_admins ON course_admins.course_id = streams.course_id").
Joins("JOIN courses ON courses.id = course_admins.course_id").
Where("courses.slug != 'TESTCOURSE' AND streams.deleted_at IS NULL AND courses.deleted_at IS NULL AND course_admins.user_id = ? AND (streams.start <= ? AND streams.end >= ?)", user.ID, now.Add(15*time.Minute), now). // Streams starting in the next 15 minutes or currently running
Or("courses.slug != 'TESTCOURSE' AND streams.deleted_at IS NULL AND courses.deleted_at IS NULL AND course_admins.user_id = ? AND (streams.end >= ? AND streams.end <= ?)", user.ID, now.Add(-15*time.Minute), now). // Streams that just finished in the last 15 minutes
Order("streams.start ASC")

if slug != "" {
query = query.Where("courses.slug = ?", slug)
}
if year != 0 {
query = query.Where("courses.year = ?", year)
}
if term != "" {
query = query.Where("courses.teaching_term = ?", term)
}

err := query.Limit(1).Scan(&result).Error
if err == gorm.ErrRecordNotFound || result.StreamKey == "" || result.ID == "" || result.Slug == "" {
stream, course, err := d.CreateOrGetTestStreamAndCourse(user)
if err != nil {
return "", "", err
}
return stream.StreamKey, fmt.Sprintf("%s-%d", course.Slug, stream.ID), nil
}
if err != nil {
logger.Error("Error getting soon starting stream: %v", slog.String("err", err.Error()))
return "", "", err
}

return result.StreamKey, fmt.Sprintf("%s-%s", result.Slug, result.ID), nil
}

// Helper method to fetch test stream and course for current user.
func (d streamsDao) CreateOrGetTestStreamAndCourse(user *model.User) (model.Stream, model.Course, error) {
course, err := d.CreateOrGetTestCourse(user)
if err != nil {
return model.Stream{}, model.Course{}, err
}

var stream model.Stream
err = DB.FirstOrCreate(&stream, model.Stream{
CourseID: course.ID,
Name: "Test Stream",
Description: "This is a test stream",
LectureHallID: 0,
}).Error
if err != nil {
return model.Stream{}, model.Course{}, err
}

stream.Start = time.Now().Add(5 * time.Minute)
stream.End = time.Now().Add(1 * time.Hour)
stream.LiveNow = true
stream.Recording = true
stream.LiveNowTimestamp = time.Now().Add(5 * time.Minute)
stream.Private = true
streamKey := uuid.NewV4().String()
stream.StreamKey = strings.ReplaceAll(streamKey, "-", "")
stream.LectureHallID = 1
err = DB.Save(&stream).Error
if err != nil {
return model.Stream{}, model.Course{}, err
}

return stream, course, err
}

// Helper method to fetch test course for current user.
func (d streamsDao) CreateOrGetTestCourse(user *model.User) (model.Course, error) {
var course model.Course
err := DB.FirstOrCreate(&course, model.Course{
Name: "(" + strconv.Itoa(int(user.ID)) + ") " + user.Name + "'s Test Course",
TeachingTerm: "Test",
Slug: "TESTCOURSE",
Year: 1234,
Visibility: "hidden",
VODEnabled: false, // TODO: Change to VODEnabled: true for default testcourse if necessary
}).Error
if err != nil {
return model.Course{}, err
}

err = CoursesDao.AddAdminToCourse(NewDaoWrapper().CoursesDao, user.ID, course.ID)
if err != nil {
return model.Course{}, err
}

return course, nil
}

// SetLectureHall set lecture-halls of streamIds to lectureHallID
func (d streamsDao) SetLectureHall(streamIDs []uint, lectureHallID uint) error {
return DB.Model(&model.Stream{}).Where("id IN ?", streamIDs).Update("lecture_hall_id", lectureHallID).Error
Expand Down
29 changes: 25 additions & 4 deletions dao/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ type TokenDao interface {
AddToken(token model.Token) error

GetToken(token string) (model.Token, error)
GetAllTokens() ([]AllTokensDto, error)
GetTokenByID(id string) (model.Token, error)
GetAllTokens(user *model.User) ([]AllTokensDto, error)

TokenUsed(token model.Token) error

Expand All @@ -40,11 +41,31 @@ func (d tokenDao) GetToken(token string) (model.Token, error) {
return t, err
}

func (d tokenDao) GetTokenByID(id string) (model.Token, error) {
var t model.Token
err := DB.Model(&t).Where("id = ?", id).First(&t).Error
return t, err
}

// GetAllTokens returns all tokens and the corresponding users name, email and lrz id
func (d tokenDao) GetAllTokens() ([]AllTokensDto, error) {
func (d tokenDao) GetAllTokens(user *model.User) ([]AllTokensDto, error) {
var tokens []AllTokensDto
err := DB.Raw("SELECT tokens.*, u.name as user_name, u.email as user_email, u.lrz_id as user_lrz_id FROM tokens JOIN users u ON u.id = tokens.user_id WHERE tokens.deleted_at IS null").Scan(&tokens).Error
return tokens, err

query := DB.Table("tokens").
Select("tokens.*, u.name as user_name, u.email as user_email, u.lrz_id as user_lrz_id").
Joins("JOIN users u ON u.id = tokens.user_id").
Where("tokens.deleted_at IS NULL")

if user.Role != model.AdminType {
query = query.Where("tokens.user_id = ?", user.ID)
}

err := query.Scan(&tokens).Error
if err != nil {
return nil, err
}

return tokens, nil
}

// TokenUsed is called when a token is used. It sets the last_used field to the current time.
Expand Down
Loading
Loading