Skip to content

Commit

Permalink
Simplify worker runner
Browse files Browse the repository at this point in the history
Use `temporalsdk_worker.Run()` blocking call with a system interrupt
listening channel to simplify the `workercmd` and `main` (cli) code.
  • Loading branch information
djjuhasz committed Apr 24, 2024
1 parent ec3de02 commit d70dd61
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 41 deletions.
17 changes: 1 addition & 16 deletions cmd/worker/main.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package main

import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"runtime"

"github.com/spf13/pflag"
Expand Down Expand Up @@ -57,23 +55,10 @@ func main() {
logger.Info("Configuration file not found.")
}

ctx, cancel := context.WithCancel(context.Background())
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() { <-c; cancel() }()

m := workercmd.NewMain(logger, cfg)

if err := m.Run(ctx); err != nil {
_ = m.Close()
os.Exit(1)
}

<-ctx.Done()

if err := m.Close(); err != nil {
if err := m.Run(); err != nil {

Check warning on line 60 in cmd/worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/worker/main.go#L60

Added line #L60 was not covered by tests
fmt.Fprintln(os.Stderr, err)
logger.Error(err, "Failed to close the application.")
os.Exit(1)
}
}
33 changes: 8 additions & 25 deletions cmd/worker/workercmd/cmd.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package workercmd

import (
"context"

"github.com/artefactual-sdps/temporal-activities/removefiles"
"github.com/go-logr/logr"
"go.artefactual.dev/tools/temporal"
Expand All @@ -16,13 +14,11 @@ import (
"github.com/artefactual-sdps/preprocessing-moma/internal/workflow"
)

const Name = "preprocessing-worker"
const Name = "preprocessing-moma-worker"

type Main struct {
logger logr.Logger
cfg config.Configuration
temporalWorker temporalsdk_worker.Worker
temporalClient temporalsdk_client.Client
logger logr.Logger
cfg config.Configuration
}

func NewMain(logger logr.Logger, cfg config.Configuration) *Main {
Expand All @@ -32,8 +28,8 @@ func NewMain(logger logr.Logger, cfg config.Configuration) *Main {
}
}

func (m *Main) Run(ctx context.Context) error {
c, err := temporalsdk_client.Dial(temporalsdk_client.Options{
func (m *Main) Run() error {
client, err := temporalsdk_client.Dial(temporalsdk_client.Options{

Check warning on line 32 in cmd/worker/workercmd/cmd.go

View check run for this annotation

Codecov / codecov/patch

cmd/worker/workercmd/cmd.go#L31-L32

Added lines #L31 - L32 were not covered by tests
HostPort: m.cfg.Temporal.Address,
Namespace: m.cfg.Temporal.Namespace,
Logger: temporal.Logger(m.logger.WithName("temporal")),
Expand All @@ -42,16 +38,15 @@ func (m *Main) Run(ctx context.Context) error {
m.logger.Error(err, "Unable to create Temporal client.")
return err
}
m.temporalClient = c
defer client.Close()

Check warning on line 41 in cmd/worker/workercmd/cmd.go

View check run for this annotation

Codecov / codecov/patch

cmd/worker/workercmd/cmd.go#L41

Added line #L41 was not covered by tests

w := temporalsdk_worker.New(m.temporalClient, m.cfg.Temporal.TaskQueue, temporalsdk_worker.Options{
w := temporalsdk_worker.New(client, m.cfg.Temporal.TaskQueue, temporalsdk_worker.Options{

Check warning on line 43 in cmd/worker/workercmd/cmd.go

View check run for this annotation

Codecov / codecov/patch

cmd/worker/workercmd/cmd.go#L43

Added line #L43 was not covered by tests
EnableSessionWorker: true,
MaxConcurrentSessionExecutionSize: m.cfg.Worker.MaxConcurrentSessions,
Interceptors: []temporalsdk_interceptor.WorkerInterceptor{
temporal.NewLoggerInterceptor(m.logger.WithName("worker")),
},
})
m.temporalWorker = w

w.RegisterWorkflowWithOptions(
workflow.NewPreprocessingWorkflow(m.cfg.SharedPath).Execute,
Expand All @@ -62,22 +57,10 @@ func (m *Main) Run(ctx context.Context) error {
temporalsdk_activity.RegisterOptions{Name: removefiles.ActivityName},
)

if err := w.Start(); err != nil {
if err = w.Run(temporalsdk_worker.InterruptCh()); err != nil {

Check warning on line 60 in cmd/worker/workercmd/cmd.go

View check run for this annotation

Codecov / codecov/patch

cmd/worker/workercmd/cmd.go#L60

Added line #L60 was not covered by tests
m.logger.Error(err, "Worker failed to start or fatal error during its execution.")
return err
}

return nil
}

func (m *Main) Close() error {
if m.temporalWorker != nil {
m.temporalWorker.Stop()
}

if m.temporalClient != nil {
m.temporalClient.Close()
}

return nil
}

0 comments on commit d70dd61

Please sign in to comment.