Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add createSchema and getSchema util methods #60

Merged
merged 21 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ toolchain go1.21.5

require (
github.com/bufbuild/buf v1.34.0
github.com/conduitio/conduit-commons v0.2.0
github.com/conduitio/conduit-commons v0.2.1-0.20240701165122-3948a38f3667
github.com/goccy/go-json v0.10.3
github.com/golangci/golangci-lint v1.59.1
github.com/google/go-cmp v0.6.0
Expand Down Expand Up @@ -118,6 +118,7 @@ require (
github.com/gostaticanalysis/comment v1.4.2 // indirect
github.com/gostaticanalysis/forcetypeassert v0.1.0 // indirect
github.com/gostaticanalysis/nilerr v0.1.1 // indirect
github.com/hamba/avro/v2 v2.22.1 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hexops/gotextdiff v1.0.3 // indirect
Expand All @@ -127,6 +128,7 @@ require (
github.com/jingyugao/rowserrcheck v1.1.1 // indirect
github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af // indirect
github.com/jjti/go-spancheck v0.6.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/julz/importas v0.1.0 // indirect
github.com/karamaru-alpha/copyloopvar v1.1.0 // indirect
github.com/kisielk/errcheck v1.7.0 // indirect
Expand Down Expand Up @@ -155,6 +157,8 @@ require (
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/moricho/tparallel v0.3.1 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/nakabonne/nestif v0.3.1 // indirect
Expand Down
13 changes: 9 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ github.com/ckaznocha/intrange v0.1.2 h1:3Y4JAxcMntgb/wABQ6e8Q8leMd26JbX2790lIss9
github.com/ckaznocha/intrange v0.1.2/go.mod h1:RWffCw/vKBwHeOEwWdCikAtY0q4gGt8VhJZEEA5n+RE=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/conduitio/conduit-commons v0.2.0 h1:TMpVGXi0Wski537qLAyQWdGjuGHEhaZxOS5L90pZJSQ=
github.com/conduitio/conduit-commons v0.2.0/go.mod h1:i7Q2jm7FBSi2zj1/4MCsFD1hIKAbvamlNtSQfkhUTiY=
github.com/conduitio/conduit-commons v0.2.1-0.20240701165122-3948a38f3667 h1:3HUOjmhoFGwanbq8GOqbCVCKzQom31s9EILVAGGW1R4=
github.com/conduitio/conduit-commons v0.2.1-0.20240701165122-3948a38f3667/go.mod h1:S7zRUQc8goT3gFRbS6jLHV9SxzAeiJ/JKe7FxMX5B1U=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
github.com/containerd/stargz-snapshotter/estargz v0.15.1 h1:eXJjw9RbkLFgioVaTG+G/ZW/0kEe2oEKCdS/ZxIyoCU=
Expand Down Expand Up @@ -368,8 +368,10 @@ github.com/gostaticanalysis/nilerr v0.1.1/go.mod h1:wZYb6YI5YAxxq0i1+VJbY0s2YONW
github.com/gostaticanalysis/testutil v0.3.1-0.20210208050101-bfb5c8eec0e4/go.mod h1:D+FIZ+7OahH3ePw/izIEeH5I06eKs1IKI4Xr64/Am3M=
github.com/gostaticanalysis/testutil v0.4.0 h1:nhdCmubdmDF6VEatUNjgUZBJKWRqugoISdUv3PPQgHY=
github.com/gostaticanalysis/testutil v0.4.0/go.mod h1:bLIoPefWXrRi/ssLFWX1dx7Repi5x3CuviD3dgAZaBU=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 h1:/c3QmbOGMGTOumP2iT/rCwB7b0QDGLKzqOmktBjT+Is=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1/go.mod h1:5SN9VR2LTsRFsrEC6FHgRbTWrTHu6tqPeKxEQv15giM=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k=
github.com/hamba/avro/v2 v2.22.1 h1:q1rAbfJsrbMaZPDLQvwUQMfQzp6H+hGXvckmU/lXemk=
github.com/hamba/avro/v2 v2.22.1/go.mod h1:HOeTrE3kvWnBAgsufqhAzDDV5gvS0QXs65Z6BHfGgbg=
github.com/hashicorp/go-version v1.2.1/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY=
github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
Expand Down Expand Up @@ -401,6 +403,7 @@ github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
Expand Down Expand Up @@ -482,9 +485,11 @@ github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6U
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/moricho/tparallel v0.3.1 h1:fQKD4U1wRMAYNngDonW5XupoB/ZGJHdpzrWqgyg9krA=
github.com/moricho/tparallel v0.3.1/go.mod h1:leENX2cUv7Sv2qDgdi0D0fCftN8fRC67Bcn8pqzeYNI=
Expand Down
51 changes: 0 additions & 51 deletions internal/util.go

This file was deleted.

18 changes: 3 additions & 15 deletions conduit/util.go → pconduit/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package conduit provides the functionality for Conduit to set up and run
// built-in processors. DO NOT use this package directly.
package conduit

import (
"context"

"github.com/conduitio/conduit-processor-sdk/internal"
)

// ContextWithUtil allows Conduit to set the Util interface for built-in
// processors. DO NOT use this function in your processor.
func ContextWithUtil(ctx context.Context, util internal.Util) context.Context {
return internal.ContextWithUtil(ctx, util)
}
// Package pconduit provides the functionality for Conduit to set up utilities
// for processors. DO NOT use this package directly.
package pconduit
31 changes: 16 additions & 15 deletions wasm/errors.go → pconduit/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,37 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package wasm
package pconduit

import (
"errors"
"math"
)

const (
// ErrorCodeStart is the smallest error code which the host (i.e. Conduit) can send.
// The imported function _nextCommand returns an uint32 value
// that is either the number of bytes actually written or an error code.
// Because of that, we're reserving a range of error codes.
// ErrorCodeStart is the smallest error code which the wasm package can send.
ErrorCodeStart = math.MaxUint32 - 100

ErrorCodeNoMoreCommands = math.MaxUint32 - iota
ErrorCodeUnknownCommandRequest
ErrorCodeUnknownCommandResponse
ErrorCodeMemoryOutOfRange

ErrorCodeSubjectNotFound
ErrorCodeVersionNotFound
ErrorCodeInvalidSchema
)

var (
ErrNoMoreCommands = NewError(ErrorCodeNoMoreCommands, "no more commands")
ErrUnknownCommandRequest = NewError(ErrorCodeUnknownCommandRequest, "unknown command request")
ErrUnknownCommandResponse = NewError(ErrorCodeUnknownCommandResponse, "unknown command response")
ErrMemoryOutOfRange = NewError(ErrorCodeMemoryOutOfRange, "memory out of range")

ErrSubjectNotFound = NewError(ErrorCodeSubjectNotFound, "schema subject not found")
ErrVersionNotFound = NewError(ErrorCodeVersionNotFound, "schema version not found")
ErrInvalidSchema = NewError(ErrorCodeInvalidSchema, "invalid schema")
)

// Error is an error sent to or received from the host (i.e. Conduit).
type Error struct {
ErrCode uint32
Message string
Expand Down Expand Up @@ -74,15 +77,13 @@ func NewErrorFromCode(code uint32) *Error {
return ErrUnknownCommandResponse
case ErrorCodeMemoryOutOfRange:
return ErrMemoryOutOfRange
case ErrorCodeSubjectNotFound:
return ErrSubjectNotFound
case ErrorCodeVersionNotFound:
return ErrVersionNotFound
case ErrorCodeInvalidSchema:
return ErrInvalidSchema
default:
return NewError(code, "unknown error code")
}
}

func CodeFromError(err error) uint32 {
var wasmErr *Error
if errors.As(err, &wasmErr) {
return wasmErr.ErrCode
}
return 0
}
36 changes: 36 additions & 0 deletions pconduit/global/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package global provides the functionality for Conduit to set up utilities
// for processors. DO NOT use this package directly.
package global

import (
"os"

"github.com/conduitio/conduit-processor-sdk/pconduit"
"github.com/rs/zerolog"
)

var (
// Logger is the logger for the processor. DO NOT use this logger directly,
// instead use the Logger() function in the root of the processor SDK.
Logger = zerolog.New(zerolog.ConsoleWriter{Out: os.Stdout}).
With().
Timestamp().
Logger()

// TODO by default set to an in-memory schema service.
SchemaService pconduit.SchemaService
)
43 changes: 43 additions & 0 deletions pconduit/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pconduit

import (
"context"

"github.com/conduitio/conduit-commons/schema"
)

type CreateSchemaRequest struct {
Subject string
Type schema.Type
Bytes []byte
}
type CreateSchemaResponse struct {
Schema schema.Schema
}

type GetSchemaRequest struct {
Subject string
Version int
}
type GetSchemaResponse struct {
Schema schema.Schema
}

type SchemaService interface {
CreateSchema(context.Context, CreateSchemaRequest) (CreateSchemaResponse, error)
GetSchema(context.Context, GetSchemaRequest) (GetSchemaResponse, error)
}
51 changes: 51 additions & 0 deletions pconduit/v1/fromproto/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fromproto

import (
"github.com/conduitio/conduit-commons/schema"
"github.com/conduitio/conduit-processor-sdk/pconduit"
conduitv1 "github.com/conduitio/conduit-processor-sdk/proto/conduit/v1"
)

func CreateSchemaRequest(req *conduitv1.CreateSchemaRequest) pconduit.CreateSchemaRequest {
return pconduit.CreateSchemaRequest{
Subject: req.Subject,
Type: schema.Type(req.Type),
Bytes: req.Bytes,
}
}

func CreateSchemaResponse(resp *conduitv1.CreateSchemaResponse) pconduit.CreateSchemaResponse {
return pconduit.CreateSchemaResponse{
Schema: schema.Schema{
Subject: resp.Schema.Subject,
Version: int(resp.Schema.Version),
Type: schema.Type(resp.Schema.Type),
Bytes: resp.Schema.Bytes,
},
}
}

func GetSchemaResponse(resp *conduitv1.GetSchemaResponse) pconduit.GetSchemaResponse {
return pconduit.GetSchemaResponse{
Schema: schema.Schema{
Subject: resp.Schema.Subject,
Version: int(resp.Schema.Version),
Type: schema.Type(resp.Schema.Type),
Bytes: resp.Schema.Bytes,
},
}
}
Loading
Loading