Skip to content

Commit

Permalink
v2 - Named sub logger - logger refactoring (#1124)
Browse files Browse the repository at this point in the history
This PR adds the ability to inject a named sublogger into the component.
The Logger interface is part of V2 API.


There are some TODOs, that will be done in the subsequent PRs.
- initialize logger
- global logger removal (if necessary)
- use component names from the config
  • Loading branch information
nablaone authored Dec 20, 2024
1 parent 1a78cdf commit dc539c6
Show file tree
Hide file tree
Showing 21 changed files with 355 additions and 82 deletions.
4 changes: 4 additions & 0 deletions quesma/backend_connectors/clickhouse_backend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,7 @@ func NewClickHouseBackendConnector(endpoint string) *ClickHouseBackendConnector
Endpoint: endpoint,
}
}

func (p *ClickHouseBackendConnector) InstanceName() string {
return "clickhouse" // TODO add name taken from config
}
4 changes: 4 additions & 0 deletions quesma/backend_connectors/elasticsearch_backend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func NewElasticsearchBackendConnector(cfg config.ElasticsearchConfiguration) *El
return conn
}

func (e *ElasticsearchBackendConnector) InstanceName() string {
return "elasticsearch" // TODO return name from config
}

func (e *ElasticsearchBackendConnector) GetConfig() config.ElasticsearchConfiguration {
return e.config
}
Expand Down
4 changes: 4 additions & 0 deletions quesma/backend_connectors/mysql_backend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ type MySqlBackendConnector struct {
connection *sql.DB
}

func (p *MySqlBackendConnector) InstanceName() string {
return "mysql"
}

func (p *MySqlBackendConnector) GetId() quesma_api.BackendConnectorType {
return quesma_api.MySQLBackend
}
Expand Down
11 changes: 9 additions & 2 deletions quesma/frontend_connectors/basic_http_frontend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type BasicHTTPFrontendConnector struct {

phoneHomeClient diag.PhoneHomeClient
debugInfoCollector diag.DebugInfoCollector
logger quesma_api.QuesmaLogger
}

