Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(runner): heartbeat #1493

Merged
merged 18 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .idea/codeStyles/codeStyleConfig.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions .idea/protoeditor.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 11 additions & 3 deletions dao/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ import (

type RunnerDao interface {
// Get Runner by ID
Get(context.Context, uint) (model.Runner, error)
Get(context.Context, string) (model.Runner, error)

// Create a new Runner for the database
Create(context.Context, *model.Runner) error

// Delete a Runner by hostname.
Delete(context.Context, string) error

// Update a Runner by hostname.
Update(context.Context, *model.Runner) error
}

type runnerDao struct {
Expand All @@ -31,8 +34,8 @@ func NewRunnerDao() RunnerDao {
}

// Get a Runner by id.
func (d runnerDao) Get(c context.Context, id uint) (res model.Runner, err error) {
return res, d.db.WithContext(c).First(&res, id).Error
func (d runnerDao) Get(c context.Context, hostname string) (res model.Runner, err error) {
return res, d.db.WithContext(c).First(&res, "hostname = ?", hostname).Error
}

// Create a Runner.
Expand All @@ -47,3 +50,8 @@ func (d runnerDao) Create(c context.Context, it *model.Runner) error {
func (d runnerDao) Delete(c context.Context, hostname string) error {
return d.db.WithContext(c).Where("hostname = ?", hostname).Delete(&model.Runner{}).Error
}

// Update a Runner
func (d runnerDao) Update(c context.Context, it *model.Runner) error {
return d.db.WithContext(c).Save(it).Error
}
30 changes: 25 additions & 5 deletions mock_dao/runner.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 21 additions & 4 deletions model/runner.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,29 @@
package model

import "gorm.io/gorm"
import (
"time"

"gorm.io/gorm"
)

// Runner represents a runner handling streams, converting videos,
// extracting silence from audios, creating thumbnails, etc.
type Runner struct {
// Hostname is the hostname of the runner
Hostname string `gorm:"column:hostname;primaryKey;unique;not null"`
Port uint32 `gorm:"column:port;not null"`
// Port is the port, the runners gRPC server listens on.
Port uint32 `gorm:"column:port;not null"`
// LastSeen is the timestamp of the last successful heartbeat.
// if the runner wasn't seen in more than 5 seconds, it's considered dead
// and won't be assigned further jobs.
LastSeen time.Time `gorm:"column:last_seen;"`
// Draining is true if the runner is shutting down.
// In this case, no further jobs will be assigned.
Draining bool `gorm:"column:draining;not null;default:false"`
// JobCount is the number of currently running jobs.
// It's updated through heartbeats and used to select
// the runner with the least workload for new jobs.
JobCount uint64 `gorm:"column:job_count;not null;default:0"`
}

// TableName returns the name of the table for the Runner model in the database.
Expand All @@ -15,8 +32,8 @@ func (*Runner) TableName() string {
}

// BeforeCreate returns an error, if Runner r is invalid
func (r *Runner) BeforeCreate(tx *gorm.DB) (err error) {
// this method currently is a noop
func (r *Runner) BeforeCreate(tx *gorm.DB) error {
r.LastSeen = time.Now()
return nil
}

Expand Down
23 changes: 23 additions & 0 deletions pkg/runner_manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"

"github.com/TUM-Dev/gocast/dao"
"github.com/TUM-Dev/gocast/model"
Expand Down Expand Up @@ -86,3 +88,24 @@ func (m *Manager) Register(ctx context.Context, req *protobuf.RegisterRequest) (
}
return &protobuf.RegisterResponse{}, nil
}

func (m *Manager) Notify(ctx context.Context, notification *protobuf.Notification) (*protobuf.NotificationResponse, error) {
switch notification.Data.(type) {
case *protobuf.Notification_Heartbeat:
log.Debug("Heartbeat", "d", notification)
runner, err := m.dao.RunnerDao.Get(ctx, notification.GetHeartbeat().GetHostname())
if err != nil {
return nil, status.Errorf(codes.NotFound, "runner not found: %v", err)
}
runner.LastSeen = time.Now()
runner.Draining = notification.GetHeartbeat().GetDraining()
runner.JobCount = notification.GetHeartbeat().GetJobCount()
err = m.dao.RunnerDao.Update(ctx, &runner)
if err != nil {
return nil, status.Errorf(codes.Internal, "update runner: %v", err)
}
return &protobuf.NotificationResponse{}, nil
default:
return nil, status.Error(codes.Unimplemented, "unsupported notification type")
}
}
13 changes: 9 additions & 4 deletions runner/cmd/runner/main.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package main

import (
"github.com/tum-dev/gocast/runner"
"log/slog"
"os"
"os/signal"
"syscall"
"time"

"github.com/tum-dev/gocast/runner"
)

// V (Version) is bundled into binary with -ldflags "-X ..."
Expand All @@ -21,12 +23,13 @@ func main() {
shouldShutdown := false // set to true once we receive a shutdown signal

currentCount := 0

go func() {
for {
currentCount += <-r.JobCount // count Job start/stop
slog.Info("current action count", "count", currentCount)
slog.Info("current job count", "count", currentCount)
if shouldShutdown && currentCount == 0 { // if we should shut down and no jobs are running, exit.
slog.Info("No actions left, shutting down")
slog.Info("No jobs left, shutting down")
os.Exit(0)
}
}
Expand All @@ -39,12 +42,14 @@ func main() {
shouldShutdown = true
r.Drain()

//let drainage propagate
time.Sleep(time.Second * 10)

if currentCount == 0 {
slog.Info("No jobs left, shutting down")
os.Exit(0)
}

blocking := make(chan struct{})
_ = <-blocking

}
1 change: 1 addition & 0 deletions runner/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed // indirect
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect
github.com/sethvargo/go-retry v0.3.0 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/tklauser/go-sysconf v0.3.13 // indirect
github.com/tklauser/numcpus v0.7.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions runner/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b h1:0LFwY6Q3gMACTjAbMZBjXAqTOzOwFaj2Ld6cjeQ7Rig=
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/sethvargo/go-retry v0.3.0 h1:EEt31A35QhrcRZtrYFDTBg91cqZVnFL2navjDrah2SE=
github.com/sethvargo/go-retry v0.3.0/go.mod h1:mNX17F0C/HguQMyMyJxcnU471gOZGxCLyYaFyAZraas=
github.com/shirou/gopsutil/v3 v3.24.1 h1:R3t6ondCEvmARp3wxODhXMTLC/klMa87h2PHUw5m7QI=
github.com/shirou/gopsutil/v3 v3.24.1/go.mod h1:UU7a2MSBQa+kW1uuDq8DeEBS8kmrnQwsv2b5O513rwU=
github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM=
Expand Down
11 changes: 11 additions & 0 deletions runner/notifications.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ message Notification {
oneof data {
StreamStartNotification stream_start = 1;
StreamEndNotification stream_end = 2;
HeartbeatNotification heartbeat = 3;
}
}

Expand All @@ -21,3 +22,13 @@ message StreamStartNotification {
message StreamEndNotification {
StreamInfo stream = 1;
}

message HeartbeatNotification {
string hostname = 1;
bool draining = 2;
uint64 job_count = 3;
}

message NotificationResponse {

}
Loading
Loading