diff --git a/app/common/billing.go b/app/common/billing.go new file mode 100644 index 000000000..b78575c00 --- /dev/null +++ b/app/common/billing.go @@ -0,0 +1,151 @@ +package common + +import ( + "context" + "fmt" + "log/slog" + "syscall" + + "github.com/ThreeDotsLabs/watermill/message" + "github.com/google/wire" + "github.com/oklog/run" + + "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/app" + "github.com/openmeterio/openmeter/openmeter/billing" + billingadapter "github.com/openmeterio/openmeter/openmeter/billing/adapter" + billingservice "github.com/openmeterio/openmeter/openmeter/billing/service" + billingworker "github.com/openmeterio/openmeter/openmeter/billing/worker" + "github.com/openmeterio/openmeter/openmeter/customer" + entdb "github.com/openmeterio/openmeter/openmeter/ent/db" + "github.com/openmeterio/openmeter/openmeter/meter" + "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" + "github.com/openmeterio/openmeter/openmeter/streaming" + watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka" + "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" + "github.com/openmeterio/openmeter/openmeter/watermill/router" + pkgkafka "github.com/openmeterio/openmeter/pkg/kafka" +) + +func BillingWorkerProvisionTopics(conf config.BillingConfiguration) []pkgkafka.TopicConfig { + var provisionTopics []pkgkafka.TopicConfig + + if conf.Worker.DLQ.AutoProvision.Enabled { + provisionTopics = append(provisionTopics, pkgkafka.TopicConfig{ + Name: conf.Worker.DLQ.Topic, + Partitions: conf.Worker.DLQ.AutoProvision.Partitions, + RetentionTime: pkgkafka.TimeDurationMilliSeconds(conf.Worker.DLQ.AutoProvision.Retention), + }) + } + + return provisionTopics +} + +// no closer function: the subscriber is closed by the router/worker +func BillingWorkerSubscriber(conf config.BillingConfiguration, brokerOptions watermillkafka.BrokerOptions) (message.Subscriber, error) { + subscriber, err := watermillkafka.NewSubscriber(watermillkafka.SubscriberOptions{ + Broker: brokerOptions, + ConsumerGroupName: conf.Worker.ConsumerGroupName, + }) + if err != nil { + return nil, fmt.Errorf("failed to initialize Kafka subscriber: %w", err) + } + + return subscriber, nil +} + +func NewBillingWorkerOptions( + eventConfig config.EventsConfiguration, + routerOptions router.Options, + eventBus eventbus.Publisher, + billingService billing.Service, + logger *slog.Logger, +) billingworker.WorkerOptions { + return billingworker.WorkerOptions{ + SystemEventsTopic: eventConfig.SystemEvents.Topic, + + Router: routerOptions, + EventBus: eventBus, + BillingService: billingService, + Logger: logger, + } +} + +func NewBillingWorker(workerOptions billingworker.WorkerOptions) (*billingworker.Worker, error) { + worker, err := billingworker.New(workerOptions) + if err != nil { + return nil, fmt.Errorf("failed to initialize worker: %w", err) + } + + return worker, nil +} + +func BillingWorkerGroup( + ctx context.Context, + worker *billingworker.Worker, + telemetryServer TelemetryServer, +) run.Group { + var group run.Group + + group.Add( + func() error { return telemetryServer.ListenAndServe() }, + func(err error) { _ = telemetryServer.Shutdown(ctx) }, + ) + + group.Add( + func() error { return worker.Run(ctx) }, + func(err error) { _ = worker.Close() }, + ) + + group.Add(run.SignalHandler(ctx, syscall.SIGINT, syscall.SIGTERM)) + + return group +} + +func BillingService( + logger *slog.Logger, + db *entdb.Client, + billingConfig config.BillingConfiguration, + customerService customer.Service, + appService app.Service, + featureConnector feature.FeatureConnector, + meterRepo meter.Repository, + streamingConnector streaming.Connector, +) (billing.Service, error) { + if !billingConfig.Enabled { + return nil, nil + } + + adapter, err := billingadapter.New(billingadapter.Config{ + Client: db, + Logger: logger, + }) + if err != nil { + return nil, fmt.Errorf("creating billing adapter: %w", err) + } + + return billingservice.New(billingservice.Config{ + Adapter: adapter, + CustomerService: customerService, + AppService: appService, + Logger: logger, + FeatureService: featureConnector, + MeterRepo: meterRepo, + StreamingConnector: streamingConnector, + }) +} + +var BillingWorker = wire.NewSet( + wire.FieldsOf(new(config.BillingWorkerConfiguration), "ConsumerConfiguration"), + wire.FieldsOf(new(config.BillingConfiguration), "Worker"), + + BillingWorkerProvisionTopics, + BillingWorkerSubscriber, + + NewCustomerService, + BillingService, + + NewBillingWorkerOptions, + NewBillingWorker, + BillingWorkerGroup, +) diff --git a/app/common/customer.go b/app/common/customer.go new file mode 100644 index 000000000..805d0c79a --- /dev/null +++ b/app/common/customer.go @@ -0,0 +1 @@ +package common diff --git a/app/common/openmeter.go b/app/common/openmeter.go index bf60d27ad..2e8860490 100644 --- a/app/common/openmeter.go +++ b/app/common/openmeter.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "os" "github.com/ClickHouse/clickhouse-go/v2" "github.com/ThreeDotsLabs/watermill/message" @@ -12,6 +13,10 @@ import ( "go.opentelemetry.io/otel/metric" "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/customer" + customeradapter "github.com/openmeterio/openmeter/openmeter/customer/adapter" + customerservice "github.com/openmeterio/openmeter/openmeter/customer/service" + entdb "github.com/openmeterio/openmeter/openmeter/ent/db" "github.com/openmeterio/openmeter/openmeter/ingest" "github.com/openmeterio/openmeter/openmeter/ingest/ingestadapter" "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest" @@ -251,3 +256,18 @@ func NewFlushHandler( return flushHandlerMux, nil } + +func NewCustomerService(logger *slog.Logger, db *entdb.Client) (customer.Service, error) { + customerAdapter, err := customeradapter.New(customeradapter.Config{ + Client: db, + Logger: logger.WithGroup("customer.postgres"), + }) + if err != nil { + logger.Error("failed to initialize customer repository", "error", err) + os.Exit(1) + } + + return customerservice.New(customerservice.Config{ + Adapter: customerAdapter, + }) +} diff --git a/app/common/wire.go b/app/common/wire.go index 275322793..d00ea879e 100644 --- a/app/common/wire.go +++ b/app/common/wire.go @@ -39,6 +39,7 @@ var Config = wire.NewSet( wire.FieldsOf(new(config.Configuration), "Namespace"), wire.FieldsOf(new(config.Configuration), "Events"), wire.FieldsOf(new(config.Configuration), "BalanceWorker"), + wire.FieldsOf(new(config.Configuration), "Billing"), wire.FieldsOf(new(config.Configuration), "Notification"), wire.FieldsOf(new(config.Configuration), "Sink"), ) diff --git a/app/config/billing.go b/app/config/billing.go index 3ff38cb8e..a15c4e1c9 100644 --- a/app/config/billing.go +++ b/app/config/billing.go @@ -4,6 +4,7 @@ import "github.com/spf13/viper" type BillingConfiguration struct { Enabled bool + Worker BillingWorkerConfiguration } func (c BillingConfiguration) Validate() error { diff --git a/app/config/billingworker.go b/app/config/billingworker.go new file mode 100644 index 000000000..8d01dd3c7 --- /dev/null +++ b/app/config/billingworker.go @@ -0,0 +1,29 @@ +package config + +import ( + "errors" + + "github.com/spf13/viper" + + "github.com/openmeterio/openmeter/pkg/errorsx" +) + +type BillingWorkerConfiguration struct { + ConsumerConfiguration `mapstructure:",squash"` +} + +func (c BillingWorkerConfiguration) Validate() error { + var errs []error + + if err := c.ConsumerConfiguration.Validate(); err != nil { + errs = append(errs, errorsx.WithPrefix(err, "consumer")) + } + + return errors.Join(errs...) +} + +func ConfigureBillingWorker(v *viper.Viper) { + ConfigureConsumer(v, "billingWorker") + v.SetDefault("billingWorker.dlq.topic", "om_sys.billing_worker_dlq") + v.SetDefault("billingWorker.consumerGroupName", "om_billing_worker") +} diff --git a/app/config/config.go b/app/config/config.go index 1861b10c3..172074f66 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -165,6 +165,7 @@ func SetViperDefaults(v *viper.Viper, flags *pflag.FlagSet) { ConfigurePortal(v) ConfigureEvents(v) ConfigureBalanceWorker(v) + ConfigureBillingWorker(v) ConfigureNotification(v) ConfigureStripe(v) ConfigureBilling(v) diff --git a/cmd/billing-worker/.air.toml b/cmd/billing-worker/.air.toml new file mode 100644 index 000000000..cb007a3db --- /dev/null +++ b/cmd/billing-worker/.air.toml @@ -0,0 +1,44 @@ +root = "." +testdata_dir = "testdata" +tmp_dir = "tmp" + +[build] + args_bin = ["--config", "./config.yaml", "--telemetry-address", ":10002"] + bin = "./tmp/openmeter-balance-worker" + cmd = "go build -tags dynamic -o ./tmp/openmeter-balance-worker ./cmd/balance-worker" + delay = 0 + exclude_dir = ["assets", "ci", "deploy", "docs", "examples", "testdata", "quickstart", "tmp", "vendor", "api/client", "node_modules"] + exclude_file = [] + exclude_regex = ["_test.go"] + exclude_unchanged = false + follow_symlink = false + full_bin = "" + include_dir = [] + include_ext = ["go", "tpl", "tmpl", "html", "yml", "yaml", "sql", "json"] + include_file = [] + kill_delay = "0s" + log = "build-errors.log" + poll = false + poll_interval = 0 + rerun = false + rerun_delay = 500 + send_interrupt = false + stop_on_error = false + +[color] + app = "" + build = "yellow" + main = "magenta" + runner = "green" + watcher = "cyan" + +[log] + main_only = false + time = false + +[misc] + clean_on_exit = false + +[screen] + clear_on_rebuild = false + keep_scroll = true diff --git a/cmd/billing-worker/main.go b/cmd/billing-worker/main.go new file mode 100644 index 000000000..6ac4dc6c9 --- /dev/null +++ b/cmd/billing-worker/main.go @@ -0,0 +1,87 @@ +package main + +import ( + "context" + "errors" + "fmt" + "log/slog" + "os" + + "github.com/spf13/pflag" + "github.com/spf13/viper" + + "github.com/openmeterio/openmeter/app/config" +) + +func main() { + v, flags := viper.NewWithOptions(viper.WithDecodeHook(config.DecodeHook())), pflag.NewFlagSet("OpenMeter", pflag.ExitOnError) + ctx := context.Background() + + config.SetViperDefaults(v, flags) + + flags.String("config", "", "Configuration file") + flags.Bool("version", false, "Show version information") + flags.Bool("validate", false, "Validate configuration and exit") + + _ = flags.Parse(os.Args[1:]) + + if v, _ := flags.GetBool("version"); v { + fmt.Printf("%s version %s (%s) built on %s\n", "Open Meter", version, revision, revisionDate) + + os.Exit(0) + } + + if c, _ := flags.GetString("config"); c != "" { + v.SetConfigFile(c) + } + + err := v.ReadInConfig() + if err != nil && !errors.As(err, &viper.ConfigFileNotFoundError{}) { + panic(err) + } + + var conf config.Configuration + err = v.Unmarshal(&conf) + if err != nil { + panic(err) + } + + err = conf.Validate() + if err != nil { + println("configuration error:") + println(err.Error()) + os.Exit(1) + } + + if v, _ := flags.GetBool("validate"); v { + os.Exit(0) + } + + app, cleanup, err := initializeApplication(ctx, conf) + if err != nil { + slog.Error("failed to initialize application", "error", err) + + cleanup() + + os.Exit(1) + } + defer cleanup() + + app.SetGlobals() + + logger := app.Logger + + // Validate service prerequisites + + if !conf.Events.Enabled { + logger.Error("events are disabled, exiting") + os.Exit(1) + } + + if err := app.Migrate(ctx); err != nil { + logger.Error("failed to initialize database", "error", err) + os.Exit(1) + } + + app.Run() +} diff --git a/cmd/billing-worker/version.go b/cmd/billing-worker/version.go new file mode 100644 index 000000000..24582ea13 --- /dev/null +++ b/cmd/billing-worker/version.go @@ -0,0 +1,34 @@ +package main + +import "runtime/debug" + +// Provisioned by ldflags. +var version string + +//nolint:gochecknoglobals +var ( + revision string + revisionDate string +) + +//nolint:gochecknoinits,goconst +func init() { + if version == "" { + version = "unknown" + } + + buildInfo, _ := debug.ReadBuildInfo() + + revision = "unknown" + revisionDate = "unknown" + + for _, setting := range buildInfo.Settings { + if setting.Key == "vcs.revision" { + revision = setting.Value + } + + if setting.Key == "vcs.time" { + revisionDate = setting.Value + } + } +} diff --git a/cmd/billing-worker/wire.go b/cmd/billing-worker/wire.go new file mode 100644 index 000000000..2cdf3d3e2 --- /dev/null +++ b/cmd/billing-worker/wire.go @@ -0,0 +1,45 @@ +//go:build wireinject +// +build wireinject + +package main + +import ( + "context" + "log/slog" + + "github.com/google/wire" + + "github.com/openmeterio/openmeter/app/common" + "github.com/openmeterio/openmeter/app/config" +) + +type Application struct { + common.GlobalInitializer + common.Migrator + common.Runner + + Logger *slog.Logger +} + +func initializeApplication(ctx context.Context, conf config.Configuration) (Application, func(), error) { + wire.Build( + metadata, + common.Config, + common.Framework, + common.Telemetry, + common.NewDefaultTextMapPropagator, + common.Database, + common.ClickHouse, + common.KafkaTopic, + common.Watermill, + common.WatermillRouter, + common.OpenMeter, + common.BillingWorker, + wire.Struct(new(Application), "*"), + ) + return Application{}, nil, nil +} + +func metadata(conf config.Configuration) common.Metadata { + return common.NewMetadata(conf, version, "billing-worker") +} diff --git a/cmd/billing-worker/wire_gen.go b/cmd/billing-worker/wire_gen.go new file mode 100644 index 000000000..87c05c09e --- /dev/null +++ b/cmd/billing-worker/wire_gen.go @@ -0,0 +1,218 @@ +// Code generated by Wire. DO NOT EDIT. + +//go:generate go run -mod=mod github.com/google/wire/cmd/wire +//go:build !wireinject +// +build !wireinject + +package main + +import ( + "context" + "github.com/openmeterio/openmeter/app/common" + "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/registry/builder" + "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka" + "github.com/openmeterio/openmeter/openmeter/watermill/router" + "log/slog" +) + +// Injectors from wire.go: + +func initializeApplication(ctx context.Context, conf config.Configuration) (Application, func(), error) { + telemetryConfig := conf.Telemetry + logTelemetryConfig := telemetryConfig.Log + commonMetadata := metadata(conf) + resource := common.NewTelemetryResource(commonMetadata) + loggerProvider, cleanup, err := common.NewLoggerProvider(ctx, logTelemetryConfig, resource) + if err != nil { + return Application{}, nil, err + } + logger := common.NewLogger(logTelemetryConfig, resource, loggerProvider, commonMetadata) + metricsTelemetryConfig := telemetryConfig.Metrics + meterProvider, cleanup2, err := common.NewMeterProvider(ctx, metricsTelemetryConfig, resource, logger) + if err != nil { + cleanup() + return Application{}, nil, err + } + traceTelemetryConfig := telemetryConfig.Trace + tracerProvider, cleanup3, err := common.NewTracerProvider(ctx, traceTelemetryConfig, resource, logger) + if err != nil { + cleanup2() + cleanup() + return Application{}, nil, err + } + textMapPropagator := common.NewDefaultTextMapPropagator() + globalInitializer := common.GlobalInitializer{ + Logger: logger, + MeterProvider: meterProvider, + TracerProvider: tracerProvider, + TextMapPropagator: textMapPropagator, + } + postgresConfig := conf.Postgres + meter := common.NewMeter(meterProvider, commonMetadata) + driver, cleanup4, err := common.NewPostgresDriver(ctx, postgresConfig, meterProvider, meter, tracerProvider, logger) + if err != nil { + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + db := common.NewDB(driver) + entPostgresDriver, cleanup5 := common.NewEntPostgresDriver(db, logger) + client := common.NewEntClient(entPostgresDriver) + migrator := common.Migrator{ + Config: postgresConfig, + Client: client, + Logger: logger, + } + eventsConfiguration := conf.Events + balanceWorkerConfiguration := conf.BalanceWorker + ingestConfiguration := conf.Ingest + kafkaIngestConfiguration := ingestConfiguration.Kafka + kafkaConfiguration := kafkaIngestConfiguration.KafkaConfiguration + brokerOptions := common.NewBrokerConfiguration(kafkaConfiguration, logTelemetryConfig, commonMetadata, logger, meter) + subscriber, err := common.BalanceWorkerSubscriber(balanceWorkerConfiguration, brokerOptions) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + v := common.BalanceWorkerProvisionTopics(balanceWorkerConfiguration) + adminClient, err := common.NewKafkaAdminClient(kafkaConfiguration) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + topicProvisionerConfig := kafkaIngestConfiguration.TopicProvisionerConfig + kafkaTopicProvisionerConfig := common.NewKafkaTopicProvisionerConfig(adminClient, logger, meter, topicProvisionerConfig) + topicProvisioner, err := common.NewKafkaTopicProvisioner(kafkaTopicProvisionerConfig) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + publisherOptions := kafka.PublisherOptions{ + Broker: brokerOptions, + ProvisionTopics: v, + TopicProvisioner: topicProvisioner, + } + publisher, cleanup6, err := common.NewPublisher(ctx, publisherOptions, logger) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + consumerConfiguration := balanceWorkerConfiguration.ConsumerConfiguration + options := router.Options{ + Subscriber: subscriber, + Publisher: publisher, + Logger: logger, + MetricMeter: meter, + Config: consumerConfiguration, + } + eventbusPublisher, err := common.NewEventBusPublisher(publisher, eventsConfiguration, logger) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + aggregationConfiguration := conf.Aggregation + clickHouseAggregationConfiguration := aggregationConfiguration.ClickHouse + v2, err := common.NewClickHouse(clickHouseAggregationConfiguration) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + v3 := conf.Meters + inMemoryRepository := common.NewMeterRepository(v3) + connector, err := common.NewStreamingConnector(ctx, aggregationConfiguration, v2, inMemoryRepository, logger) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + entitlementOptions := registrybuilder.EntitlementOptions{ + DatabaseClient: client, + StreamingConnector: connector, + Logger: logger, + MeterRepository: inMemoryRepository, + Publisher: eventbusPublisher, + } + entitlement := registrybuilder.GetEntitlementRegistry(entitlementOptions) + entitlementRepo := common.NewEntitlementRepo(client) + subjectResolver := common.SubjectResolver() + workerOptions := common.NewBalanceWorkerOptions(eventsConfiguration, options, eventbusPublisher, entitlement, entitlementRepo, subjectResolver, logger) + worker, err := common.NewBalanceWorker(workerOptions) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + health := common.NewHealthChecker(logger) + telemetryHandler := common.NewTelemetryHandler(metricsTelemetryConfig, health) + v4, cleanup7 := common.NewTelemetryServer(telemetryConfig, telemetryHandler) + group := common.BalanceWorkerGroup(ctx, worker, v4) + runner := common.Runner{ + Group: group, + Logger: logger, + } + application := Application{ + GlobalInitializer: globalInitializer, + Migrator: migrator, + Runner: runner, + Logger: logger, + } + return application, func() { + cleanup7() + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + }, nil +} + +// wire.go: + +type Application struct { + common.GlobalInitializer + common.Migrator + common.Runner + + Logger *slog.Logger +} + +func metadata(conf config.Configuration) common.Metadata { + return common.NewMetadata(conf, version, "balance-worker") +} diff --git a/openmeter/billing/worker/worker.go b/openmeter/billing/worker/worker.go new file mode 100644 index 000000000..c9876455b --- /dev/null +++ b/openmeter/billing/worker/worker.go @@ -0,0 +1,119 @@ +package billingworker + +import ( + "context" + "fmt" + "log/slog" + + "github.com/ThreeDotsLabs/watermill/message" + + "github.com/openmeterio/openmeter/openmeter/billing" + "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" + "github.com/openmeterio/openmeter/openmeter/watermill/grouphandler" + "github.com/openmeterio/openmeter/openmeter/watermill/router" +) + +type WorkerOptions struct { + SystemEventsTopic string + + Router router.Options + EventBus eventbus.Publisher + + Logger *slog.Logger + + BillingService billing.Service + // External connectors +} + +func (w WorkerOptions) Validate() error { + if w.SystemEventsTopic == "" { + return fmt.Errorf("system events topic is required") + } + + if err := w.Router.Validate(); err != nil { + return fmt.Errorf("router: %w", err) + } + + if w.EventBus == nil { + return fmt.Errorf("event bus is required") + } + + if w.Logger == nil { + return fmt.Errorf("logger is required") + } + + if w.BillingService == nil { + return fmt.Errorf("billing service is required") + } + + return nil +} + +type Worker struct { + router *message.Router + + billingService billing.Service +} + +func New(opts WorkerOptions) (*Worker, error) { + if err := opts.Validate(); err != nil { + return nil, err + } + + worker := &Worker{ + billingService: opts.BillingService, + } + + router, err := router.NewDefaultRouter(opts.Router) + if err != nil { + return nil, err + } + + worker.router = router + + eventHandler, err := worker.eventHandler(opts) + if err != nil { + return nil, err + } + + router.AddNoPublisherHandler( + "billing_worker_system_events", + opts.SystemEventsTopic, + opts.Router.Subscriber, + eventHandler, + ) + + return worker, nil +} + +func (w *Worker) eventHandler(opts WorkerOptions) (message.NoPublishHandlerFunc, error) { + return grouphandler.NewNoPublishingHandler( + opts.EventBus.Marshaler(), + opts.Router.MetricMeter, + + /* TODO: + + // Entitlement created event + grouphandler.NewGroupEventHandler(func(ctx context.Context, event *entitlement.EntitlementCreatedEvent) error { + return w.opts.EventBus. + WithContext(ctx). + PublishIfNoError(w.handleEntitlementEvent( + ctx, + NamespacedID{Namespace: event.Namespace.ID, ID: event.ID}, + metadata.ComposeResourcePath(event.Namespace.ID, metadata.EntityEntitlement, event.ID), + )) + }), */ + ) +} + +func (w *Worker) Run(ctx context.Context) error { + return w.router.Run(ctx) +} + +func (w *Worker) Close() error { + if err := w.router.Close(); err != nil { + return err + } + + return nil +}