func (h *BasicHTTPFrontendConnector) GetChildComponents() []interface{} {
Expand All @@ -47,6 +48,8 @@ func (h *BasicHTTPFrontendConnector) GetChildComponents() []interface{} {
func (h *BasicHTTPFrontendConnector) SetDependencies(deps quesma_api.Dependencies) {
h.phoneHomeClient = deps.PhoneHomeAgent()
h.debugInfoCollector = deps.DebugInfoCollector()
h.logger = deps.Logger()

deps.PhoneHomeAgent().FailedRequestsCollector(func() int64 {
return h.routerInstance.FailedRequests.Load()
})
Expand All @@ -66,6 +69,10 @@ func NewBasicHTTPFrontendConnector(endpoint string, config *config.QuesmaConfigu
}
}

func (h *BasicHTTPFrontendConnector) InstanceName() string {
return "BasicHTTPFrontendConnector" // TODO return name from config
}

func (h *BasicHTTPFrontendConnector) AddRouter(router quesma_api.Router) {
h.router = router
}
Expand Down Expand Up @@ -100,9 +107,9 @@ func (h *BasicHTTPFrontendConnector) Listen() error {
h.listener.Addr = h.endpoint
h.listener.Handler = h
go func() {
h.logger.Info().Msgf("HTTP server started on %s", h.endpoint)
err := h.listener.ListenAndServe()
// TODO: Handle error
_ = err
h.logger.Error().Err(err).Msg("HTTP server stopped")
}()

return nil
Expand Down
83 changes: 33 additions & 50 deletions quesma/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
"os"
"quesma/stats/errorstats"
asyncQueryTracing "quesma/tracing"
tracing "quesma_v2/core/tracing"
quesma_v2 "quesma_v2/core"

"time"
)

Expand Down Expand Up @@ -39,8 +40,6 @@ const (
bufferSizeChannel = 1024
)

var logger zerolog.Logger

// InitLogger returns channel where log messages will be sent
func InitLogger(cfg Configuration, sig chan os.Signal, doneCh chan struct{}, asyncQueryTraceLogger *asyncQueryTracing.AsyncTraceLogger) <-chan LogWithLevel {
zerolog.TimeFieldFormat = time.RFC3339Nano // without this we don't have milliseconds timestamp precision
Expand Down Expand Up @@ -89,20 +88,23 @@ func InitLogger(cfg Configuration, sig chan os.Signal, doneCh chan struct{}, asy
}

multi := zerolog.MultiLevelWriter(logWriters...)
logger = zerolog.New(multi).
l := zerolog.New(multi).
Level(cfg.Level).
With().
Timestamp().
Caller().
Logger()

globalError := errorstats.GlobalErrorHook{}
logger = logger.Hook(&globalError)
l = l.Hook(&globalError)
if asyncQueryTraceLogger != nil {
logger = logger.Hook(asyncQueryTraceLogger)
l = l.Hook(asyncQueryTraceLogger)
}

logger.Info().Msgf("Logger initialized with level %s", cfg.Level)
l.Info().Msgf("Logger initialized with level %s", cfg.Level)

logger = quesma_v2.NewQuesmaLogger(l)

return logChannel
}

Expand All @@ -111,7 +113,7 @@ func InitLogger(cfg Configuration, sig chan os.Signal, doneCh chan struct{}, asy
// of the test, and calls to the global logger will start appearing in the console.
// Without it, they don't.
func InitSimpleLoggerForTests() {
logger = zerolog.New(
l := zerolog.New(
zerolog.ConsoleWriter{
Out: os.Stderr,
TimeFormat: time.StampMilli,
Expand All @@ -120,14 +122,16 @@ func InitSimpleLoggerForTests() {
With().
Timestamp().
Logger()

logger = quesma_v2.NewQuesmaLogger(l)
}

// InitSimpleLoggerForTestsWarnLevel initializes our global logger (level >= Warn) to the console output.
// Useful e.g. in debugging failing tests: you can call this function at the beginning
// of the test, and calls to the global logger will start appearing in the console.
// Without it, they don't.
func InitSimpleLoggerForTestsWarnLevel() {
logger = zerolog.New(
l := zerolog.New(
zerolog.ConsoleWriter{
Out: os.Stderr,
TimeFormat: time.StampMilli,
Expand All @@ -136,6 +140,8 @@ func InitSimpleLoggerForTestsWarnLevel() {
With().
Timestamp().
Logger()

logger = quesma_v2.NewQuesmaLogger(l)
}

var testLoggerInitialized bool
Expand All @@ -157,15 +163,17 @@ func InitOnlyChannelLoggerForTests() <-chan LogWithLevel {
logChannel := make(chan LogWithLevel, 50000) // small number like 5 or 10 made entire Quesma totally unresponsive during the few seconds where Kibana spams with messages
chanWriter := channelWriter{ch: logChannel}

logger = zerolog.New(chanWriter).
l := zerolog.New(chanWriter).
Level(zerolog.DebugLevel).
With().
Timestamp().
Caller().
Logger()

globalError := errorstats.GlobalErrorHook{}
logger = logger.Hook(&globalError)
l = l.Hook(&globalError)

logger = quesma_v2.NewQuesmaLogger(l)

testLoggerInitialized = true
return logChannel
Expand All @@ -191,86 +199,61 @@ func openLogFiles(logsPath string) {
}
}

func addKnownContextValues(event *zerolog.Event, ctx context.Context) *zerolog.Event {
// --- legacy API

if requestId, ok := ctx.Value(tracing.RequestIdCtxKey).(string); ok {
event = event.Str(RID, requestId)
}
if path, ok := ctx.Value(tracing.RequestPath).(string); ok {
event = event.Str(Path, path)
}
if reason, ok := ctx.Value(tracing.ReasonCtxKey).(string); ok {
event = event.Str(Reason, reason)
}
if asyncId, ok := ctx.Value(tracing.AsyncIdCtxKey).(string); ok {
if asyncId != "" {
event = event.Str(AsyncId, asyncId)
}
}
// global logger, TODO this should be removed
var logger = quesma_v2.EmptyQuesmaLogger()

if requestId, ok := ctx.Value(tracing.OpaqueIdCtxKey).(string); ok {
event = event.Str(OpaqueId, requestId)
}

return event
func GlobalLogger() quesma_v2.QuesmaLogger {
return logger
}

// global logger delegates

func Debug() *zerolog.Event {
return logger.Debug()
}

func DebugWithCtx(ctx context.Context) *zerolog.Event {
event := logger.Debug().Ctx(ctx)
event = addKnownContextValues(event, ctx)
return event
return logger.DebugWithCtx(ctx)
}

func Info() *zerolog.Event {
return logger.Info()
}

func InfoWithCtx(ctx context.Context) *zerolog.Event {
event := logger.Info().Ctx(ctx)
event = addKnownContextValues(event, ctx)
return event
return logger.InfoWithCtx(ctx)
}

// MarkTraceEndWithCtx marks the end of a trace with the given context.
// Calling this functions at end of a trace is crucial from the transactional logging perspective.
func MarkTraceEndWithCtx(ctx context.Context) *zerolog.Event {
event := logger.Info().Ctx(ctx)
event = addKnownContextValues(event, ctx)
ctx = context.WithValue(ctx, tracing.TraceEndCtxKey, true)
event = event.Ctx(ctx)
return event
return logger.MarkTraceEndWithCtx(ctx)
}

func Warn() *zerolog.Event {
return logger.Warn()
}

func WarnWithCtx(ctx context.Context) *zerolog.Event {
event := logger.Warn().Ctx(ctx)
event = addKnownContextValues(event, ctx)
return event
return logger.WarnWithCtx(ctx)
}

func WarnWithCtxAndReason(ctx context.Context, reason string) *zerolog.Event {
return WarnWithCtx(context.WithValue(ctx, tracing.ReasonCtxKey, reason))
return logger.WarnWithCtxAndReason(ctx, reason)
}

func Error() *zerolog.Event {
return logger.Error()
}

func ErrorWithCtx(ctx context.Context) *zerolog.Event {
event := logger.Error().Ctx(ctx)
event = addKnownContextValues(event, ctx)
return event
return logger.ErrorWithCtx(ctx)
}

func ErrorWithCtxAndReason(ctx context.Context, reason string) *zerolog.Event {
return ErrorWithCtx(context.WithValue(ctx, tracing.ReasonCtxKey, reason))
return logger.ErrorWithCtxAndReason(ctx, reason)
}

func Fatal() *zerolog.Event {
Expand Down
3 changes: 1 addition & 2 deletions quesma/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ const EnableConcurrencyProfiling = false

// buildIngestOnlyQuesma is for now a helper function to help establishing the way of v2 module api import
func buildIngestOnlyQuesma() quesma_api.QuesmaBuilder {
var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma()
quesmaBuilder.SetDependencies(quesma_api.EmptyDependencies())
var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma(quesma_api.EmptyDependencies())

ingestFrontendConnector := frontend_connectors.NewElasticsearchIngestFrontendConnector(":8080")

Expand Down
11 changes: 5 additions & 6 deletions quesma/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ func Test_backendConnectorValidation(t *testing.T) {
var tcpProcessor quesma_api.Processor = processors.NewPostgresToMySqlProcessor()
var postgressPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline()
postgressPipeline.AddProcessor(tcpProcessor)
var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma()
quesmaBuilder.SetDependencies(quesma_api.EmptyDependencies())
var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma(quesma_api.EmptyDependencies())

const endpoint = "root:password@tcp(127.0.0.1:3306)/test"
var mySqlBackendConnector quesma_api.BackendConnector = &backend_connectors.MySqlBackendConnector{
Endpoint: endpoint,
Expand All @@ -64,8 +64,7 @@ func fallback(_ context.Context, _ *quesma_api.Request, _ http.ResponseWriter) (
}

func ab_testing_scenario() quesma_api.QuesmaBuilder {
var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma()
quesmaBuilder.SetDependencies(quesma_api.EmptyDependencies())
var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma(quesma_api.EmptyDependencies())

cfg := &config.QuesmaConfiguration{
DisableAuth: true,
Expand Down Expand Up @@ -117,8 +116,8 @@ func ab_testing_scenario() quesma_api.QuesmaBuilder {
}

func fallbackScenario() quesma_api.QuesmaBuilder {
var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma()
quesmaBuilder.SetDependencies(quesma_api.EmptyDependencies())
var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma(quesma_api.EmptyDependencies())

cfg := &config.QuesmaConfiguration{
DisableAuth: true,
Elasticsearch: config.ElasticsearchConfiguration{
Expand Down
4 changes: 4 additions & 0 deletions quesma/processors/ab_test_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ func NewABTestProcessor(id string, doResultComparison bool) *ABTestProcessor {
}
}

func (p *ABTestProcessor) InstanceName() string {
return "ABTestProcessor"
}

func (p *ABTestProcessor) GetId() string {
return p.Id
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func NewElasticsearchToClickHouseIngestProcessor(conf config.QuesmaProcessorConf
}
}

func (p *ElasticsearchToClickHouseIngestProcessor) InstanceName() string {
return "elasticsearch_to_clickhouse_ingest" // TODO return name from config
}

func (p *ElasticsearchToClickHouseIngestProcessor) Init() error {
chBackendConnector := p.GetBackendConnector(quesma_api.ClickHouseSQLBackend)
if chBackendConnector == nil {
Expand Down
4 changes: 4 additions & 0 deletions quesma/processors/postgres_to_mysql_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ func NewPostgresToMySqlProcessor() *PostgresToMySqlProcessor {
}
}

func (p *PostgresToMySqlProcessor) InstanceName() string {
return "PostgresToMySqlProcessor"
}

func (p *PostgresToMySqlProcessor) GetId() string {
return "postgrestomysql_processor"
}
Expand Down
4 changes: 2 additions & 2 deletions quesma/quesma/dual_write_proxy_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,15 @@ func newDualWriteProxyV2(dependencies quesma_api.Dependencies, schemaLoader clic
elasticHttpQueryFrontendConnector := NewElasticHttpQueryFrontendConnector(":"+strconv.Itoa(int(config.PublicTcpPort)),
logManager, registry, config, searchRouter)

quesmaBuilder := quesma_api.NewQuesma()
quesmaBuilder := quesma_api.NewQuesma(dependencies)
ingestPipeline := quesma_api.NewPipeline()
ingestPipeline.AddFrontendConnector(elasticHttpIngestFrontendConnector)

queryPipeline := quesma_api.NewPipeline()
queryPipeline.AddFrontendConnector(elasticHttpQueryFrontendConnector)
quesmaBuilder.AddPipeline(ingestPipeline)
quesmaBuilder.AddPipeline(queryPipeline)
quesmaBuilder.SetDependencies(dependencies)

_, err := quesmaBuilder.Build()
if err != nil {
logger.Fatal().Msgf("Error building Quesma: %v", err)
Expand Down
1 change: 1 addition & 0 deletions quesma/quesma/quesma.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func NewHttpProxy(phoneHomeAgent telemetry.PhoneHomeAgent,
dependencies := quesma_v2.NewDependencies()
dependencies.SetPhoneHomeAgent(phoneHomeAgent)
dependencies.SetDebugInfoCollector(quesmaManagementConsole)
dependencies.SetLogger(logger.GlobalLogger()) // FIXME: we're using global logger here, create

if v2 {
return &Quesma{
Expand Down
4 changes: 4 additions & 0 deletions quesma/v2/core/backend_connectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func GetBackendConnectorNameFromType(connectorType BackendConnectorType) string
type NoopBackendConnector struct {
}

func (p *NoopBackendConnector) InstanceName() string {
return "noop"
}

func (p *NoopBackendConnector) GetId() BackendConnectorType {
return NoopBackend
}
Expand Down
Loading

0 comments on commit dc539c6

Please sign in to comment.