Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow provided schema, schema cache #162

Merged
merged 4 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ issues:
- errname
- forcetypeassert
- dupl
- maintidx

linters:
# please, do not use `enable-all`: it's deprecated and will be removed soon.
Expand Down
2 changes: 1 addition & 1 deletion destination_middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ func TestDestinationWithSchemaExtraction_Write(t *testing.T) {

testStructuredData := opencdc.StructuredData{
"foo": "bar",
"int": 1,
"long": int64(1),
"float": 2.34,
"time": time.Now().UTC().Truncate(time.Microsecond), // avro precision is microseconds
}
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.22.5

require (
github.com/Masterminds/sprig/v3 v3.2.3
github.com/conduitio/conduit-commons v0.2.1-0.20240717151024-0c8d1f406cb2
github.com/conduitio/conduit-commons v0.2.1-0.20240801113202-731b460a2c58
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240730102156-29a2e67ad980
github.com/goccy/go-json v0.10.3
github.com/golangci/golangci-lint v1.59.1
Expand All @@ -13,6 +13,7 @@ require (
github.com/jpillora/backoff v1.0.0
github.com/matryer/is v1.4.1
github.com/rs/zerolog v1.33.0
github.com/twmb/go-cache v1.2.1
go.uber.org/goleak v1.3.0
go.uber.org/mock v0.4.0
golang.org/x/sync v0.7.0
Expand Down Expand Up @@ -94,7 +95,7 @@ require (
github.com/gostaticanalysis/comment v1.4.2 // indirect
github.com/gostaticanalysis/forcetypeassert v0.1.0 // indirect
github.com/gostaticanalysis/nilerr v0.1.1 // indirect
github.com/hamba/avro/v2 v2.22.1 // indirect
github.com/hamba/avro/v2 v2.23.1-0.20240731181311-3fc81b66c693 // indirect
github.com/hashicorp/go-hclog v1.6.3 // indirect
github.com/hashicorp/go-plugin v1.6.1 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
Expand Down Expand Up @@ -192,7 +193,6 @@ require (
github.com/timonwong/loggercheck v0.9.4 // indirect
github.com/tomarrell/wrapcheck/v2 v2.8.3 // indirect
github.com/tommy-muehle/go-mnd/v2 v2.5.1 // indirect
github.com/twmb/go-cache v1.2.1 // indirect
github.com/ultraware/funlen v0.1.0 // indirect
github.com/ultraware/whitespace v0.1.1 // indirect
github.com/uudashr/gocognit v1.1.2 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ github.com/chavacava/garif v0.1.0 h1:2JHa3hbYf5D9dsgseMKAmc/MZ109otzgNFk5s87H9Pc
github.com/chavacava/garif v0.1.0/go.mod h1:XMyYCkEL58DF0oyW4qDjjnPWONs2HBqYKI+UIPD+Gww=
github.com/ckaznocha/intrange v0.1.2 h1:3Y4JAxcMntgb/wABQ6e8Q8leMd26JbX2790lIss9MTI=
github.com/ckaznocha/intrange v0.1.2/go.mod h1:RWffCw/vKBwHeOEwWdCikAtY0q4gGt8VhJZEEA5n+RE=
github.com/conduitio/conduit-commons v0.2.1-0.20240717151024-0c8d1f406cb2 h1:0Ba/B4lyxeGIVk4zvVGRx1kAdLuXK+8st/LyMKZCmu4=
github.com/conduitio/conduit-commons v0.2.1-0.20240717151024-0c8d1f406cb2/go.mod h1:w0eHaH81yoab8VcrrTjFGNGRQMx45RnXWobKMpKjgrM=
github.com/conduitio/conduit-commons v0.2.1-0.20240801113202-731b460a2c58 h1:bv61cvXf6Tn8RS57vnrWJwKlIAPLoroe167Hs//fnLA=
github.com/conduitio/conduit-commons v0.2.1-0.20240801113202-731b460a2c58/go.mod h1:QYVlSvfOG4AB9tFz9NS++ToinF1yuGG0D0CgzwLG4k0=
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240730102156-29a2e67ad980 h1:Hwg9Ho0Rvrg0rVype0yQA7jJtGrG4zY6RQzvcXAi4Kk=
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240730102156-29a2e67ad980/go.mod h1:GyI6kkdR55JGM/96v5OSI7vlVodur3L22SY+OJbPd0s=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
Expand Down Expand Up @@ -189,8 +189,8 @@ github.com/gostaticanalysis/nilerr v0.1.1/go.mod h1:wZYb6YI5YAxxq0i1+VJbY0s2YONW
github.com/gostaticanalysis/testutil v0.3.1-0.20210208050101-bfb5c8eec0e4/go.mod h1:D+FIZ+7OahH3ePw/izIEeH5I06eKs1IKI4Xr64/Am3M=
github.com/gostaticanalysis/testutil v0.4.0 h1:nhdCmubdmDF6VEatUNjgUZBJKWRqugoISdUv3PPQgHY=
github.com/gostaticanalysis/testutil v0.4.0/go.mod h1:bLIoPefWXrRi/ssLFWX1dx7Repi5x3CuviD3dgAZaBU=
github.com/hamba/avro/v2 v2.22.1 h1:q1rAbfJsrbMaZPDLQvwUQMfQzp6H+hGXvckmU/lXemk=
github.com/hamba/avro/v2 v2.22.1/go.mod h1:HOeTrE3kvWnBAgsufqhAzDDV5gvS0QXs65Z6BHfGgbg=
github.com/hamba/avro/v2 v2.23.1-0.20240731181311-3fc81b66c693 h1:ECZbIygcX0RoDjemCoJ+h6FfHcbTNDIQZPQ7LDhTbao=
github.com/hamba/avro/v2 v2.23.1-0.20240731181311-3fc81b66c693/go.mod h1:7vDfy/2+kYCE8WUHoj2et59GTv0ap7ptktMXu0QHePI=
github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k=
github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
github.com/hashicorp/go-plugin v1.6.1 h1:P7MR2UP6gNKGPp+y7EZw2kOiq4IR9WiqLvp0XOsVdwI=
Expand Down
69 changes: 69 additions & 0 deletions schema/cached.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schema

import (
"context"
"time"

"github.com/conduitio/conduit-commons/schema"
"github.com/conduitio/conduit-connector-protocol/pconnutils"
"github.com/twmb/go-cache/cache"
)

func newCachedSchemaService(service pconnutils.SchemaService) *cachedSchemaService {
return &cachedSchemaService{
SchemaService: service,

getSchemaCache: cache.New[pconnutils.GetSchemaRequest, pconnutils.GetSchemaResponse](
cache.MaxAge(15 * time.Minute), // expire entries after 15 minutes
),
createSchemaCache: cache.New[comparableCreateSchemaRequest, pconnutils.CreateSchemaResponse](
cache.MaxAge(15 * time.Minute), // expire entries after 15 minutes
),
}
}

type cachedSchemaService struct {
pconnutils.SchemaService

getSchemaCache *cache.Cache[pconnutils.GetSchemaRequest, pconnutils.GetSchemaResponse]
createSchemaCache *cache.Cache[comparableCreateSchemaRequest, pconnutils.CreateSchemaResponse]
}

type comparableCreateSchemaRequest struct {
Subject string
Type schema.Type
Bytes string
}

func (c *cachedSchemaService) GetSchema(ctx context.Context, request pconnutils.GetSchemaRequest) (pconnutils.GetSchemaResponse, error) {
resp, err, _ := c.getSchemaCache.Get(request, func() (pconnutils.GetSchemaResponse, error) {
return c.SchemaService.GetSchema(ctx, request)
})
return resp, err
}

func (c *cachedSchemaService) CreateSchema(ctx context.Context, request pconnutils.CreateSchemaRequest) (pconnutils.CreateSchemaResponse, error) {
creq := comparableCreateSchemaRequest{
Subject: request.Subject,
Type: request.Type,
Bytes: string(request.Bytes),
}
resp, err, _ := c.createSchemaCache.Get(creq, func() (pconnutils.CreateSchemaResponse, error) {
return c.SchemaService.CreateSchema(ctx, request)
})
return resp, err
}
130 changes: 130 additions & 0 deletions schema/cached_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schema

import (
"context"
"testing"

"github.com/conduitio/conduit-commons/schema"
"github.com/conduitio/conduit-connector-protocol/pconnutils"
"github.com/conduitio/conduit-connector-protocol/pconnutils/mock"
"github.com/matryer/is"
"go.uber.org/mock/gomock"
)

func TestCachedSchemaService_Get(t *testing.T) {
is := is.New(t)
ctx := context.Background()
ctrl := gomock.NewController(t)
schemaServiceMock := mock.NewSchemaService(ctrl)

req1 := pconnutils.GetSchemaRequest{
Subject: "test",
Version: 1,
}
resp1 := pconnutils.GetSchemaResponse{
Schema: schema.Schema{
ID: 1,
Subject: "test",
Version: 1,
Type: schema.TypeAvro,
Bytes: []byte("int"),
},
}

req2 := pconnutils.GetSchemaRequest{
Subject: "test",
Version: 2,
}
resp2 := pconnutils.GetSchemaResponse{
Schema: schema.Schema{
ID: 2,
Subject: "test",
Version: 2,
Type: schema.TypeAvro,
Bytes: []byte("string"),
},
}

// The underlying mock should be called only once per unique request
schemaServiceMock.EXPECT().GetSchema(ctx, req1).Return(resp1, nil).Times(1)
schemaServiceMock.EXPECT().GetSchema(ctx, req2).Return(resp2, nil).Times(1)

schemaService := newCachedSchemaService(schemaServiceMock)

for i := 0; i < 10; i++ {
gotResp1, err := schemaService.GetSchema(ctx, req1)
is.NoErr(err)
is.Equal(resp1, gotResp1)

gotResp2, err := schemaService.GetSchema(ctx, req2)
is.NoErr(err)
is.Equal(resp2, gotResp2)
}
}

func TestCachedSchemaService_Create(t *testing.T) {
is := is.New(t)
ctx := context.Background()
ctrl := gomock.NewController(t)
schemaServiceMock := mock.NewSchemaService(ctrl)

req1 := pconnutils.CreateSchemaRequest{
Subject: "test",
Type: schema.TypeAvro,
Bytes: []byte("int"),
}
resp1 := pconnutils.CreateSchemaResponse{
Schema: schema.Schema{
ID: 1,
Subject: "test",
Version: 1,
Type: schema.TypeAvro,
Bytes: []byte("int"),
},
}

req2 := pconnutils.CreateSchemaRequest{
Subject: "test",
Type: schema.TypeAvro,
Bytes: []byte("string"),
}
resp2 := pconnutils.CreateSchemaResponse{
Schema: schema.Schema{
ID: 2,
Subject: "test",
Version: 2,
Type: schema.TypeAvro,
Bytes: []byte("string"),
},
}

// The underlying mock should be called only once per unique request
schemaServiceMock.EXPECT().CreateSchema(ctx, req1).Return(resp1, nil).Times(1)
schemaServiceMock.EXPECT().CreateSchema(ctx, req2).Return(resp2, nil).Times(1)

schemaService := newCachedSchemaService(schemaServiceMock)

for i := 0; i < 10; i++ {
gotResp1, err := schemaService.CreateSchema(ctx, req1)
is.NoErr(err)
is.Equal(resp1, gotResp1)

gotResp2, err := schemaService.CreateSchema(ctx, req2)
is.NoErr(err)
is.Equal(resp2, gotResp2)
}
}
4 changes: 2 additions & 2 deletions schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var (

// Service is the schema service client that can be used to interact with the schema service.
// It is initialized with an in-memory service by default.
var Service = newInMemoryService()
var Service = newCachedSchemaService(newInMemoryService())

// Create creates a new schema with the given name and bytes. The schema type must be Avro.
func Create(ctx context.Context, typ schema.Type, subject string, bytes []byte) (schema.Schema, error) {
Expand Down Expand Up @@ -108,6 +108,6 @@ type standaloneInitializer struct{}

// Init initializes the schema service client with the given gRPC connection.
func (standaloneInitializer) Init(conn *grpc.ClientConn) error {
Service = client.NewSchemaServiceClient(conn)
Service = newCachedSchemaService(client.NewSchemaServiceClient(conn))
return nil
}
Loading