From d70dd612dfe0d2aec26712843f99d6dbb000ae66 Mon Sep 17 00:00:00 2001 From: David Juhasz Date: Wed, 24 Apr 2024 12:12:35 -0700 Subject: [PATCH] Simplify worker runner Use `temporalsdk_worker.Run()` blocking call with a system interrupt listening channel to simplify the `workercmd` and `main` (cli) code. --- cmd/worker/main.go | 17 +---------------- cmd/worker/workercmd/cmd.go | 33 ++++++++------------------------- 2 files changed, 9 insertions(+), 41 deletions(-) diff --git a/cmd/worker/main.go b/cmd/worker/main.go index 1b81830..b4cb018 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -1,11 +1,9 @@ package main import ( - "context" "flag" "fmt" "os" - "os/signal" "runtime" "github.com/spf13/pflag" @@ -57,23 +55,10 @@ func main() { logger.Info("Configuration file not found.") } - ctx, cancel := context.WithCancel(context.Background()) - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) - go func() { <-c; cancel() }() - m := workercmd.NewMain(logger, cfg) - if err := m.Run(ctx); err != nil { - _ = m.Close() - os.Exit(1) - } - - <-ctx.Done() - - if err := m.Close(); err != nil { + if err := m.Run(); err != nil { fmt.Fprintln(os.Stderr, err) - logger.Error(err, "Failed to close the application.") os.Exit(1) } } diff --git a/cmd/worker/workercmd/cmd.go b/cmd/worker/workercmd/cmd.go index a9939f6..3db8885 100644 --- a/cmd/worker/workercmd/cmd.go +++ b/cmd/worker/workercmd/cmd.go @@ -1,8 +1,6 @@ package workercmd import ( - "context" - "github.com/artefactual-sdps/temporal-activities/removefiles" "github.com/go-logr/logr" "go.artefactual.dev/tools/temporal" @@ -16,13 +14,11 @@ import ( "github.com/artefactual-sdps/preprocessing-moma/internal/workflow" ) -const Name = "preprocessing-worker" +const Name = "preprocessing-moma-worker" type Main struct { - logger logr.Logger - cfg config.Configuration - temporalWorker temporalsdk_worker.Worker - temporalClient temporalsdk_client.Client + logger logr.Logger + cfg config.Configuration } func NewMain(logger logr.Logger, cfg config.Configuration) *Main { @@ -32,8 +28,8 @@ func NewMain(logger logr.Logger, cfg config.Configuration) *Main { } } -func (m *Main) Run(ctx context.Context) error { - c, err := temporalsdk_client.Dial(temporalsdk_client.Options{ +func (m *Main) Run() error { + client, err := temporalsdk_client.Dial(temporalsdk_client.Options{ HostPort: m.cfg.Temporal.Address, Namespace: m.cfg.Temporal.Namespace, Logger: temporal.Logger(m.logger.WithName("temporal")), @@ -42,16 +38,15 @@ func (m *Main) Run(ctx context.Context) error { m.logger.Error(err, "Unable to create Temporal client.") return err } - m.temporalClient = c + defer client.Close() - w := temporalsdk_worker.New(m.temporalClient, m.cfg.Temporal.TaskQueue, temporalsdk_worker.Options{ + w := temporalsdk_worker.New(client, m.cfg.Temporal.TaskQueue, temporalsdk_worker.Options{ EnableSessionWorker: true, MaxConcurrentSessionExecutionSize: m.cfg.Worker.MaxConcurrentSessions, Interceptors: []temporalsdk_interceptor.WorkerInterceptor{ temporal.NewLoggerInterceptor(m.logger.WithName("worker")), }, }) - m.temporalWorker = w w.RegisterWorkflowWithOptions( workflow.NewPreprocessingWorkflow(m.cfg.SharedPath).Execute, @@ -62,22 +57,10 @@ func (m *Main) Run(ctx context.Context) error { temporalsdk_activity.RegisterOptions{Name: removefiles.ActivityName}, ) - if err := w.Start(); err != nil { + if err = w.Run(temporalsdk_worker.InterruptCh()); err != nil { m.logger.Error(err, "Worker failed to start or fatal error during its execution.") return err } return nil } - -func (m *Main) Close() error { - if m.temporalWorker != nil { - m.temporalWorker.Stop() - } - - if m.temporalClient != nil { - m.temporalClient.Close() - } - - return nil -}