diff --git a/go.mod b/go.mod index 4c75ae5..3e6d491 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ toolchain go1.21.5 require ( github.com/bufbuild/buf v1.34.0 - github.com/conduitio/conduit-commons v0.2.0 + github.com/conduitio/conduit-commons v0.2.1-0.20240701165122-3948a38f3667 github.com/goccy/go-json v0.10.3 github.com/golangci/golangci-lint v1.59.1 github.com/google/go-cmp v0.6.0 @@ -118,6 +118,7 @@ require ( github.com/gostaticanalysis/comment v1.4.2 // indirect github.com/gostaticanalysis/forcetypeassert v0.1.0 // indirect github.com/gostaticanalysis/nilerr v0.1.1 // indirect + github.com/hamba/avro/v2 v2.22.1 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/hexops/gotextdiff v1.0.3 // indirect @@ -127,6 +128,7 @@ require ( github.com/jingyugao/rowserrcheck v1.1.1 // indirect github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af // indirect github.com/jjti/go-spancheck v0.6.1 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/julz/importas v0.1.0 // indirect github.com/karamaru-alpha/copyloopvar v1.1.0 // indirect github.com/kisielk/errcheck v1.7.0 // indirect @@ -155,6 +157,8 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect github.com/moby/term v0.5.0 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/moricho/tparallel v0.3.1 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/nakabonne/nestif v0.3.1 // indirect diff --git a/go.sum b/go.sum index d1a4092..ab4d94d 100644 --- a/go.sum +++ b/go.sum @@ -152,8 +152,8 @@ github.com/ckaznocha/intrange v0.1.2 h1:3Y4JAxcMntgb/wABQ6e8Q8leMd26JbX2790lIss9 github.com/ckaznocha/intrange v0.1.2/go.mod h1:RWffCw/vKBwHeOEwWdCikAtY0q4gGt8VhJZEEA5n+RE= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= -github.com/conduitio/conduit-commons v0.2.0 h1:TMpVGXi0Wski537qLAyQWdGjuGHEhaZxOS5L90pZJSQ= -github.com/conduitio/conduit-commons v0.2.0/go.mod h1:i7Q2jm7FBSi2zj1/4MCsFD1hIKAbvamlNtSQfkhUTiY= +github.com/conduitio/conduit-commons v0.2.1-0.20240701165122-3948a38f3667 h1:3HUOjmhoFGwanbq8GOqbCVCKzQom31s9EILVAGGW1R4= +github.com/conduitio/conduit-commons v0.2.1-0.20240701165122-3948a38f3667/go.mod h1:S7zRUQc8goT3gFRbS6jLHV9SxzAeiJ/JKe7FxMX5B1U= github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= github.com/containerd/stargz-snapshotter/estargz v0.15.1 h1:eXJjw9RbkLFgioVaTG+G/ZW/0kEe2oEKCdS/ZxIyoCU= @@ -368,8 +368,10 @@ github.com/gostaticanalysis/nilerr v0.1.1/go.mod h1:wZYb6YI5YAxxq0i1+VJbY0s2YONW github.com/gostaticanalysis/testutil v0.3.1-0.20210208050101-bfb5c8eec0e4/go.mod h1:D+FIZ+7OahH3ePw/izIEeH5I06eKs1IKI4Xr64/Am3M= github.com/gostaticanalysis/testutil v0.4.0 h1:nhdCmubdmDF6VEatUNjgUZBJKWRqugoISdUv3PPQgHY= github.com/gostaticanalysis/testutil v0.4.0/go.mod h1:bLIoPefWXrRi/ssLFWX1dx7Repi5x3CuviD3dgAZaBU= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 h1:/c3QmbOGMGTOumP2iT/rCwB7b0QDGLKzqOmktBjT+Is= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1/go.mod h1:5SN9VR2LTsRFsrEC6FHgRbTWrTHu6tqPeKxEQv15giM= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k= +github.com/hamba/avro/v2 v2.22.1 h1:q1rAbfJsrbMaZPDLQvwUQMfQzp6H+hGXvckmU/lXemk= +github.com/hamba/avro/v2 v2.22.1/go.mod h1:HOeTrE3kvWnBAgsufqhAzDDV5gvS0QXs65Z6BHfGgbg= github.com/hashicorp/go-version v1.2.1/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= @@ -401,6 +403,7 @@ github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= @@ -482,9 +485,11 @@ github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6U github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/moricho/tparallel v0.3.1 h1:fQKD4U1wRMAYNngDonW5XupoB/ZGJHdpzrWqgyg9krA= github.com/moricho/tparallel v0.3.1/go.mod h1:leENX2cUv7Sv2qDgdi0D0fCftN8fRC67Bcn8pqzeYNI= diff --git a/internal/util.go b/internal/util.go deleted file mode 100644 index a8d64ed..0000000 --- a/internal/util.go +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright © 2024 Meroxa, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package internal - -import ( - "context" - - "github.com/rs/zerolog" -) - -type Util interface { - Logger(ctx context.Context) *zerolog.Logger -} - -// utilCtxKey is used as the key when saving the Util in a context. -type utilCtxKey struct{} - -// ContextWithUtil wraps ctx and returns a context that contains Util. -func ContextWithUtil(ctx context.Context, util Util) context.Context { - return context.WithValue(ctx, utilCtxKey{}, util) -} - -// UtilFromContext fetches the record Util from the context. If the -// context does not contain a Util it returns nil. -func UtilFromContext(ctx context.Context) Util { - util := ctx.Value(utilCtxKey{}) - if util != nil { - return util.(Util) //nolint:forcetypeassert // we know it's a Util, we set it - } - return DefaultUtil{} -} - -// DefaultUtil is the default implementation of Util. This is used when no Util -// is set (i.e. in tests). -type DefaultUtil struct{} - -func (DefaultUtil) Logger(ctx context.Context) *zerolog.Logger { - return zerolog.Ctx(ctx) -} diff --git a/conduit/util.go b/pconduit/doc.go similarity index 54% rename from conduit/util.go rename to pconduit/doc.go index 1ae5caf..5056dd1 100644 --- a/conduit/util.go +++ b/pconduit/doc.go @@ -12,18 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package conduit provides the functionality for Conduit to set up and run -// built-in processors. DO NOT use this package directly. -package conduit - -import ( - "context" - - "github.com/conduitio/conduit-processor-sdk/internal" -) - -// ContextWithUtil allows Conduit to set the Util interface for built-in -// processors. DO NOT use this function in your processor. -func ContextWithUtil(ctx context.Context, util internal.Util) context.Context { - return internal.ContextWithUtil(ctx, util) -} +// Package pconduit provides the functionality for Conduit to set up utilities +// for processors. DO NOT use this package directly. +package pconduit diff --git a/wasm/errors.go b/pconduit/errors.go similarity index 76% rename from wasm/errors.go rename to pconduit/errors.go index fcd6359..f7c84f8 100644 --- a/wasm/errors.go +++ b/pconduit/errors.go @@ -12,24 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. -package wasm +package pconduit import ( - "errors" "math" ) const ( - // ErrorCodeStart is the smallest error code which the host (i.e. Conduit) can send. - // The imported function _nextCommand returns an uint32 value - // that is either the number of bytes actually written or an error code. - // Because of that, we're reserving a range of error codes. + // ErrorCodeStart is the smallest error code which the wasm package can send. ErrorCodeStart = math.MaxUint32 - 100 ErrorCodeNoMoreCommands = math.MaxUint32 - iota ErrorCodeUnknownCommandRequest ErrorCodeUnknownCommandResponse ErrorCodeMemoryOutOfRange + + ErrorCodeSubjectNotFound + ErrorCodeVersionNotFound + ErrorCodeInvalidSchema ) var ( @@ -37,9 +37,12 @@ var ( ErrUnknownCommandRequest = NewError(ErrorCodeUnknownCommandRequest, "unknown command request") ErrUnknownCommandResponse = NewError(ErrorCodeUnknownCommandResponse, "unknown command response") ErrMemoryOutOfRange = NewError(ErrorCodeMemoryOutOfRange, "memory out of range") + + ErrSubjectNotFound = NewError(ErrorCodeSubjectNotFound, "schema subject not found") + ErrVersionNotFound = NewError(ErrorCodeVersionNotFound, "schema version not found") + ErrInvalidSchema = NewError(ErrorCodeInvalidSchema, "invalid schema") ) -// Error is an error sent to or received from the host (i.e. Conduit). type Error struct { ErrCode uint32 Message string @@ -74,15 +77,13 @@ func NewErrorFromCode(code uint32) *Error { return ErrUnknownCommandResponse case ErrorCodeMemoryOutOfRange: return ErrMemoryOutOfRange + case ErrorCodeSubjectNotFound: + return ErrSubjectNotFound + case ErrorCodeVersionNotFound: + return ErrVersionNotFound + case ErrorCodeInvalidSchema: + return ErrInvalidSchema default: return NewError(code, "unknown error code") } } - -func CodeFromError(err error) uint32 { - var wasmErr *Error - if errors.As(err, &wasmErr) { - return wasmErr.ErrCode - } - return 0 -} diff --git a/pconduit/global/util.go b/pconduit/global/util.go new file mode 100644 index 0000000..23b762d --- /dev/null +++ b/pconduit/global/util.go @@ -0,0 +1,36 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package global provides the functionality for Conduit to set up utilities +// for processors. DO NOT use this package directly. +package global + +import ( + "os" + + "github.com/conduitio/conduit-processor-sdk/pconduit" + "github.com/rs/zerolog" +) + +var ( + // Logger is the logger for the processor. DO NOT use this logger directly, + // instead use the Logger() function in the root of the processor SDK. + Logger = zerolog.New(zerolog.ConsoleWriter{Out: os.Stdout}). + With(). + Timestamp(). + Logger() + + // TODO by default set to an in-memory schema service. + SchemaService pconduit.SchemaService +) diff --git a/pconduit/schema.go b/pconduit/schema.go new file mode 100644 index 0000000..9a6d7fc --- /dev/null +++ b/pconduit/schema.go @@ -0,0 +1,43 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pconduit + +import ( + "context" + + "github.com/conduitio/conduit-commons/schema" +) + +type CreateSchemaRequest struct { + Subject string + Type schema.Type + Bytes []byte +} +type CreateSchemaResponse struct { + Schema schema.Schema +} + +type GetSchemaRequest struct { + Subject string + Version int +} +type GetSchemaResponse struct { + Schema schema.Schema +} + +type SchemaService interface { + CreateSchema(context.Context, CreateSchemaRequest) (CreateSchemaResponse, error) + GetSchema(context.Context, GetSchemaRequest) (GetSchemaResponse, error) +} diff --git a/pconduit/v1/fromproto/schema.go b/pconduit/v1/fromproto/schema.go new file mode 100644 index 0000000..7ffeae4 --- /dev/null +++ b/pconduit/v1/fromproto/schema.go @@ -0,0 +1,51 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fromproto + +import ( + "github.com/conduitio/conduit-commons/schema" + "github.com/conduitio/conduit-processor-sdk/pconduit" + conduitv1 "github.com/conduitio/conduit-processor-sdk/proto/conduit/v1" +) + +func CreateSchemaRequest(req *conduitv1.CreateSchemaRequest) pconduit.CreateSchemaRequest { + return pconduit.CreateSchemaRequest{ + Subject: req.Subject, + Type: schema.Type(req.Type), + Bytes: req.Bytes, + } +} + +func CreateSchemaResponse(resp *conduitv1.CreateSchemaResponse) pconduit.CreateSchemaResponse { + return pconduit.CreateSchemaResponse{ + Schema: schema.Schema{ + Subject: resp.Schema.Subject, + Version: int(resp.Schema.Version), + Type: schema.Type(resp.Schema.Type), + Bytes: resp.Schema.Bytes, + }, + } +} + +func GetSchemaResponse(resp *conduitv1.GetSchemaResponse) pconduit.GetSchemaResponse { + return pconduit.GetSchemaResponse{ + Schema: schema.Schema{ + Subject: resp.Schema.Subject, + Version: int(resp.Schema.Version), + Type: schema.Type(resp.Schema.Type), + Bytes: resp.Schema.Bytes, + }, + } +} diff --git a/pconduit/v1/toproto/schema.go b/pconduit/v1/toproto/schema.go new file mode 100644 index 0000000..e03965a --- /dev/null +++ b/pconduit/v1/toproto/schema.go @@ -0,0 +1,58 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package toproto + +import ( + schemav1 "github.com/conduitio/conduit-commons/proto/schema/v1" + "github.com/conduitio/conduit-processor-sdk/pconduit" + conduitv1 "github.com/conduitio/conduit-processor-sdk/proto/conduit/v1" +) + +func GetSchemaRequest(in pconduit.GetSchemaRequest) *conduitv1.GetSchemaRequest { + return &conduitv1.GetSchemaRequest{ + Subject: in.Subject, + Version: int32(in.Version), + } +} + +func GetSchemaResponse(in pconduit.GetSchemaResponse) *conduitv1.GetSchemaResponse { + return &conduitv1.GetSchemaResponse{ + Schema: &schemav1.Schema{ + Subject: in.Schema.Subject, + Version: int32(in.Schema.Version), + Type: schemav1.Schema_Type(in.Schema.Type), + Bytes: in.Schema.Bytes, + }, + } +} + +func CreateSchemaRequest(in pconduit.CreateSchemaRequest) *conduitv1.CreateSchemaRequest { + return &conduitv1.CreateSchemaRequest{ + Subject: in.Subject, + Type: schemav1.Schema_Type(in.Type), + Bytes: in.Bytes, + } +} + +func CreateSchemaResponse(in pconduit.CreateSchemaResponse) *conduitv1.CreateSchemaResponse { + return &conduitv1.CreateSchemaResponse{ + Schema: &schemav1.Schema{ + Subject: in.Schema.Subject, + Version: int32(in.Schema.Version), + Type: schemav1.Schema_Type(in.Schema.Type), + Bytes: in.Schema.Bytes, + }, + } +} diff --git a/proto/buf.lock b/proto/buf.lock index 48dee55..a88a798 100644 --- a/proto/buf.lock +++ b/proto/buf.lock @@ -4,5 +4,5 @@ deps: - remote: buf.build owner: conduitio repository: conduit-commons - commit: 5b10e1d6574640b2864772621d09bba7 - digest: shake256:54f1581e61a4f540fe141893c80a924e0091cfe57b19bde9078ca10dd152828acdebcc8e6b9abf2fb6c8dbede06d0903fc85b426009ad9e9fb149813bfe75d63 + commit: 956394d96ec74da9ab7fd598af370668 + digest: shake256:f9b02544f4b6cb945af9529e9168ed56a9096eb8eb337eb4a9032dc8b23d2e57d4a404b72e4e43927b424ff86865dd753df0cbc4c3b8b44273007f85d4353736 diff --git a/proto/conduit/v1/schema.pb.go b/proto/conduit/v1/schema.pb.go new file mode 100644 index 0000000..af03fec --- /dev/null +++ b/proto/conduit/v1/schema.pb.go @@ -0,0 +1,382 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.32.0 +// protoc (unknown) +// source: conduit/v1/schema.proto + +package conduitv1 + +import ( + v1 "github.com/conduitio/conduit-commons/proto/schema/v1" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type CreateSchemaRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Subject string `protobuf:"bytes,1,opt,name=subject,proto3" json:"subject,omitempty"` + Type v1.Schema_Type `protobuf:"varint,2,opt,name=type,proto3,enum=schema.v1.Schema_Type" json:"type,omitempty"` + Bytes []byte `protobuf:"bytes,3,opt,name=bytes,proto3" json:"bytes,omitempty"` +} + +func (x *CreateSchemaRequest) Reset() { + *x = CreateSchemaRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_conduit_v1_schema_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CreateSchemaRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CreateSchemaRequest) ProtoMessage() {} + +func (x *CreateSchemaRequest) ProtoReflect() protoreflect.Message { + mi := &file_conduit_v1_schema_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CreateSchemaRequest.ProtoReflect.Descriptor instead. +func (*CreateSchemaRequest) Descriptor() ([]byte, []int) { + return file_conduit_v1_schema_proto_rawDescGZIP(), []int{0} +} + +func (x *CreateSchemaRequest) GetSubject() string { + if x != nil { + return x.Subject + } + return "" +} + +func (x *CreateSchemaRequest) GetType() v1.Schema_Type { + if x != nil { + return x.Type + } + return v1.Schema_Type(0) +} + +func (x *CreateSchemaRequest) GetBytes() []byte { + if x != nil { + return x.Bytes + } + return nil +} + +type CreateSchemaResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Schema *v1.Schema `protobuf:"bytes,1,opt,name=schema,proto3" json:"schema,omitempty"` +} + +func (x *CreateSchemaResponse) Reset() { + *x = CreateSchemaResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_conduit_v1_schema_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CreateSchemaResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CreateSchemaResponse) ProtoMessage() {} + +func (x *CreateSchemaResponse) ProtoReflect() protoreflect.Message { + mi := &file_conduit_v1_schema_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CreateSchemaResponse.ProtoReflect.Descriptor instead. +func (*CreateSchemaResponse) Descriptor() ([]byte, []int) { + return file_conduit_v1_schema_proto_rawDescGZIP(), []int{1} +} + +func (x *CreateSchemaResponse) GetSchema() *v1.Schema { + if x != nil { + return x.Schema + } + return nil +} + +type GetSchemaRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Subject string `protobuf:"bytes,1,opt,name=subject,proto3" json:"subject,omitempty"` + Version int32 `protobuf:"varint,2,opt,name=version,proto3" json:"version,omitempty"` +} + +func (x *GetSchemaRequest) Reset() { + *x = GetSchemaRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_conduit_v1_schema_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetSchemaRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetSchemaRequest) ProtoMessage() {} + +func (x *GetSchemaRequest) ProtoReflect() protoreflect.Message { + mi := &file_conduit_v1_schema_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetSchemaRequest.ProtoReflect.Descriptor instead. +func (*GetSchemaRequest) Descriptor() ([]byte, []int) { + return file_conduit_v1_schema_proto_rawDescGZIP(), []int{2} +} + +func (x *GetSchemaRequest) GetSubject() string { + if x != nil { + return x.Subject + } + return "" +} + +func (x *GetSchemaRequest) GetVersion() int32 { + if x != nil { + return x.Version + } + return 0 +} + +type GetSchemaResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Schema *v1.Schema `protobuf:"bytes,1,opt,name=schema,proto3" json:"schema,omitempty"` +} + +func (x *GetSchemaResponse) Reset() { + *x = GetSchemaResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_conduit_v1_schema_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetSchemaResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetSchemaResponse) ProtoMessage() {} + +func (x *GetSchemaResponse) ProtoReflect() protoreflect.Message { + mi := &file_conduit_v1_schema_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetSchemaResponse.ProtoReflect.Descriptor instead. +func (*GetSchemaResponse) Descriptor() ([]byte, []int) { + return file_conduit_v1_schema_proto_rawDescGZIP(), []int{3} +} + +func (x *GetSchemaResponse) GetSchema() *v1.Schema { + if x != nil { + return x.Schema + } + return nil +} + +var File_conduit_v1_schema_proto protoreflect.FileDescriptor + +var file_conduit_v1_schema_proto_rawDesc = []byte{ + 0x0a, 0x17, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x2f, 0x76, 0x31, 0x2f, 0x73, 0x63, 0x68, + 0x65, 0x6d, 0x61, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x63, 0x6f, 0x6e, 0x64, 0x75, + 0x69, 0x74, 0x2e, 0x76, 0x31, 0x1a, 0x16, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2f, 0x76, 0x31, + 0x2f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x71, 0x0a, + 0x13, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x73, 0x75, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x12, 0x2a, + 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x16, 0x2e, 0x73, + 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2e, + 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x62, 0x79, + 0x74, 0x65, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x62, 0x79, 0x74, 0x65, 0x73, + 0x22, 0x41, 0x0a, 0x14, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x29, 0x0a, 0x06, 0x73, 0x63, 0x68, 0x65, + 0x6d, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x73, 0x63, 0x68, 0x65, 0x6d, + 0x61, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x06, 0x73, 0x63, 0x68, + 0x65, 0x6d, 0x61, 0x22, 0x46, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x62, 0x6a, 0x65, + 0x63, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x73, 0x75, 0x62, 0x6a, 0x65, 0x63, + 0x74, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x05, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x3e, 0x0a, 0x11, 0x47, + 0x65, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x29, 0x0a, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x11, 0x2e, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x63, 0x68, + 0x65, 0x6d, 0x61, 0x52, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x42, 0xad, 0x01, 0x0a, 0x0e, + 0x63, 0x6f, 0x6d, 0x2e, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x2e, 0x76, 0x31, 0x42, 0x0b, + 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x45, 0x67, + 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, + 0x74, 0x69, 0x6f, 0x2f, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x2d, 0x70, 0x72, 0x6f, 0x63, + 0x65, 0x73, 0x73, 0x6f, 0x72, 0x2d, 0x73, 0x64, 0x6b, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, + 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x2f, 0x76, 0x31, 0x3b, 0x63, 0x6f, 0x6e, 0x64, 0x75, + 0x69, 0x74, 0x76, 0x31, 0xa2, 0x02, 0x03, 0x43, 0x58, 0x58, 0xaa, 0x02, 0x0a, 0x43, 0x6f, 0x6e, + 0x64, 0x75, 0x69, 0x74, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x0a, 0x43, 0x6f, 0x6e, 0x64, 0x75, 0x69, + 0x74, 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x16, 0x43, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x5c, 0x56, + 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0b, + 0x43, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, +} + +var ( + file_conduit_v1_schema_proto_rawDescOnce sync.Once + file_conduit_v1_schema_proto_rawDescData = file_conduit_v1_schema_proto_rawDesc +) + +func file_conduit_v1_schema_proto_rawDescGZIP() []byte { + file_conduit_v1_schema_proto_rawDescOnce.Do(func() { + file_conduit_v1_schema_proto_rawDescData = protoimpl.X.CompressGZIP(file_conduit_v1_schema_proto_rawDescData) + }) + return file_conduit_v1_schema_proto_rawDescData +} + +var file_conduit_v1_schema_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_conduit_v1_schema_proto_goTypes = []interface{}{ + (*CreateSchemaRequest)(nil), // 0: conduit.v1.CreateSchemaRequest + (*CreateSchemaResponse)(nil), // 1: conduit.v1.CreateSchemaResponse + (*GetSchemaRequest)(nil), // 2: conduit.v1.GetSchemaRequest + (*GetSchemaResponse)(nil), // 3: conduit.v1.GetSchemaResponse + (v1.Schema_Type)(0), // 4: schema.v1.Schema.Type + (*v1.Schema)(nil), // 5: schema.v1.Schema +} +var file_conduit_v1_schema_proto_depIdxs = []int32{ + 4, // 0: conduit.v1.CreateSchemaRequest.type:type_name -> schema.v1.Schema.Type + 5, // 1: conduit.v1.CreateSchemaResponse.schema:type_name -> schema.v1.Schema + 5, // 2: conduit.v1.GetSchemaResponse.schema:type_name -> schema.v1.Schema + 3, // [3:3] is the sub-list for method output_type + 3, // [3:3] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name +} + +func init() { file_conduit_v1_schema_proto_init() } +func file_conduit_v1_schema_proto_init() { + if File_conduit_v1_schema_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_conduit_v1_schema_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CreateSchemaRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_conduit_v1_schema_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CreateSchemaResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_conduit_v1_schema_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetSchemaRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_conduit_v1_schema_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetSchemaResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_conduit_v1_schema_proto_rawDesc, + NumEnums: 0, + NumMessages: 4, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_conduit_v1_schema_proto_goTypes, + DependencyIndexes: file_conduit_v1_schema_proto_depIdxs, + MessageInfos: file_conduit_v1_schema_proto_msgTypes, + }.Build() + File_conduit_v1_schema_proto = out.File + file_conduit_v1_schema_proto_rawDesc = nil + file_conduit_v1_schema_proto_goTypes = nil + file_conduit_v1_schema_proto_depIdxs = nil +} diff --git a/proto/conduit/v1/schema.proto b/proto/conduit/v1/schema.proto new file mode 100644 index 0000000..69701c0 --- /dev/null +++ b/proto/conduit/v1/schema.proto @@ -0,0 +1,24 @@ +syntax = "proto3"; + +package conduit.v1; + +import "schema/v1/schema.proto"; + +option go_package = "github.com/conduitio/conduit-processor-sdk/proto/conduit/v1"; + +message CreateSchemaRequest { + string subject = 1; + schema.v1.Schema.Type type = 2; + bytes bytes = 3; +} +message CreateSchemaResponse { + schema.v1.Schema schema = 1; +} + +message GetSchemaRequest { + string subject = 1; + int32 version = 2; +} +message GetSchemaResponse { + schema.v1.Schema schema = 1; +} diff --git a/proto/processor/v1/processor.proto b/proto/processor/v1/processor.proto index 68dd070..ae49c59 100644 --- a/proto/processor/v1/processor.proto +++ b/proto/processor/v1/processor.proto @@ -92,4 +92,4 @@ message Teardown { message Error { uint32 code = 1; string message = 2; -} \ No newline at end of file +} diff --git a/run.go b/run.go index 786e623..c014d47 100644 --- a/run.go +++ b/run.go @@ -25,7 +25,7 @@ import ( "github.com/conduitio/conduit-commons/opencdc" configv1 "github.com/conduitio/conduit-commons/proto/config/v1" opencdcv1 "github.com/conduitio/conduit-commons/proto/opencdc/v1" - "github.com/conduitio/conduit-processor-sdk/internal" + "github.com/conduitio/conduit-processor-sdk/pconduit" processorv1 "github.com/conduitio/conduit-processor-sdk/proto/processor/v1" "github.com/conduitio/conduit-processor-sdk/wasm" "github.com/rs/zerolog" @@ -63,14 +63,12 @@ func Run(p Processor) { logLevel: os.Getenv("CONDUIT_LOG_LEVEL"), } - ctx = internal.ContextWithUtil( - context.Background(), - wasm.NewUtil(env.logLevel), - ) - + ctx = context.Background() // TODO: add processor ID to context cmd processorv1.CommandRequest ) + wasm.InitUtils(env.logLevel) + logger := Logger(ctx) executor := commandExecutor{ protoconv: protoConverter{}, @@ -82,7 +80,7 @@ func Run(p Processor) { cmd.Reset() err := wasm.NextCommand(&cmd) if err != nil { - if errors.Is(err, wasm.ErrNoMoreCommands) { + if errors.Is(err, pconduit.ErrNoMoreCommands) { os.Exit(0) } _, _ = fmt.Fprintf(os.Stderr, "failed retrieving next command: %v", err) @@ -142,7 +140,7 @@ func (e commandExecutor) Execute(ctx context.Context, p Processor, cmdReq *proce case *processorv1.CommandRequest_Teardown: resp, err = e.executeTeardown(ctx, p, req.Teardown) default: - err = wasm.ErrUnknownCommandRequest + err = pconduit.ErrUnknownCommandRequest } if err != nil { @@ -316,7 +314,7 @@ func (c protoConverter) errorRecord(in ErrorRecord) (*processorv1.Process_Proces } func (c protoConverter) error(err error) *processorv1.Error { - var wasmErr *wasm.Error + var wasmErr *pconduit.Error var code uint32 if errors.As(err, &wasmErr) { code = wasmErr.ErrCode diff --git a/schema/schema.go b/schema/schema.go new file mode 100644 index 0000000..d68087b --- /dev/null +++ b/schema/schema.go @@ -0,0 +1,55 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schema + +import ( + "context" + "fmt" + + "github.com/conduitio/conduit-commons/schema" + "github.com/conduitio/conduit-processor-sdk/pconduit" + "github.com/conduitio/conduit-processor-sdk/pconduit/global" +) + +const TypeAvro = schema.TypeAvro + +var ( + ErrSubjectNotFound = pconduit.ErrSubjectNotFound + ErrVersionNotFound = pconduit.ErrVersionNotFound + ErrInvalidSchema = pconduit.ErrInvalidSchema +) + +func Get(ctx context.Context, subject string, version int) (schema.Schema, error) { + resp, err := global.SchemaService.GetSchema(ctx, pconduit.GetSchemaRequest{ + Subject: subject, + Version: version, + }) + if err != nil { + return schema.Schema{}, fmt.Errorf("error getting schema: %w", err) + } + return resp.Schema, nil +} + +func Create(ctx context.Context, typ schema.Type, subject string, bytes []byte) (schema.Schema, error) { + resp, err := global.SchemaService.CreateSchema(ctx, pconduit.CreateSchemaRequest{ + Subject: subject, + Type: typ, + Bytes: bytes, + }) + if err != nil { + return schema.Schema{}, fmt.Errorf("error creating schema: %w", err) + } + return resp.Schema, nil +} diff --git a/util.go b/util.go index 040009a..ece6642 100644 --- a/util.go +++ b/util.go @@ -20,16 +20,17 @@ import ( "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" - "github.com/conduitio/conduit-processor-sdk/internal" "github.com/conduitio/conduit-processor-sdk/internal/reference" + "github.com/conduitio/conduit-processor-sdk/pconduit/global" "github.com/rs/zerolog" ) -// Logger returns the logger for the current context. Please provide the context -// that is passed to any of the processor's methods (Configure, Open, Process, -// Teardown). +// Logger returns the logger for the processor. Please provide the context that +// is passed to any of the processor's methods (Configure, Open, Process, +// Teardown) to ensure that the log messages include contextual information. func Logger(ctx context.Context) *zerolog.Logger { - return internal.UtilFromContext(ctx).Logger(ctx) + l := global.Logger.With().Ctx(ctx).Logger() + return &l } // Reference is an interface that represents a reference to a field in a record. diff --git a/wasm/caller.go b/wasm/caller.go new file mode 100644 index 0000000..f553c57 --- /dev/null +++ b/wasm/caller.go @@ -0,0 +1,57 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build wasm + +package wasm + +import ( + "unsafe" + + "github.com/conduitio/conduit-processor-sdk/pconduit" +) + +// HostFunc is the function type for the imported functions from the host. +// +// The arguments are: +// (1) a pointer to the address where the command should be written +// (2) the size of allocated memory. +// +// The return value indicates the size of the allocated response in bytes. If the +// response is larger than the allocated memory, the caller should reallocate the +// memory and call the function again. +type HostFunc func(ptr unsafe.Pointer, size uint32) uint32 + +// Call calls the function from the host 2 times max, is the buffer size is not +// enough the first time its called, it will be resized the second call. +// returns the buffer, command size, and error. +func hostCall(fn HostFunc, buf []byte) ([]byte, uint32, error) { + // 2 tries, 1st try is with the current buffer size, if that's not enough, + // then resize the buffer and try again + for i := 0; i < 2; i++ { + // request the host to write the response to the given buffer address + ptr := unsafe.Pointer(&buf[0]) + cmdSize := fn(ptr, uint32(len(buf))) + switch { + case cmdSize >= pconduit.ErrorCodeStart: // error codes + return nil, cmdSize, pconduit.NewErrorFromCode(cmdSize) + case cmdSize > uint32(len(buf)) && i == 0: // not enough memory + oldSize := uint32(len(buf)) + buf = append(buf, make([]byte, cmdSize-oldSize)...) + continue // try again + } + return buf, cmdSize, nil + } + panic("if this is reached, then the buffer was not resized correctly and we are in an infinite loop") +} diff --git a/wasm/command.go b/wasm/command.go index b40d0f3..13f7f68 100644 --- a/wasm/command.go +++ b/wasm/command.go @@ -18,54 +18,43 @@ package wasm import ( "fmt" - "unsafe" + "sync" processorv1 "github.com/conduitio/conduit-processor-sdk/proto/processor/v1" "google.golang.org/protobuf/proto" ) -const defaultCommandSize = 1024 // 1kB +const defaultBufferSize = 1024 // 1kB -var buffer = make([]byte, defaultCommandSize) +var bufferPool = sync.Pool{ + New: func() any { + return make([]byte, defaultBufferSize) + }, +} -// NextCommand retrieves the next command from Conduit. func NextCommand(cmdReq *processorv1.CommandRequest) error { - // 2 tries, 1st try is with the current buffer size, if that's not enough, - // then resize the buffer and try again - for i := 0; i < 2; i++ { - // request Conduit to write the command to the given buffer - ptr := unsafe.Pointer(&buffer[0]) - cmdSize := _commandRequest(ptr, uint32(cap(buffer))) - - switch { - case cmdSize >= ErrorCodeStart: // error codes - return NewErrorFromCode(cmdSize) - case cmdSize > uint32(cap(buffer)): // not enough memory - buffer = make([]byte, cmdSize) // resize buffer - continue // try again - } + buffer := bufferPool.Get().([]byte) + defer bufferPool.Put(buffer) - // parse the command - if err := proto.Unmarshal(buffer[:cmdSize], cmdReq); err != nil { - return fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err) - } - return nil + buffer, cmdSize, err := hostCall(_commandRequest, buffer[:cap(buffer)]) + if err != nil { + return err } - panic("if this is reached, then the buffer was not resized correctly and we are in an infinite loop") + // parse the command + if err := proto.Unmarshal(buffer[:cmdSize], cmdReq); err != nil { + return fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err) + } + return nil } func Reply(resp *processorv1.CommandResponse) error { - var err error - buffer, err = proto.MarshalOptions{}.MarshalAppend(buffer[:0], resp) + buffer := bufferPool.Get().([]byte) + defer bufferPool.Put(buffer) + + buffer, err := proto.MarshalOptions{}.MarshalAppend(buffer[:0], resp) if err != nil { return fmt.Errorf("failed marshalling proto type into bytes: %w", err) } - - ptr := unsafe.Pointer(&buffer[0]) - errCode := _commandResponse(ptr, uint32(len(buffer))) - if errCode != 0 { - return NewErrorFromCode(errCode) - } - - return nil + _, _, err = hostCall(_commandResponse, buffer) + return err } diff --git a/wasm/imports.go b/wasm/imports.go index 4b8528c..3f25b95 100644 --- a/wasm/imports.go +++ b/wasm/imports.go @@ -41,3 +41,29 @@ func _commandRequest(ptr unsafe.Pointer, size uint32) uint32 // //go:wasmimport conduit command_response func _commandResponse(ptr unsafe.Pointer, size uint32) uint32 + +// Imports `create_schema` from the host, which creates a schema +// +// The arguments are: +// (1) a pointer to the address where the response should be written +// (2) the size of allocated memory. +// +// The return value indicates the size of the allocated request in bytes. If the +// command is larger than the allocated memory, the caller should reallocate the +// memory and call `create_schema` again. +// +//go:wasmimport conduit create_schema +func _createSchema(ptr unsafe.Pointer, size uint32) uint32 + +// Imports `get_schema` from the host, which gets a schema +// +// The arguments are: +// (1) a pointer to the address where the response should be written +// (2) the size of allocated memory. +// +// The return value indicates the size of the allocated request in bytes. If the +// command is larger than the allocated memory, the caller should reallocate the +// memory and call `get_schema` again. +// +//go:wasmimport conduit get_schema +func _getSchema(ptr unsafe.Pointer, size uint32) uint32 diff --git a/wasm/schema.go b/wasm/schema.go new file mode 100644 index 0000000..987ed72 --- /dev/null +++ b/wasm/schema.go @@ -0,0 +1,80 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build wasm + +package wasm + +import ( + "context" + "fmt" + + "github.com/conduitio/conduit-processor-sdk/pconduit" + "github.com/conduitio/conduit-processor-sdk/pconduit/v1/fromproto" + "github.com/conduitio/conduit-processor-sdk/pconduit/v1/toproto" + conduitv1 "github.com/conduitio/conduit-processor-sdk/proto/conduit/v1" + "google.golang.org/protobuf/proto" +) + +type schemaService struct{} + +func (*schemaService) CreateSchema(_ context.Context, req pconduit.CreateSchemaRequest) (pconduit.CreateSchemaResponse, error) { + protoReq := toproto.CreateSchemaRequest(req) + + buffer := bufferPool.Get().([]byte) + defer bufferPool.Put(buffer) + + buffer, err := proto.MarshalOptions{}.MarshalAppend(buffer[:0], protoReq) + if err != nil { + return pconduit.CreateSchemaResponse{}, fmt.Errorf("error marshalling request: %w", err) + } + + buffer, cmdSize, err := hostCall(_createSchema, buffer) + if err != nil { + return pconduit.CreateSchemaResponse{}, fmt.Errorf("error calling createSchema: %w", err) + } + + var resp conduitv1.CreateSchemaResponse + err = proto.Unmarshal(buffer[:cmdSize], &resp) + if err != nil { + return pconduit.CreateSchemaResponse{}, fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err) + } + + return fromproto.CreateSchemaResponse(&resp), nil + +} + +func (*schemaService) GetSchema(_ context.Context, req pconduit.GetSchemaRequest) (pconduit.GetSchemaResponse, error) { + protoReq := toproto.GetSchemaRequest(req) + + buffer := bufferPool.Get().([]byte) + defer bufferPool.Put(buffer) + + buffer, err := proto.MarshalOptions{}.MarshalAppend(buffer[:0], protoReq) + if err != nil { + return pconduit.GetSchemaResponse{}, fmt.Errorf("error marshalling request: %w", err) + } + + buffer, cmdSize, err := hostCall(_getSchema, buffer) + if err != nil { + return pconduit.GetSchemaResponse{}, fmt.Errorf("error calling getSchema: %w", err) + } + + var resp conduitv1.GetSchemaResponse + err = proto.Unmarshal(buffer[:cmdSize], &resp) + if err != nil { + return pconduit.GetSchemaResponse{}, fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err) + } + return fromproto.GetSchemaResponse(&resp), nil +} diff --git a/wasm/util.go b/wasm/util.go index bdf9aa2..ddf8e41 100644 --- a/wasm/util.go +++ b/wasm/util.go @@ -17,23 +17,18 @@ package wasm import ( - "context" "os" + "github.com/conduitio/conduit-processor-sdk/pconduit/global" "github.com/rs/zerolog" ) -type Util struct { - logger zerolog.Logger +func InitUtils(logLevel string) { + initLogger(logLevel) + initSchemaService() } -func NewUtil(logLevel string) Util { - return Util{ - logger: initLogger(logLevel), - } -} - -func initLogger(logLevel string) zerolog.Logger { +func initLogger(logLevel string) { logger := zerolog.New(os.Stdout) level, err := zerolog.ParseLevel(logLevel) @@ -43,10 +38,9 @@ func initLogger(logLevel string) zerolog.Logger { level = zerolog.DebugLevel } logger = logger.Level(level) - return logger + global.Logger = logger } -func (u Util) Logger(ctx context.Context) *zerolog.Logger { - l := u.logger.With().Ctx(ctx).Logger() - return &l +func initSchemaService() { + global.SchemaService = &schemaService{} }