Skip to content

Commit

Permalink
stream running without IngestServer
Browse files Browse the repository at this point in the history
  • Loading branch information
DawinYurtseven committed Oct 28, 2024
1 parent 5770259 commit fb70635
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 22 deletions.
33 changes: 17 additions & 16 deletions api/runner_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func StreamRequest(ctx context.Context, dao dao.DaoWrapper, runner model.Runner)
version := fmt.Sprintf("%v", ctx.Value("version"))
actionID := fmt.Sprintf("%v", ctx.Value("actionID"))
stringEnd := fmt.Sprintf("%v", ctx.Value("end"))
end, err := time.Parse("2006-01-02T15:04:05+02:00", stringEnd)
end, err := time.Parse(time.RFC3339, stringEnd)
if err != nil {
logger.Error("Can't parse end", "err", err)
return
Expand All @@ -103,14 +103,14 @@ func StreamRequest(ctx context.Context, dao dao.DaoWrapper, runner model.Runner)
logger.Error("No source", "source", source)
return
}
server, err := dao.IngestServerDao.GetBestIngestServer()
/*server, err := dao.IngestServerDao.GetBestIngestServer()
if err != nil {
logger.Error("can't find ingest server", "err", err)
return
}
}*/

var slot model.StreamName
if version == "COMB" { //try to find a transcoding slot for comb view:
//var slot model.StreamName
/*if version == "COMB" { //try to find a transcoding slot for comb view:
slot, err = dao.IngestServerDao.GetTranscodedStreamSlot(server.ID)
}
if version != "COMB" || err != nil {
Expand All @@ -119,10 +119,10 @@ func StreamRequest(ctx context.Context, dao dao.DaoWrapper, runner model.Runner)
logger.Error("No free stream slot", "err", err)
return
}
}
}*/
src := "rtsp://" + source
slot.StreamID = stream.ID
dao.IngestServerDao.SaveSlot(slot)
//slot.StreamID = stream.ID
//dao.IngestServerDao.SaveSlot(slot)
req := protobuf.StreamRequest{
ActionID: actionID,
Stream: uint64(stream.ID),
Expand All @@ -131,11 +131,6 @@ func StreamRequest(ctx context.Context, dao dao.DaoWrapper, runner model.Runner)
End: timestamppb.New(end),
Source: src,
}
err = dao.StreamsDao.SetStreamRequested(stream)
if err != nil {
logger.Error("Can't set stream requested", "err", err)
return
}
conn, err := grpc.Dial(fmt.Sprintf("%s:%d", runner.Hostname, runner.Port), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
logger.Error("Can't dial runner", "err", err)
Expand All @@ -147,6 +142,11 @@ func StreamRequest(ctx context.Context, dao dao.DaoWrapper, runner model.Runner)
logger.Error("Can't request stream", "err", err)
return
}
err = dao.StreamsDao.SetStreamRequested(stream)
if err != nil {
logger.Error("Can't set stream requested", "err", err)
return
}
logger.Info("Stream requested", "ActionID", resp.ActionID)
if err = conn.Close(); err != nil {
logger.Error("Can't close connection", "err", err)
Expand Down Expand Up @@ -514,7 +514,7 @@ func NotifyRunnerAssignments(dao dao.DaoWrapper) func() {
}
for _, action := range activeAction {
runner := action.GetCurrentRunner()
if !runner.IsAlive() && action.IsCompleted() {
if !runner.IsAlive() && !action.IsCompleted() {
action.SetToFailed()
}
}
Expand Down Expand Up @@ -563,12 +563,13 @@ func AssignRunnerAction(dao dao.DaoWrapper, action *model.Action) error {
switch action.Type {
case "stream":
StreamRequest(ctx, dao, runner)
action.SetToRunning()
break
case "transcoding":
TranscodingRequest(ctx, dao, runner)
//TranscodingRequest(ctx, dao, runner)
break
}
action.SetToRunning()

return nil
}

Expand Down
8 changes: 7 additions & 1 deletion dao/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,13 @@ func (d runnerDao) GetAll(c context.Context) ([]model.Runner, error) {
log.Error("no runners found")
return nil, err
}
return runners, err
var aliveRunner []model.Runner
for _, runner := range runners {
if runner.IsAlive() {
aliveRunner = append(aliveRunner, runner)
}
}
return aliveRunner, err
}

// Create a Runner.
Expand Down
4 changes: 3 additions & 1 deletion model/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Runner struct {
Hostname string `gorm:"UniqueKey;type:varchar(80)"`
Port int
LastSeen time.Time
Alive bool

Status string
Workload uint
Expand Down Expand Up @@ -46,5 +47,6 @@ func (r *Runner) UpdateStats(tx *gorm.DB, ctx context.Context) (bool, error) {
}

func (r *Runner) IsAlive() bool {
return r.LastSeen.After(time.Now().Add(time.Minute * -1))
r.Alive = r.LastSeen.After(time.Now().Add(time.Minute * -1))
return r.Alive
}
4 changes: 2 additions & 2 deletions runner/actions/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (a *ActionProvider) StreamAction() *Action {
if !ok {
return ctx, fmt.Errorf("%w: context doesn't contain end", ErrRequiredContextValNotFound)
}
log.Info("streaming", "source", source, "end", time.Now().Second()+end.Second())
log.Info("streaming", "source", source, "now", time.Now(), "end", end, "before", time.Now().Before(end))

//endingTime := time.Now().Add(time.Second * time.Duration(end.Second()))
log.Info("streaming until", "end", end)
Expand Down Expand Up @@ -86,7 +86,7 @@ func (a *ActionProvider) StreamAction() *Action {
src += "-re" // read input at native framerate, e.g. when streaming a file in realtime
}

log.Info("streaming", "source", source, "end", time.Now().Second()+end.Second())
log.Info("streaming", "source", source, "end", time.Until(end).Seconds())

//changing the end variable from a date to a duration and adding the duration to the current time
cmd := fmt.Sprintf(a.Cmd.Stream, src, time.Until(end).Seconds(), source, filename, filepath.Join(a.GetLiveDir(courseID, streamID, version), end.Format("15-04-05")), livePlaylist)
Expand Down
3 changes: 2 additions & 1 deletion runner/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ func (r *Runner) RequestStream(ctx context.Context, req *protobuf.StreamRequest)
ctx = contextFromStreamReq(req, ctx)
ctx = context.WithValue(ctx, "URL", "")
ctx = context.WithValue(ctx, "Hostname", r.cfg.Hostname)
ctx = context.WithValue(ctx, "ActionID", req.ActionID)
ctx = context.WithValue(ctx, "actionID", req.ActionID)
r.log.Info("stream request", "jobID", req.ActionID)
a := []*actions.Action{
r.actions.PrepareAction(),
r.actions.StreamAction(),
Expand Down
2 changes: 1 addition & 1 deletion web/template/admin/admin_tabs/runner.gohtml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<span class="bg-red-500 w-20 text-gray-100 py-1 px-2 rounded-full text-sm font-bold text-center">Dead</span>{{end}}
</td>
<td class="px-6">{{$runner.Uptime}}</td>
<td class="px-6">{{$runner.Action}}</td>
<td class="px-6"></td>
<td x-data class="px-6">
<button @click="admin.getFailedAction()"
class="text-5 hover:text-1 items-center justify-center"
Expand Down

0 comments on commit fb70635

Please sign in to comment.