diff --git a/pconduit/global/util.go b/pconduit/logger.go similarity index 53% rename from pconduit/global/util.go rename to pconduit/logger.go index 23b762d..b4b6c2d 100644 --- a/pconduit/global/util.go +++ b/pconduit/logger.go @@ -12,25 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package global provides the functionality for Conduit to set up utilities -// for processors. DO NOT use this package directly. -package global +package pconduit import ( "os" - "github.com/conduitio/conduit-processor-sdk/pconduit" "github.com/rs/zerolog" ) -var ( - // Logger is the logger for the processor. DO NOT use this logger directly, - // instead use the Logger() function in the root of the processor SDK. - Logger = zerolog.New(zerolog.ConsoleWriter{Out: os.Stdout}). - With(). - Timestamp(). - Logger() - - // TODO by default set to an in-memory schema service. - SchemaService pconduit.SchemaService -) +// Logger is the logger for the processor. DO NOT use this logger directly, +// instead use the Logger() function in the root of the processor SDK. +var Logger = zerolog.New(zerolog.ConsoleWriter{Out: os.Stdout}). + With(). + Timestamp(). + Logger() diff --git a/pconduit/v1/fromproto/schema.go b/pconduit/v1/fromproto/schema.go index 7ffeae4..e1c9e33 100644 --- a/pconduit/v1/fromproto/schema.go +++ b/pconduit/v1/fromproto/schema.go @@ -28,6 +28,13 @@ func CreateSchemaRequest(req *conduitv1.CreateSchemaRequest) pconduit.CreateSche } } +func GetSchemaRequest(req *conduitv1.GetSchemaRequest) pconduit.GetSchemaRequest { + return pconduit.GetSchemaRequest{ + Subject: req.Subject, + Version: int(req.Version), + } +} + func CreateSchemaResponse(resp *conduitv1.CreateSchemaResponse) pconduit.CreateSchemaResponse { return pconduit.CreateSchemaResponse{ Schema: schema.Schema{ diff --git a/schema/in_memory.go b/schema/in_memory.go new file mode 100644 index 0000000..cc55088 --- /dev/null +++ b/schema/in_memory.go @@ -0,0 +1,73 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schema + +import ( + "context" + "fmt" + "sync" + + "github.com/conduitio/conduit-commons/schema" + "github.com/conduitio/conduit-processor-sdk/pconduit" +) + +type InMemoryService struct { + // schemas is a map of schema subjects to all the versions of that schema + // versioning starts at 1, newer versions are appended to the end of the versions slice. + schemas map[string][]schema.Schema + // m guards access to schemas + m sync.Mutex +} + +func NewInMemoryService() pconduit.SchemaService { + return &InMemoryService{ + schemas: make(map[string][]schema.Schema), + } +} + +func (s *InMemoryService) CreateSchema(_ context.Context, request pconduit.CreateSchemaRequest) (pconduit.CreateSchemaResponse, error) { + if request.Type != schema.TypeAvro { + return pconduit.CreateSchemaResponse{}, pconduit.ErrInvalidSchema + } + + s.m.Lock() + defer s.m.Unlock() + + inst := schema.Schema{ + Subject: request.Subject, + Version: len(s.schemas[request.Subject]) + 1, + Type: request.Type, + Bytes: request.Bytes, + } + s.schemas[request.Subject] = append(s.schemas[request.Subject], inst) + + return pconduit.CreateSchemaResponse{Schema: inst}, nil +} + +func (s *InMemoryService) GetSchema(_ context.Context, request pconduit.GetSchemaRequest) (pconduit.GetSchemaResponse, error) { + s.m.Lock() + defer s.m.Unlock() + + versions, ok := s.schemas[request.Subject] + if !ok { + return pconduit.GetSchemaResponse{}, fmt.Errorf("subject %v: %w", request.Subject, pconduit.ErrSubjectNotFound) + } + + if len(versions) < request.Version { + return pconduit.GetSchemaResponse{}, fmt.Errorf("version %v: %w", request.Version, pconduit.ErrVersionNotFound) + } + + return pconduit.GetSchemaResponse{Schema: versions[request.Version-1]}, nil +} diff --git a/schema/schema.go b/schema/schema.go index d68087b..e07204f 100644 --- a/schema/schema.go +++ b/schema/schema.go @@ -20,11 +20,12 @@ import ( "github.com/conduitio/conduit-commons/schema" "github.com/conduitio/conduit-processor-sdk/pconduit" - "github.com/conduitio/conduit-processor-sdk/pconduit/global" ) const TypeAvro = schema.TypeAvro +var SchemaService = NewInMemoryService() + var ( ErrSubjectNotFound = pconduit.ErrSubjectNotFound ErrVersionNotFound = pconduit.ErrVersionNotFound @@ -32,7 +33,7 @@ var ( ) func Get(ctx context.Context, subject string, version int) (schema.Schema, error) { - resp, err := global.SchemaService.GetSchema(ctx, pconduit.GetSchemaRequest{ + resp, err := SchemaService.GetSchema(ctx, pconduit.GetSchemaRequest{ Subject: subject, Version: version, }) @@ -43,7 +44,7 @@ func Get(ctx context.Context, subject string, version int) (schema.Schema, error } func Create(ctx context.Context, typ schema.Type, subject string, bytes []byte) (schema.Schema, error) { - resp, err := global.SchemaService.CreateSchema(ctx, pconduit.CreateSchemaRequest{ + resp, err := SchemaService.CreateSchema(ctx, pconduit.CreateSchemaRequest{ Subject: subject, Type: typ, Bytes: bytes, diff --git a/util.go b/util.go index ece6642..2a6cac2 100644 --- a/util.go +++ b/util.go @@ -21,7 +21,7 @@ import ( "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit-processor-sdk/internal/reference" - "github.com/conduitio/conduit-processor-sdk/pconduit/global" + "github.com/conduitio/conduit-processor-sdk/pconduit" "github.com/rs/zerolog" ) @@ -29,7 +29,7 @@ import ( // is passed to any of the processor's methods (Configure, Open, Process, // Teardown) to ensure that the log messages include contextual information. func Logger(ctx context.Context) *zerolog.Logger { - l := global.Logger.With().Ctx(ctx).Logger() + l := pconduit.Logger.With().Ctx(ctx).Logger() return &l } diff --git a/wasm/util.go b/wasm/util.go index ddf8e41..1fb973f 100644 --- a/wasm/util.go +++ b/wasm/util.go @@ -19,7 +19,8 @@ package wasm import ( "os" - "github.com/conduitio/conduit-processor-sdk/pconduit/global" + "github.com/conduitio/conduit-processor-sdk/pconduit" + "github.com/conduitio/conduit-processor-sdk/schema" "github.com/rs/zerolog" ) @@ -38,9 +39,9 @@ func initLogger(logLevel string) { level = zerolog.DebugLevel } logger = logger.Level(level) - global.Logger = logger + pconduit.Logger = logger } func initSchemaService() { - global.SchemaService = &schemaService{} + schema.SchemaService = &schemaService{} }