Merge pull request #916 from openmeterio/revert-sink-worker-changes
revert: sink worker changes
chrisgacsal authored May 17, 2024
2 parents 2384ad1 + 343d6f2 commit 2ca3945
Showing 5 changed files with 79 additions and 139 deletions.
98 changes: 33 additions & 65 deletions cmd/sink-worker/main.go
@@ -5,7 +5,6 @@ import (
 	"crypto/tls"
 	"errors"
 	"fmt"
-	"net"
 	"net/http"
 	"os"
 	"syscall"
@@ -39,14 +38,11 @@ import (
 	"github.com/openmeterio/openmeter/pkg/slicesx"
 )
 
-const (
-	defaultShutdownTimeout = 5 * time.Second
-)
-
 var otelName string = "openmeter.io/sink-worker"
 
 func main() {
 	v, flags := viper.New(), pflag.NewFlagSet("OpenMeter", pflag.ExitOnError)
+	ctx := context.Background()
 
 	config.Configure(v, flags)
 
@@ -81,12 +77,8 @@ func main() {
 		panic(err)
 	}
 
-	// Setup main context covering the application lifecycle and ensure that the context is canceled on process exit.
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
 	extraResources, _ := resource.New(
-		ctx,
+		context.Background(),
 		resource.WithContainer(),
 		resource.WithAttributes(
 			semconv.ServiceName("openmeter-sink-worker"),
@@ -108,18 +100,13 @@
 	telemetryRouter.Mount("/debug", middleware.Profiler())
 
 	// Initialize OTel Metrics
-	otelMeterProvider, err := conf.Telemetry.Metrics.NewMeterProvider(ctx, res)
+	otelMeterProvider, err := conf.Telemetry.Metrics.NewMeterProvider(context.Background(), res)
 	if err != nil {
-		logger.Error("failed to initialize OpenTelemetry Metrics provider", slog.String("error", err.Error()))
+		logger.Error(err.Error())
 		os.Exit(1)
 	}
 	defer func() {
-		// Use dedicated context with timeout for shutdown as parent context might be canceled
-		// by the time the execution reaches this stage.
-		ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout)
-		defer cancel()
-
-		if err := otelMeterProvider.Shutdown(ctx); err != nil {
+		if err := otelMeterProvider.Shutdown(context.Background()); err != nil {
 			logger.Error("shutting down meter provider: %v", err)
 		}
 	}()
@@ -131,18 +118,13 @@ func main() {
 	}
 
 	// Initialize OTel Tracer
-	otelTracerProvider, err := conf.Telemetry.Trace.NewTracerProvider(ctx, res)
+	otelTracerProvider, err := conf.Telemetry.Trace.NewTracerProvider(context.Background(), res)
 	if err != nil {
-		logger.Error("failed to initialize OpenTelemetry Trace provider", slog.String("error", err.Error()))
+		logger.Error(err.Error())
 		os.Exit(1)
 	}
 	defer func() {
-		// Use dedicated context with timeout for shutdown as parent context might be canceled
-		// by the time the execution reaches this stage.
-		ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout)
-		defer cancel()
-
-		if err := otelTracerProvider.Shutdown(ctx); err != nil {
+		if err := otelTracerProvider.Shutdown(context.Background()); err != nil {
 			logger.Error("shutting down tracer provider", "error", err)
 		}
 	}()
@@ -165,11 +147,8 @@ func main() {
 	}
 
 	logger.Info("starting OpenMeter sink worker", "config", map[string]string{
-		"telemetry.address":                conf.Telemetry.Address,
-		"ingest.kafka.broker":              conf.Ingest.Kafka.Broker,
-		"ingest.kafka.brokerAddressFamily": conf.Ingest.Kafka.BrokerAddressFamily,
-		"sink.clientId":                    conf.Sink.ClientId,
-		"sink.groupId":                     conf.Sink.GroupId,
+		"telemetry.address":   conf.Telemetry.Address,
+		"ingest.kafka.broker": conf.Ingest.Kafka.Broker,
 	})
 
 	// Initialize meter repository
@@ -183,51 +162,42 @@ func main() {
 		logger.Error("failed to initialize sink worker", "error", err)
 		os.Exit(1)
 	}
-	defer sink.Close()
 
+	var group run.Group
+
 	// Set up telemetry server
-	server := &http.Server{
-		Addr:    conf.Telemetry.Address,
-		Handler: telemetryRouter,
-		BaseContext: func(_ net.Listener) context.Context {
-			return ctx
-		},
+	{
+		server := &http.Server{
+			Addr:    conf.Telemetry.Address,
+			Handler: telemetryRouter,
+		}
+		defer server.Close()
+
+		group.Add(
+			func() error { return server.ListenAndServe() },
+			func(err error) { _ = server.Shutdown(ctx) },
+		)
 	}
-	defer server.Close()
 
-	var group run.Group
-	// Add sink worker to run group
-	group.Add(
-		func() error { return sink.Run(ctx) },
-		func(err error) { sink.Close() },
-	)
+	// Starting sink worker
+	{
+		defer sink.Close()
 
-	// Add telemetry server to run group
-	group.Add(
-		func() error { return server.ListenAndServe() },
-		func(err error) { _ = server.Shutdown(ctx) },
-	)
+		group.Add(
+			func() error { return sink.Run() },
+			func(err error) { _ = sink.Close() },
		)
+	}
 
 	// Setup signal handler
 	group.Add(run.SignalHandler(ctx, syscall.SIGINT, syscall.SIGTERM))
 
 	// Run actors
 	err = group.Run()
-
-	var exitCode int
 	if e := (run.SignalError{}); errors.As(err, &e) {
-		logger.Info("received signal: shutting down", slog.String("signal", e.Signal.String()))
-		switch e.Signal {
-		case syscall.SIGTERM:
-		default:
-			exitCode = 130
-		}
+		slog.Info("received signal; shutting down", slog.String("signal", e.Signal.String()))
 	} else if !errors.Is(err, http.ErrServerClosed) {
 		logger.Error("application stopped due to error", slog.String("error", err.Error()))
-		exitCode = 1
 	}
-
-	os.Exit(exitCode)
 }
 
 func initClickHouseClient(config config.Configuration) (clickhouse.Conn, error) {
@@ -282,10 +252,8 @@ func initSink(config config.Configuration, logger *slog.Logger, metricMeter metr
 	)
 
 	consumerKafkaConfig := config.Ingest.Kafka.CreateKafkaConfig()
-	_ = consumerKafkaConfig.SetKey("client.id", config.Sink.ClientId)
 	_ = consumerKafkaConfig.SetKey("group.id", config.Sink.GroupId)
-	// SessionTimeout defines time interval the broker waits for receiving heartbeat from the consumer before removing it from the consumer group.
-	_ = consumerKafkaConfig.SetKey("session.timeout.ms", int(config.Sink.SessionTimeout.Milliseconds()))
+	_ = consumerKafkaConfig.SetKey("session.timeout.ms", 6000)
 	_ = consumerKafkaConfig.SetKey("enable.auto.commit", true)
 	_ = consumerKafkaConfig.SetKey("enable.auto.offset.store", false)
 	_ = consumerKafkaConfig.SetKey("go.application.rebalance.enable", true)
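
Note for readers of the diff above: both the removed and the restored main() are built on the github.com/oklog/run actor group, where each component registers an execute/interrupt pair and group.Run() unwinds everything once any actor returns. Below is a minimal, self-contained sketch of that pattern, not OpenMeter's actual wiring; the address and log messages are illustrative.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"syscall"

	"github.com/oklog/run"
)

func main() {
	ctx := context.Background()

	var group run.Group

	// Actor 1: a telemetry-style HTTP server. The interrupt function runs
	// once any other actor returns, unblocking ListenAndServe via Shutdown.
	server := &http.Server{Addr: "127.0.0.1:10000"}
	group.Add(
		func() error { return server.ListenAndServe() },
		func(err error) { _ = server.Shutdown(ctx) },
	)

	// Actor 2: a signal handler. On SIGINT/SIGTERM it returns run.SignalError,
	// which interrupts the rest of the group.
	group.Add(run.SignalHandler(ctx, syscall.SIGINT, syscall.SIGTERM))

	// Run blocks until the first actor exits, interrupts the others, and
	// returns the first error; callers branch on the error type, as above.
	err := group.Run()
	if e := (run.SignalError{}); errors.As(err, &e) {
		fmt.Println("received signal; shutting down:", e.Signal)
	} else if !errors.Is(err, http.ErrServerClosed) {
		fmt.Println("stopped due to error:", err)
	}
}
```

The revert keeps this structure but scopes each actor in its own block, calls sink.Run() without a context, and drops the explicit exit-code handling.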
3 changes: 0 additions & 3 deletions config/config_test.go
@@ -83,7 +83,6 @@ func TestComplete(t *testing.T) {
 				SaslPassword:        "pass",
 				Partitions:          1,
 				EventsTopicTemplate: "om_%s_events",
-				MetadataMaxAge:      180 * time.Second,
 			},
 		},
 		Aggregation: AggregationConfiguration{
@@ -96,9 +95,7 @@
 			},
 		},
 		Sink: SinkConfiguration{
-			ClientId:         "openmeter-sink-worker",
 			GroupId:          "openmeter-sink-worker",
-			SessionTimeout:   9 * time.Second,
 			MinCommitCount:   500,
 			MaxCommitWait:    30 * time.Second,
 			NamespaceRefetch: 15 * time.Second,
23 changes: 1 addition & 22 deletions config/ingest.go
@@ -4,7 +4,6 @@ import (
 	"errors"
 	"fmt"
 	"strings"
-	"time"
 
 	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
 	"github.com/spf13/viper"
@@ -31,14 +30,6 @@ type KafkaIngestConfiguration struct {
 	SaslPassword        string
 	Partitions          int
 	EventsTopicTemplate string
-	BrokerAddressFamily string
-	// SocketKeepAliveEnable defines if TCP socket keep-alive is enabled to prevent closing idle connections
-	// by Kafka brokers.
-	SocketKeepAliveEnable bool
-	// MetadataMaxAge defines the time interval after the metadata cache (brokers, partitions) becomes invalid.
-	// Keeping this parameter low forces the client to refresh metadata more frequently which helps with
-	// detecting changes in the Kafka cluster (broker, partition, leader changes) faster.
-	MetadataMaxAge time.Duration
 }
 
 // CreateKafkaConfig creates a Kafka config map.
@@ -53,9 +44,7 @@ func (c KafkaIngestConfiguration) CreateKafkaConfig() kafka.ConfigMap {
 	// This is needed when using localhost brokers on OSX,
 	// since the OSX resolver will return the IPv6 addresses first.
 	// See: https://github.com/openmeterio/openmeter/issues/321
-	if c.BrokerAddressFamily != "" {
-		config["broker.address.family"] = c.BrokerAddressFamily
-	} else if strings.Contains(c.Broker, "localhost") || strings.Contains(c.Broker, "127.0.0.1") {
+	if strings.Contains(c.Broker, "localhost") || strings.Contains(c.Broker, "127.0.0.1") {
 		config["broker.address.family"] = "v4"
 	}
 
@@ -75,14 +64,6 @@ func (c KafkaIngestConfiguration) CreateKafkaConfig() kafka.ConfigMap {
 		config["sasl.password"] = c.SaslPassword
 	}
 
-	if c.SocketKeepAliveEnable {
-		config["socket.keepalive.enable"] = c.SocketKeepAliveEnable
-	}
-
-	if c.MetadataMaxAge != 0 {
-		config["metadata.max.age.ms"] = c.MetadataMaxAge
-	}
-
 	return config
 }
 
@@ -108,6 +89,4 @@ func ConfigureIngest(v *viper.Viper) {
 	v.SetDefault("ingest.kafka.saslPassword", "")
 	v.SetDefault("ingest.kafka.partitions", 1)
 	v.SetDefault("ingest.kafka.eventsTopicTemplate", "om_%s_events")
-	v.SetDefault("ingest.kafka.socketKeepAliveEnable", false)
-	v.SetDefault("ingest.kafka.metadataMaxAge", 180*time.Second)
 }
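
For context: the map returned by CreateKafkaConfig is handed to confluent-kafka-go's consumer constructor in initSink (see the main.go hunk above). A rough sketch of that flow under the v2 client, using an illustrative broker address and only keys that appear in this diff:

```go
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	// Base map, as CreateKafkaConfig would build it.
	config := kafka.ConfigMap{
		"bootstrap.servers":     "localhost:9092", // illustrative; comes from c.Broker
		"broker.address.family": "v4",             // forced for localhost brokers, per the comment above
	}

	// Consumer-specific keys layered on top, as in initSink after the revert.
	// SetKey mutates the map in place; it rarely fails, hence the `_ =` style.
	_ = config.SetKey("group.id", "openmeter-sink-worker")
	_ = config.SetKey("session.timeout.ms", 6000)
	_ = config.SetKey("enable.auto.commit", true)
	_ = config.SetKey("enable.auto.offset.store", false)

	consumer, err := kafka.NewConsumer(&config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	fmt.Println("consumer ready; Subscribe and Poll calls would follow")
}
```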
14 changes: 1 addition & 13 deletions config/sink.go
@@ -8,13 +8,7 @@
 )
 
 type SinkConfiguration struct {
-	// ClientId defines the client id for the Kafka Consumer
-	ClientId string
-	// GroupId defines the consumer group id for the Kafka Consumer
-	GroupId string
-	// SessionTimeout defines time interval the broker waits for receiving heartbeat
-	// from the consumer before removing it from the consumer group.
-	SessionTimeout time.Duration
+	GroupId          string
 	Dedupe           DedupeConfiguration
 	MinCommitCount   int
 	MaxCommitWait    time.Duration
@@ -34,10 +28,6 @@ func (c SinkConfiguration) Validate() error {
 		return errors.New("NamespaceRefetch must be greater than 0")
 	}
 
-	if c.SessionTimeout.Milliseconds() < 3000 {
-		return errors.New("SessionTimeout must be greater than 3000ms")
-	}
-
 	return nil
 }
 
@@ -63,9 +53,7 @@ func ConfigureSink(v *viper.Viper) {
 
 	// Sink
 	v.SetDefault("sink.groupId", "openmeter-sink-worker")
-	v.SetDefault("sink.clientId", "openmeter-sink-worker")
 	v.SetDefault("sink.minCommitCount", 500)
 	v.SetDefault("sink.maxCommitWait", "5s")
 	v.SetDefault("sink.namespaceRefetch", "15s")
-	v.SetDefault("sink.sessionTimeout", 9*time.Second)
 }
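
ConfigureSink only registers defaults; viper parses them (including duration strings such as "5s") when the values are read back, and Validate() enforces the remaining constraint. A minimal sketch with a trimmed-down stand-in for the post-revert SinkConfiguration:

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/spf13/viper"
)

// Trimmed-down stand-in for the post-revert SinkConfiguration.
type SinkConfiguration struct {
	GroupId          string
	MinCommitCount   int
	MaxCommitWait    time.Duration
	NamespaceRefetch time.Duration
}

// Validate mirrors the post-revert rule: only NamespaceRefetch is checked.
func (c SinkConfiguration) Validate() error {
	if c.NamespaceRefetch <= 0 {
		return errors.New("NamespaceRefetch must be greater than 0")
	}
	return nil
}

func main() {
	v := viper.New()

	// Same defaults as ConfigureSink above; a config file would override them.
	v.SetDefault("sink.groupId", "openmeter-sink-worker")
	v.SetDefault("sink.minCommitCount", 500)
	v.SetDefault("sink.maxCommitWait", "5s")
	v.SetDefault("sink.namespaceRefetch", "15s")

	// Typed getters parse duration strings like "5s" into time.Duration.
	conf := SinkConfiguration{
		GroupId:          v.GetString("sink.groupId"),
		MinCommitCount:   v.GetInt("sink.minCommitCount"),
		MaxCommitWait:    v.GetDuration("sink.maxCommitWait"),
		NamespaceRefetch: v.GetDuration("sink.namespaceRefetch"),
	}

	if err := conf.Validate(); err != nil {
		panic(err)
	}
	fmt.Printf("sink config: %+v\n", conf)
}
```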