From dc539c6d06890ec453ef05850a4fe927de00c30b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Strzali=C5=84ski?= Date: Fri, 20 Dec 2024 12:03:30 +0100 Subject: [PATCH] v2 - Named sub logger - logger refactoring (#1124) This PR adds the ability to inject a named sublogger into the component. The Logger interface is part of V2 API. There are some TODOs, that will be done in the subsequent PRs. - initialize logger - global logger removal (if necessary) - use component names from the config --- .../clickhouse_backend_connector.go | 4 + .../elasticsearch_backend_connector.go | 4 + .../mysql_backend_connector.go | 4 + .../basic_http_frontend_connector.go | 11 +- quesma/logger/logger.go | 83 ++++------ quesma/main.go | 3 +- quesma/main_test.go | 11 +- quesma/processors/ab_test_processor.go | 4 + ...icsearch_to_clickhouse_ingest_processor.go | 4 + .../processors/postgres_to_mysql_processor.go | 4 + quesma/quesma/dual_write_proxy_v2.go | 4 +- quesma/quesma/quesma.go | 1 + quesma/v2/core/backend_connectors.go | 4 + quesma/v2/core/dependency_injection.go | 54 ++++++- quesma/v2/core/quesma_apis.go | 8 +- quesma/v2/core/quesma_builder.go | 24 +-- quesma/v2/core/quesma_logger.go | 146 ++++++++++++++++++ quesma/v2/core/quesma_pipeline.go | 12 ++ quesma/v2/go.mod | 4 + quesma/v2/go.sum | 16 ++ quesma/v2_test_objects.go | 32 ++++ 21 files changed, 355 insertions(+), 82 deletions(-) create mode 100644 quesma/v2/core/quesma_logger.go diff --git a/quesma/backend_connectors/clickhouse_backend_connector.go b/quesma/backend_connectors/clickhouse_backend_connector.go index dbedb8316..fe27412ec 100644 --- a/quesma/backend_connectors/clickhouse_backend_connector.go +++ b/quesma/backend_connectors/clickhouse_backend_connector.go @@ -96,3 +96,7 @@ func NewClickHouseBackendConnector(endpoint string) *ClickHouseBackendConnector Endpoint: endpoint, } } + +func (p *ClickHouseBackendConnector) InstanceName() string { + return "clickhouse" // TODO add name taken from config +} diff --git a/quesma/backend_connectors/elasticsearch_backend_connector.go b/quesma/backend_connectors/elasticsearch_backend_connector.go index c2f8c5cda..004c92c77 100644 --- a/quesma/backend_connectors/elasticsearch_backend_connector.go +++ b/quesma/backend_connectors/elasticsearch_backend_connector.go @@ -41,6 +41,10 @@ func NewElasticsearchBackendConnector(cfg config.ElasticsearchConfiguration) *El return conn } +func (e *ElasticsearchBackendConnector) InstanceName() string { + return "elasticsearch" // TODO return name from config +} + func (e *ElasticsearchBackendConnector) GetConfig() config.ElasticsearchConfiguration { return e.config } diff --git a/quesma/backend_connectors/mysql_backend_connector.go b/quesma/backend_connectors/mysql_backend_connector.go index 4bc5420b9..830dcb678 100644 --- a/quesma/backend_connectors/mysql_backend_connector.go +++ b/quesma/backend_connectors/mysql_backend_connector.go @@ -38,6 +38,10 @@ type MySqlBackendConnector struct { connection *sql.DB } +func (p *MySqlBackendConnector) InstanceName() string { + return "mysql" +} + func (p *MySqlBackendConnector) GetId() quesma_api.BackendConnectorType { return quesma_api.MySQLBackend } diff --git a/quesma/frontend_connectors/basic_http_frontend_connector.go b/quesma/frontend_connectors/basic_http_frontend_connector.go index 1e8f05d81..a008eb65c 100644 --- a/quesma/frontend_connectors/basic_http_frontend_connector.go +++ b/quesma/frontend_connectors/basic_http_frontend_connector.go @@ -29,6 +29,7 @@ type BasicHTTPFrontendConnector struct { phoneHomeClient diag.PhoneHomeClient debugInfoCollector diag.DebugInfoCollector + logger quesma_api.QuesmaLogger } func (h *BasicHTTPFrontendConnector) GetChildComponents() []interface{} { @@ -47,6 +48,8 @@ func (h *BasicHTTPFrontendConnector) GetChildComponents() []interface{} { func (h *BasicHTTPFrontendConnector) SetDependencies(deps quesma_api.Dependencies) { h.phoneHomeClient = deps.PhoneHomeAgent() h.debugInfoCollector = deps.DebugInfoCollector() + h.logger = deps.Logger() + deps.PhoneHomeAgent().FailedRequestsCollector(func() int64 { return h.routerInstance.FailedRequests.Load() }) @@ -66,6 +69,10 @@ func NewBasicHTTPFrontendConnector(endpoint string, config *config.QuesmaConfigu } } +func (h *BasicHTTPFrontendConnector) InstanceName() string { + return "BasicHTTPFrontendConnector" // TODO return name from config +} + func (h *BasicHTTPFrontendConnector) AddRouter(router quesma_api.Router) { h.router = router } @@ -100,9 +107,9 @@ func (h *BasicHTTPFrontendConnector) Listen() error { h.listener.Addr = h.endpoint h.listener.Handler = h go func() { + h.logger.Info().Msgf("HTTP server started on %s", h.endpoint) err := h.listener.ListenAndServe() - // TODO: Handle error - _ = err + h.logger.Error().Err(err).Msg("HTTP server stopped") }() return nil diff --git a/quesma/logger/logger.go b/quesma/logger/logger.go index d1ad0d632..bce3cb88d 100644 --- a/quesma/logger/logger.go +++ b/quesma/logger/logger.go @@ -11,7 +11,8 @@ import ( "os" "quesma/stats/errorstats" asyncQueryTracing "quesma/tracing" - tracing "quesma_v2/core/tracing" + quesma_v2 "quesma_v2/core" + "time" ) @@ -39,8 +40,6 @@ const ( bufferSizeChannel = 1024 ) -var logger zerolog.Logger - // InitLogger returns channel where log messages will be sent func InitLogger(cfg Configuration, sig chan os.Signal, doneCh chan struct{}, asyncQueryTraceLogger *asyncQueryTracing.AsyncTraceLogger) <-chan LogWithLevel { zerolog.TimeFieldFormat = time.RFC3339Nano // without this we don't have milliseconds timestamp precision @@ -89,7 +88,7 @@ func InitLogger(cfg Configuration, sig chan os.Signal, doneCh chan struct{}, asy } multi := zerolog.MultiLevelWriter(logWriters...) - logger = zerolog.New(multi). + l := zerolog.New(multi). Level(cfg.Level). With(). Timestamp(). @@ -97,12 +96,15 @@ func InitLogger(cfg Configuration, sig chan os.Signal, doneCh chan struct{}, asy Logger() globalError := errorstats.GlobalErrorHook{} - logger = logger.Hook(&globalError) + l = l.Hook(&globalError) if asyncQueryTraceLogger != nil { - logger = logger.Hook(asyncQueryTraceLogger) + l = l.Hook(asyncQueryTraceLogger) } - logger.Info().Msgf("Logger initialized with level %s", cfg.Level) + l.Info().Msgf("Logger initialized with level %s", cfg.Level) + + logger = quesma_v2.NewQuesmaLogger(l) + return logChannel } @@ -111,7 +113,7 @@ func InitLogger(cfg Configuration, sig chan os.Signal, doneCh chan struct{}, asy // of the test, and calls to the global logger will start appearing in the console. // Without it, they don't. func InitSimpleLoggerForTests() { - logger = zerolog.New( + l := zerolog.New( zerolog.ConsoleWriter{ Out: os.Stderr, TimeFormat: time.StampMilli, @@ -120,6 +122,8 @@ func InitSimpleLoggerForTests() { With(). Timestamp(). Logger() + + logger = quesma_v2.NewQuesmaLogger(l) } // InitSimpleLoggerForTestsWarnLevel initializes our global logger (level >= Warn) to the console output. @@ -127,7 +131,7 @@ func InitSimpleLoggerForTests() { // of the test, and calls to the global logger will start appearing in the console. // Without it, they don't. func InitSimpleLoggerForTestsWarnLevel() { - logger = zerolog.New( + l := zerolog.New( zerolog.ConsoleWriter{ Out: os.Stderr, TimeFormat: time.StampMilli, @@ -136,6 +140,8 @@ func InitSimpleLoggerForTestsWarnLevel() { With(). Timestamp(). Logger() + + logger = quesma_v2.NewQuesmaLogger(l) } var testLoggerInitialized bool @@ -157,7 +163,7 @@ func InitOnlyChannelLoggerForTests() <-chan LogWithLevel { logChannel := make(chan LogWithLevel, 50000) // small number like 5 or 10 made entire Quesma totally unresponsive during the few seconds where Kibana spams with messages chanWriter := channelWriter{ch: logChannel} - logger = zerolog.New(chanWriter). + l := zerolog.New(chanWriter). Level(zerolog.DebugLevel). With(). Timestamp(). @@ -165,7 +171,9 @@ func InitOnlyChannelLoggerForTests() <-chan LogWithLevel { Logger() globalError := errorstats.GlobalErrorHook{} - logger = logger.Hook(&globalError) + l = l.Hook(&globalError) + + logger = quesma_v2.NewQuesmaLogger(l) testLoggerInitialized = true return logChannel @@ -191,38 +199,23 @@ func openLogFiles(logsPath string) { } } -func addKnownContextValues(event *zerolog.Event, ctx context.Context) *zerolog.Event { +// --- legacy API - if requestId, ok := ctx.Value(tracing.RequestIdCtxKey).(string); ok { - event = event.Str(RID, requestId) - } - if path, ok := ctx.Value(tracing.RequestPath).(string); ok { - event = event.Str(Path, path) - } - if reason, ok := ctx.Value(tracing.ReasonCtxKey).(string); ok { - event = event.Str(Reason, reason) - } - if asyncId, ok := ctx.Value(tracing.AsyncIdCtxKey).(string); ok { - if asyncId != "" { - event = event.Str(AsyncId, asyncId) - } - } +// global logger, TODO this should be removed +var logger = quesma_v2.EmptyQuesmaLogger() - if requestId, ok := ctx.Value(tracing.OpaqueIdCtxKey).(string); ok { - event = event.Str(OpaqueId, requestId) - } - - return event +func GlobalLogger() quesma_v2.QuesmaLogger { + return logger } +// global logger delegates + func Debug() *zerolog.Event { return logger.Debug() } func DebugWithCtx(ctx context.Context) *zerolog.Event { - event := logger.Debug().Ctx(ctx) - event = addKnownContextValues(event, ctx) - return event + return logger.DebugWithCtx(ctx) } func Info() *zerolog.Event { @@ -230,19 +223,13 @@ func Info() *zerolog.Event { } func InfoWithCtx(ctx context.Context) *zerolog.Event { - event := logger.Info().Ctx(ctx) - event = addKnownContextValues(event, ctx) - return event + return logger.InfoWithCtx(ctx) } // MarkTraceEndWithCtx marks the end of a trace with the given context. // Calling this functions at end of a trace is crucial from the transactional logging perspective. func MarkTraceEndWithCtx(ctx context.Context) *zerolog.Event { - event := logger.Info().Ctx(ctx) - event = addKnownContextValues(event, ctx) - ctx = context.WithValue(ctx, tracing.TraceEndCtxKey, true) - event = event.Ctx(ctx) - return event + return logger.MarkTraceEndWithCtx(ctx) } func Warn() *zerolog.Event { @@ -250,13 +237,11 @@ func Warn() *zerolog.Event { } func WarnWithCtx(ctx context.Context) *zerolog.Event { - event := logger.Warn().Ctx(ctx) - event = addKnownContextValues(event, ctx) - return event + return logger.WarnWithCtx(ctx) } func WarnWithCtxAndReason(ctx context.Context, reason string) *zerolog.Event { - return WarnWithCtx(context.WithValue(ctx, tracing.ReasonCtxKey, reason)) + return logger.WarnWithCtxAndReason(ctx, reason) } func Error() *zerolog.Event { @@ -264,13 +249,11 @@ func Error() *zerolog.Event { } func ErrorWithCtx(ctx context.Context) *zerolog.Event { - event := logger.Error().Ctx(ctx) - event = addKnownContextValues(event, ctx) - return event + return logger.ErrorWithCtx(ctx) } func ErrorWithCtxAndReason(ctx context.Context, reason string) *zerolog.Event { - return ErrorWithCtx(context.WithValue(ctx, tracing.ReasonCtxKey, reason)) + return logger.ErrorWithCtxAndReason(ctx, reason) } func Fatal() *zerolog.Event { diff --git a/quesma/main.go b/quesma/main.go index 19ec5e80f..a42ab85fc 100644 --- a/quesma/main.go +++ b/quesma/main.go @@ -50,8 +50,7 @@ const EnableConcurrencyProfiling = false // buildIngestOnlyQuesma is for now a helper function to help establishing the way of v2 module api import func buildIngestOnlyQuesma() quesma_api.QuesmaBuilder { - var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma() - quesmaBuilder.SetDependencies(quesma_api.EmptyDependencies()) + var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma(quesma_api.EmptyDependencies()) ingestFrontendConnector := frontend_connectors.NewElasticsearchIngestFrontendConnector(":8080") diff --git a/quesma/main_test.go b/quesma/main_test.go index 25d27368b..d7eff8d68 100644 --- a/quesma/main_test.go +++ b/quesma/main_test.go @@ -42,8 +42,8 @@ func Test_backendConnectorValidation(t *testing.T) { var tcpProcessor quesma_api.Processor = processors.NewPostgresToMySqlProcessor() var postgressPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline() postgressPipeline.AddProcessor(tcpProcessor) - var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma() - quesmaBuilder.SetDependencies(quesma_api.EmptyDependencies()) + var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma(quesma_api.EmptyDependencies()) + const endpoint = "root:password@tcp(127.0.0.1:3306)/test" var mySqlBackendConnector quesma_api.BackendConnector = &backend_connectors.MySqlBackendConnector{ Endpoint: endpoint, @@ -64,8 +64,7 @@ func fallback(_ context.Context, _ *quesma_api.Request, _ http.ResponseWriter) ( } func ab_testing_scenario() quesma_api.QuesmaBuilder { - var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma() - quesmaBuilder.SetDependencies(quesma_api.EmptyDependencies()) + var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma(quesma_api.EmptyDependencies()) cfg := &config.QuesmaConfiguration{ DisableAuth: true, @@ -117,8 +116,8 @@ func ab_testing_scenario() quesma_api.QuesmaBuilder { } func fallbackScenario() quesma_api.QuesmaBuilder { - var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma() - quesmaBuilder.SetDependencies(quesma_api.EmptyDependencies()) + var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma(quesma_api.EmptyDependencies()) + cfg := &config.QuesmaConfiguration{ DisableAuth: true, Elasticsearch: config.ElasticsearchConfiguration{ diff --git a/quesma/processors/ab_test_processor.go b/quesma/processors/ab_test_processor.go index cda78f495..260d242f9 100644 --- a/quesma/processors/ab_test_processor.go +++ b/quesma/processors/ab_test_processor.go @@ -27,6 +27,10 @@ func NewABTestProcessor(id string, doResultComparison bool) *ABTestProcessor { } } +func (p *ABTestProcessor) InstanceName() string { + return "ABTestProcessor" +} + func (p *ABTestProcessor) GetId() string { return p.Id } diff --git a/quesma/processors/es_to_ch_ingest/elasticsearch_to_clickhouse_ingest_processor.go b/quesma/processors/es_to_ch_ingest/elasticsearch_to_clickhouse_ingest_processor.go index c78ca6a3b..a5852c2b1 100644 --- a/quesma/processors/es_to_ch_ingest/elasticsearch_to_clickhouse_ingest_processor.go +++ b/quesma/processors/es_to_ch_ingest/elasticsearch_to_clickhouse_ingest_processor.go @@ -43,6 +43,10 @@ func NewElasticsearchToClickHouseIngestProcessor(conf config.QuesmaProcessorConf } } +func (p *ElasticsearchToClickHouseIngestProcessor) InstanceName() string { + return "elasticsearch_to_clickhouse_ingest" // TODO return name from config +} + func (p *ElasticsearchToClickHouseIngestProcessor) Init() error { chBackendConnector := p.GetBackendConnector(quesma_api.ClickHouseSQLBackend) if chBackendConnector == nil { diff --git a/quesma/processors/postgres_to_mysql_processor.go b/quesma/processors/postgres_to_mysql_processor.go index 85be61767..99a5cf0e0 100644 --- a/quesma/processors/postgres_to_mysql_processor.go +++ b/quesma/processors/postgres_to_mysql_processor.go @@ -21,6 +21,10 @@ func NewPostgresToMySqlProcessor() *PostgresToMySqlProcessor { } } +func (p *PostgresToMySqlProcessor) InstanceName() string { + return "PostgresToMySqlProcessor" +} + func (p *PostgresToMySqlProcessor) GetId() string { return "postgrestomysql_processor" } diff --git a/quesma/quesma/dual_write_proxy_v2.go b/quesma/quesma/dual_write_proxy_v2.go index ab3bba2ec..b29c28f8e 100644 --- a/quesma/quesma/dual_write_proxy_v2.go +++ b/quesma/quesma/dual_write_proxy_v2.go @@ -88,7 +88,7 @@ func newDualWriteProxyV2(dependencies quesma_api.Dependencies, schemaLoader clic elasticHttpQueryFrontendConnector := NewElasticHttpQueryFrontendConnector(":"+strconv.Itoa(int(config.PublicTcpPort)), logManager, registry, config, searchRouter) - quesmaBuilder := quesma_api.NewQuesma() + quesmaBuilder := quesma_api.NewQuesma(dependencies) ingestPipeline := quesma_api.NewPipeline() ingestPipeline.AddFrontendConnector(elasticHttpIngestFrontendConnector) @@ -96,7 +96,7 @@ func newDualWriteProxyV2(dependencies quesma_api.Dependencies, schemaLoader clic queryPipeline.AddFrontendConnector(elasticHttpQueryFrontendConnector) quesmaBuilder.AddPipeline(ingestPipeline) quesmaBuilder.AddPipeline(queryPipeline) - quesmaBuilder.SetDependencies(dependencies) + _, err := quesmaBuilder.Build() if err != nil { logger.Fatal().Msgf("Error building Quesma: %v", err) diff --git a/quesma/quesma/quesma.go b/quesma/quesma/quesma.go index 27e10b4e9..506ab5890 100644 --- a/quesma/quesma/quesma.go +++ b/quesma/quesma/quesma.go @@ -67,6 +67,7 @@ func NewHttpProxy(phoneHomeAgent telemetry.PhoneHomeAgent, dependencies := quesma_v2.NewDependencies() dependencies.SetPhoneHomeAgent(phoneHomeAgent) dependencies.SetDebugInfoCollector(quesmaManagementConsole) + dependencies.SetLogger(logger.GlobalLogger()) // FIXME: we're using global logger here, create if v2 { return &Quesma{ diff --git a/quesma/v2/core/backend_connectors.go b/quesma/v2/core/backend_connectors.go index fe6ac24e6..3ea5b81b9 100644 --- a/quesma/v2/core/backend_connectors.go +++ b/quesma/v2/core/backend_connectors.go @@ -32,6 +32,10 @@ func GetBackendConnectorNameFromType(connectorType BackendConnectorType) string type NoopBackendConnector struct { } +func (p *NoopBackendConnector) InstanceName() string { + return "noop" +} + func (p *NoopBackendConnector) GetId() BackendConnectorType { return NoopBackend } diff --git a/quesma/v2/core/dependency_injection.go b/quesma/v2/core/dependency_injection.go index a7846b8c9..a6d1555aa 100644 --- a/quesma/v2/core/dependency_injection.go +++ b/quesma/v2/core/dependency_injection.go @@ -10,7 +10,7 @@ import ( type Dependencies interface { PhoneHomeAgent() diag.PhoneHomeClient DebugInfoCollector() diag.DebugInfoCollector - + Logger() QuesmaLogger InjectDependenciesInto(a any) } @@ -30,6 +30,7 @@ type ChildComponentProvider interface { type ComponentTreeNode struct { Id string + Name string Level int Component any Children []*ComponentTreeNode @@ -60,8 +61,15 @@ func (b *ComponentTreeBuilder) buildComponentTree(level int, a any) *ComponentTr return v } + id := fmt.Sprintf("%T(%p)", a, a) + name := id + if identifiable, ok := a.(InstanceNamer); ok { + name = identifiable.InstanceName() + } + node := &ComponentTreeNode{ - Id: fmt.Sprintf("%T(%p)", a, a), + Id: id, + Name: name, Children: make([]*ComponentTreeNode, 0), Component: a, Level: level, @@ -88,6 +96,7 @@ func (b *ComponentTreeBuilder) BuildComponentTree(a any) *ComponentTreeNode { type DependenciesImpl struct { phoneHomeAgent diag.PhoneHomeClient debugInfoCollector diag.DebugInfoCollector + logger QuesmaLogger } func (d *DependenciesImpl) PhoneHomeAgent() diag.PhoneHomeClient { @@ -98,6 +107,10 @@ func (d *DependenciesImpl) DebugInfoCollector() diag.DebugInfoCollector { return d.debugInfoCollector } +func (d *DependenciesImpl) Logger() QuesmaLogger { + return d.logger +} + func NewDependencies() *DependenciesImpl { return EmptyDependencies() } @@ -110,33 +123,58 @@ func (d *DependenciesImpl) SetDebugInfoCollector(debugInfoCollector diag.DebugIn d.debugInfoCollector = debugInfoCollector } +func (d *DependenciesImpl) SetLogger(logger QuesmaLogger) { + d.logger = logger +} + +func (d *DependenciesImpl) Clone() *DependenciesImpl { + return &DependenciesImpl{ + phoneHomeAgent: d.phoneHomeAgent, + debugInfoCollector: d.debugInfoCollector, + logger: d.logger, + } +} + func EmptyDependencies() *DependenciesImpl { return &DependenciesImpl{ phoneHomeAgent: diag.NewPhoneHomeEmptyAgent(), debugInfoCollector: diag.EmptyDebugInfoCollector(), + + logger: EmptyQuesmaLogger(), } } -const traceDependencyInjection bool = false +const traceDependencyInjection bool = true // InjectDependenciesInto injects dependencies into a component. This is indented to use in Quesma building process only. -func (d *DependenciesImpl) InjectDependenciesInto(a any) { +func (d *DependenciesImpl) InjectDependenciesInto(component any) { // TODO fmt for now. Later we can use logger. We need to move logger to the V2 module. var trace func(a ...any) if traceDependencyInjection { - prefix := fmt.Sprintf("Dependency injection into %T :", a) + prefix := fmt.Sprintf("Dependency injection into %T :", component) trace = func(a ...any) { - fmt.Println(prefix, fmt.Sprint(a...)) + d.logger.Info().Msgf("%s%s", prefix, fmt.Sprint(a...)) + //fmt.Println(prefix, fmt.Sprint(a...)) } } else { trace = func(a ...any) {} } - if injector, ok := a.(DependenciesSetter); ok { - injector.SetDependencies(d) + if target, ok := component.(DependenciesSetter); ok { + deps := d + + if named, ok := component.(InstanceNamer); ok { + // We have a named component. We can use to inject sub logger here. + + deps = d.Clone() + deps.SetLogger(deps.Logger().WithComponent(named.InstanceName())) + trace("Injecting dependencies into", named.InstanceName()) + } + + target.SetDependencies(deps) trace("OK - Injected Dependencies") } else { diff --git a/quesma/v2/core/quesma_apis.go b/quesma/v2/core/quesma_apis.go index 806b0e361..0aca32cc0 100644 --- a/quesma/v2/core/quesma_apis.go +++ b/quesma/v2/core/quesma_apis.go @@ -7,6 +7,10 @@ import ( "net" ) +type InstanceNamer interface { + InstanceName() string +} + type Router interface { Cloner AddRoute(path string, handler HTTPFrontendHandler) @@ -19,6 +23,7 @@ type Router interface { } type FrontendConnector interface { + InstanceNamer Listen() error // Start listening on the endpoint GetEndpoint() string Stop(ctx context.Context) error // Stop listening @@ -59,13 +64,13 @@ type PipelineBuilder interface { type QuesmaBuilder interface { AddPipeline(pipeline PipelineBuilder) GetPipelines() []PipelineBuilder - SetDependencies(dependencies Dependencies) Build() (QuesmaBuilder, error) Start() Stop(ctx context.Context) } type Processor interface { + InstanceNamer CompoundProcessor GetId() string Handle(metadata map[string]interface{}, message ...any) (map[string]interface{}, any, error) @@ -83,6 +88,7 @@ type Rows interface { } type BackendConnector interface { + InstanceNamer GetId() BackendConnectorType Open() error // Query executes a query that returns rows, typically a SELECT. diff --git a/quesma/v2/core/quesma_builder.go b/quesma/v2/core/quesma_builder.go index 004e7fcfb..f1dcbf38c 100644 --- a/quesma/v2/core/quesma_builder.go +++ b/quesma/v2/core/quesma_builder.go @@ -3,6 +3,7 @@ package quesma_api import ( + "bytes" "context" "fmt" ) @@ -12,9 +13,10 @@ type Quesma struct { dependencies Dependencies } -func NewQuesma() *Quesma { +func NewQuesma(deps Dependencies) *Quesma { return &Quesma{ - pipelines: make([]PipelineBuilder, 0), + pipelines: make([]PipelineBuilder, 0), + dependencies: deps, } } @@ -29,10 +31,6 @@ func (quesma *Quesma) GetChildComponents() []any { return componentList } -func (quesma *Quesma) SetDependencies(dependencies Dependencies) { - quesma.dependencies = dependencies -} - func (quesma *Quesma) AddPipeline(pipeline PipelineBuilder) { quesma.pipelines = append(quesma.pipelines, pipeline) } @@ -43,6 +41,7 @@ func (quesma *Quesma) GetPipelines() []PipelineBuilder { func (quesma *Quesma) Start() { for _, pipeline := range quesma.pipelines { + quesma.dependencies.Logger().Info().Msgf("Starting pipeline %v", pipeline) pipeline.Start() } } @@ -136,16 +135,20 @@ func (quesma *Quesma) injectDependencies(tree *ComponentTreeNode) error { func (quesma *Quesma) printTree(tree *ComponentTreeNode) { - fmt.Println("Component tree:\n---") + var buff bytes.Buffer + + _, _ = fmt.Fprintln(&buff, "Component tree:") + tree.walk(func(n *ComponentTreeNode) { for i := 0; i < n.Level; i++ { - fmt.Print(" ") + _, _ = fmt.Fprint(&buff, " ") } - fmt.Println(n.Id) + _, _ = fmt.Fprintln(&buff, n.Name) }) - fmt.Println("---") + + quesma.dependencies.Logger().Debug().Msg(buff.String()) } func (quesma *Quesma) Build() (QuesmaBuilder, error) { @@ -168,5 +171,4 @@ func (quesma *Quesma) Build() (QuesmaBuilder, error) { } return quesma, nil - } diff --git a/quesma/v2/core/quesma_logger.go b/quesma/v2/core/quesma_logger.go new file mode 100644 index 000000000..9d3bccb16 --- /dev/null +++ b/quesma/v2/core/quesma_logger.go @@ -0,0 +1,146 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package quesma_api + +import ( + "context" + "github.com/rs/zerolog" + "os" + "quesma_v2/core/tracing" + "time" +) + +const ( + stdLogFileName = "quesma.log" + errLogFileName = "err.log" +) + +const ( + RID = "request_id" // request id key for the logger + Reason = "reason" // Known error reason key for the logger + Path = "path" + AsyncId = "async_id" + OpaqueId = "opaque_id" + ReasonPrefixUnsupportedQueryType = "unsupported_search_query: " // Reason for Error messages for unsupported queries will start with this prefix +) + +const ( + initialBufferSize = 32 * 1024 + bufferSizeChannel = 1024 +) + +type QuesmaLogger interface { + Debug() *zerolog.Event + Info() *zerolog.Event + Warn() *zerolog.Event + Error() *zerolog.Event + Fatal() *zerolog.Event + Panic() *zerolog.Event + + DebugWithCtx(ctx context.Context) *zerolog.Event + InfoWithCtx(ctx context.Context) *zerolog.Event + WarnWithCtx(ctx context.Context) *zerolog.Event + ErrorWithCtx(ctx context.Context) *zerolog.Event + + WarnWithCtxAndReason(ctx context.Context, reason string) *zerolog.Event + ErrorWithCtxAndReason(ctx context.Context, reason string) *zerolog.Event + + MarkTraceEndWithCtx(ctx context.Context) *zerolog.Event + + WithComponent(name string) QuesmaLogger +} + +type QuesmaLoggerImpl struct { + zerolog.Logger +} + +func NewQuesmaLogger(log zerolog.Logger) QuesmaLogger { + return &QuesmaLoggerImpl{ + Logger: log, + } +} + +func (l *QuesmaLoggerImpl) WithComponent(name string) QuesmaLogger { + return &QuesmaLoggerImpl{ + Logger: l.Logger.With().Str("component", name).Logger(), + } +} + +func (l *QuesmaLoggerImpl) MarkTraceEndWithCtx(ctx context.Context) *zerolog.Event { + event := l.Info().Ctx(ctx) + event = l.addKnownContextValues(event, ctx) + ctx = context.WithValue(ctx, tracing.TraceEndCtxKey, true) + event = event.Ctx(ctx) + return event +} + +func (l *QuesmaLoggerImpl) WarnWithCtxAndReason(ctx context.Context, reason string) *zerolog.Event { + return l.WarnWithCtx(context.WithValue(ctx, tracing.ReasonCtxKey, reason)) +} + +func (l *QuesmaLoggerImpl) ErrorWithCtxAndReason(ctx context.Context, reason string) *zerolog.Event { + return l.ErrorWithCtx(context.WithValue(ctx, tracing.ReasonCtxKey, reason)) +} + +func (l *QuesmaLoggerImpl) addKnownContextValues(event *zerolog.Event, ctx context.Context) *zerolog.Event { + + if requestId, ok := ctx.Value(tracing.RequestIdCtxKey).(string); ok { + event = event.Str(RID, requestId) + } + if path, ok := ctx.Value(tracing.RequestPath).(string); ok { + event = event.Str(Path, path) + } + if reason, ok := ctx.Value(tracing.ReasonCtxKey).(string); ok { + event = event.Str(Reason, reason) + } + if asyncId, ok := ctx.Value(tracing.AsyncIdCtxKey).(string); ok { + if asyncId != "" { + event = event.Str(AsyncId, asyncId) + } + } + + if requestId, ok := ctx.Value(tracing.OpaqueIdCtxKey).(string); ok { + event = event.Str(OpaqueId, requestId) + } + + return event +} + +func (l *QuesmaLoggerImpl) DebugWithCtx(ctx context.Context) *zerolog.Event { + event := l.Debug().Ctx(ctx) + event = l.addKnownContextValues(event, ctx) + return event +} + +func (l *QuesmaLoggerImpl) InfoWithCtx(ctx context.Context) *zerolog.Event { + event := l.Info().Ctx(ctx) + event = l.addKnownContextValues(event, ctx) + return event +} + +func (l *QuesmaLoggerImpl) WarnWithCtx(ctx context.Context) *zerolog.Event { + event := l.Warn().Ctx(ctx) + event = l.addKnownContextValues(event, ctx) + return event + +} + +func (l *QuesmaLoggerImpl) ErrorWithCtx(ctx context.Context) *zerolog.Event { + event := l.Error().Ctx(ctx) + event = l.addKnownContextValues(event, ctx) + return event +} + +func EmptyQuesmaLogger() QuesmaLogger { + // not so empty :D + return NewQuesmaLogger(zerolog.New( + zerolog.ConsoleWriter{ + Out: os.Stderr, + TimeFormat: time.StampMilli, + }). + Level(zerolog.DebugLevel). + With(). + Timestamp(). + Logger()) + +} diff --git a/quesma/v2/core/quesma_pipeline.go b/quesma/v2/core/quesma_pipeline.go index 396fe9c46..f8d85f682 100644 --- a/quesma/v2/core/quesma_pipeline.go +++ b/quesma/v2/core/quesma_pipeline.go @@ -2,10 +2,13 @@ // SPDX-License-Identifier: Elastic-2.0 package quesma_api +import "fmt" + type Pipeline struct { FrontendConnectors []FrontendConnector Processors []Processor BackendConnectors map[BackendConnectorType]BackendConnector + logger QuesmaLogger } func NewPipeline() *Pipeline { @@ -34,6 +37,14 @@ func (p *Pipeline) GetChildComponents() []any { return components } +func (p *Pipeline) SetDependencies(deps Dependencies) { + p.logger = deps.Logger() +} + +func (p *Pipeline) InstanceName() string { + return fmt.Sprintf("pipeline(%p)", p) // TODO return name from config +} + func (p *Pipeline) AddFrontendConnector(conn FrontendConnector) { p.FrontendConnectors = append(p.FrontendConnectors, conn) } @@ -59,6 +70,7 @@ func (p *Pipeline) Start() { // because we are copying routing table from all connectors // however, bind error remains for _, conn := range p.FrontendConnectors { + p.logger.Info().Msgf("Starting frontend connector %s", conn) go conn.Listen() } } diff --git a/quesma/v2/go.mod b/quesma/v2/go.mod index 486a9b5b1..91b9405ad 100644 --- a/quesma/v2/go.mod +++ b/quesma/v2/go.mod @@ -4,6 +4,7 @@ go 1.23.2 require ( github.com/google/uuid v1.6.0 + github.com/rs/zerolog v1.33.0 github.com/stretchr/testify v1.10.0 github.com/ucarion/urlpath v0.0.0-20200424170820-7ccc79b76bbb ) @@ -11,8 +12,11 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/kr/pretty v0.3.0 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.19 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rogpeppe/go-internal v1.13.1 // indirect + golang.org/x/sys v0.21.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/quesma/v2/go.sum b/quesma/v2/go.sum index fad7c22bb..b83fb7973 100644 --- a/quesma/v2/go.sum +++ b/quesma/v2/go.sum @@ -1,6 +1,8 @@ +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -11,15 +13,29 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= +github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/ucarion/urlpath v0.0.0-20200424170820-7ccc79b76bbb h1:Ywfo8sUltxogBpFuMOFRrrSifO788kAFxmvVw31PtQQ= github.com/ucarion/urlpath v0.0.0-20200424170820-7ccc79b76bbb/go.mod h1:ikPs9bRWicNw3S7XpJ8sK/smGwU9WcSVU3dy9qahYBM= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= diff --git a/quesma/v2_test_objects.go b/quesma/v2_test_objects.go index ec05de114..a4ccf328d 100644 --- a/quesma/v2_test_objects.go +++ b/quesma/v2_test_objects.go @@ -118,17 +118,29 @@ func search(_ context.Context, request *quesma_api.Request, _ http.ResponseWrite type IngestProcessor struct { processors.BaseProcessor + logger quesma_api.QuesmaLogger } func NewIngestProcessor() *IngestProcessor { return &IngestProcessor{BaseProcessor: processors.NewBaseProcessor()} } +func (p *IngestProcessor) SetDependencies(deps quesma_api.Dependencies) { + p.logger = deps.Logger() +} + +func (p *IngestProcessor) InstanceName() string { + return "IngestProcessor" // TODO return name from config +} + func (p *IngestProcessor) GetId() string { return "IngestProcessor" } func (p *IngestProcessor) Handle(metadata map[string]interface{}, message ...any) (map[string]interface{}, any, error) { + + p.logger.Info().Msgf("IngestProcessor: handling message %T", message) + var data []byte for _, m := range message { var err error @@ -157,6 +169,10 @@ func NewInnerQueryProcessor2() *InnerQueryProcessor2 { } } +func (p *InnerQueryProcessor2) InstanceName() string { + return "InnerQueryProcessor2" +} + func (p *InnerQueryProcessor2) GetId() string { return "InnerQueryProcessor2" } @@ -192,6 +208,10 @@ func NewInnerQueryProcessor1() *InnerQueryProcessor1 { } } +func (p *InnerQueryProcessor1) InstanceName() string { + return "InnerQueryProcessor1" +} + func (p *InnerQueryProcessor1) GetId() string { return "InnerQueryProcessor1" } @@ -226,6 +246,10 @@ func NewInnerIngestProcessor2() *InnerIngestProcessor2 { } } +func (p *InnerIngestProcessor2) InstanceName() string { + return "InnerIngestProcessor2" +} + func (p *InnerIngestProcessor2) GetId() string { return "InnerIngestProcessor2" } @@ -256,6 +280,10 @@ func NewInnerIngestProcessor1() *InnerIngestProcessor1 { } } +func (p *InnerIngestProcessor1) InstanceName() string { + return "InnerIngestProcessor1" +} + func (p *InnerIngestProcessor1) GetId() string { return "InnerIngestProcessor1" } @@ -286,6 +314,10 @@ func NewQueryProcessor() *QueryProcessor { } } +func (p *QueryProcessor) InstanceName() string { + return "QueryProcessor" // TODO return name from config +} + func (p *QueryProcessor) GetId() string { return "QueryProcessor" }