diff --git a/Makefile b/Makefile index 9ca7798..286d1e7 100644 --- a/Makefile +++ b/Makefile @@ -26,3 +26,7 @@ generate: .PHONY: proto-generate proto-generate: cd proto && buf generate + +.PHONY: proto-lint +proto-lint: + cd proto && buf lint \ No newline at end of file diff --git a/pconduit/doc.go b/pprocutils/doc.go similarity index 86% rename from pconduit/doc.go rename to pprocutils/doc.go index 5056dd1..d5f5d84 100644 --- a/pconduit/doc.go +++ b/pprocutils/doc.go @@ -12,6 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package pconduit provides the functionality for Conduit to set up utilities +// Package pprocutils provides the functionality for Conduit to set up utilities // for processors. DO NOT use this package directly. -package pconduit +package pprocutils diff --git a/pconduit/errors.go b/pprocutils/errors.go similarity index 99% rename from pconduit/errors.go rename to pprocutils/errors.go index f7c84f8..e2650fd 100644 --- a/pconduit/errors.go +++ b/pprocutils/errors.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package pconduit +package pprocutils import ( "math" diff --git a/pconduit/logger.go b/pprocutils/logger.go similarity index 97% rename from pconduit/logger.go rename to pprocutils/logger.go index b4b6c2d..8635c5b 100644 --- a/pconduit/logger.go +++ b/pprocutils/logger.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package pconduit +package pprocutils import ( "os" diff --git a/pconduit/schema.go b/pprocutils/schema.go similarity index 98% rename from pconduit/schema.go rename to pprocutils/schema.go index 9a6d7fc..24ddf82 100644 --- a/pconduit/schema.go +++ b/pprocutils/schema.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package pconduit +package pprocutils import ( "context" diff --git a/pconduit/v1/fromproto/schema.go b/pprocutils/v1/fromproto/schema.go similarity index 63% rename from pconduit/v1/fromproto/schema.go rename to pprocutils/v1/fromproto/schema.go index e1c9e33..f57b0cc 100644 --- a/pconduit/v1/fromproto/schema.go +++ b/pprocutils/v1/fromproto/schema.go @@ -16,27 +16,27 @@ package fromproto import ( "github.com/conduitio/conduit-commons/schema" - "github.com/conduitio/conduit-processor-sdk/pconduit" - conduitv1 "github.com/conduitio/conduit-processor-sdk/proto/conduit/v1" + "github.com/conduitio/conduit-processor-sdk/pprocutils" + procutilsv1 "github.com/conduitio/conduit-processor-sdk/proto/procutils/v1" ) -func CreateSchemaRequest(req *conduitv1.CreateSchemaRequest) pconduit.CreateSchemaRequest { - return pconduit.CreateSchemaRequest{ +func CreateSchemaRequest(req *procutilsv1.CreateSchemaRequest) pprocutils.CreateSchemaRequest { + return pprocutils.CreateSchemaRequest{ Subject: req.Subject, Type: schema.Type(req.Type), Bytes: req.Bytes, } } -func GetSchemaRequest(req *conduitv1.GetSchemaRequest) pconduit.GetSchemaRequest { - return pconduit.GetSchemaRequest{ +func GetSchemaRequest(req *procutilsv1.GetSchemaRequest) pprocutils.GetSchemaRequest { + return pprocutils.GetSchemaRequest{ Subject: req.Subject, Version: int(req.Version), } } -func CreateSchemaResponse(resp *conduitv1.CreateSchemaResponse) pconduit.CreateSchemaResponse { - return pconduit.CreateSchemaResponse{ +func CreateSchemaResponse(resp *procutilsv1.CreateSchemaResponse) pprocutils.CreateSchemaResponse { + return pprocutils.CreateSchemaResponse{ Schema: schema.Schema{ Subject: resp.Schema.Subject, Version: int(resp.Schema.Version), @@ -46,8 +46,8 @@ func CreateSchemaResponse(resp *conduitv1.CreateSchemaResponse) pconduit.CreateS } } -func GetSchemaResponse(resp *conduitv1.GetSchemaResponse) pconduit.GetSchemaResponse { - return pconduit.GetSchemaResponse{ +func GetSchemaResponse(resp *procutilsv1.GetSchemaResponse) pprocutils.GetSchemaResponse { + return pprocutils.GetSchemaResponse{ Schema: schema.Schema{ Subject: resp.Schema.Subject, Version: int(resp.Schema.Version), diff --git a/pconduit/v1/toproto/schema.go b/pprocutils/v1/toproto/schema.go similarity index 64% rename from pconduit/v1/toproto/schema.go rename to pprocutils/v1/toproto/schema.go index e03965a..63be630 100644 --- a/pconduit/v1/toproto/schema.go +++ b/pprocutils/v1/toproto/schema.go @@ -16,19 +16,19 @@ package toproto import ( schemav1 "github.com/conduitio/conduit-commons/proto/schema/v1" - "github.com/conduitio/conduit-processor-sdk/pconduit" - conduitv1 "github.com/conduitio/conduit-processor-sdk/proto/conduit/v1" + "github.com/conduitio/conduit-processor-sdk/pprocutils" + procutilsv1 "github.com/conduitio/conduit-processor-sdk/proto/procutils/v1" ) -func GetSchemaRequest(in pconduit.GetSchemaRequest) *conduitv1.GetSchemaRequest { - return &conduitv1.GetSchemaRequest{ +func GetSchemaRequest(in pprocutils.GetSchemaRequest) *procutilsv1.GetSchemaRequest { + return &procutilsv1.GetSchemaRequest{ Subject: in.Subject, Version: int32(in.Version), } } -func GetSchemaResponse(in pconduit.GetSchemaResponse) *conduitv1.GetSchemaResponse { - return &conduitv1.GetSchemaResponse{ +func GetSchemaResponse(in pprocutils.GetSchemaResponse) *procutilsv1.GetSchemaResponse { + return &procutilsv1.GetSchemaResponse{ Schema: &schemav1.Schema{ Subject: in.Schema.Subject, Version: int32(in.Schema.Version), @@ -38,16 +38,16 @@ func GetSchemaResponse(in pconduit.GetSchemaResponse) *conduitv1.GetSchemaRespon } } -func CreateSchemaRequest(in pconduit.CreateSchemaRequest) *conduitv1.CreateSchemaRequest { - return &conduitv1.CreateSchemaRequest{ +func CreateSchemaRequest(in pprocutils.CreateSchemaRequest) *procutilsv1.CreateSchemaRequest { + return &procutilsv1.CreateSchemaRequest{ Subject: in.Subject, Type: schemav1.Schema_Type(in.Type), Bytes: in.Bytes, } } -func CreateSchemaResponse(in pconduit.CreateSchemaResponse) *conduitv1.CreateSchemaResponse { - return &conduitv1.CreateSchemaResponse{ +func CreateSchemaResponse(in pprocutils.CreateSchemaResponse) *procutilsv1.CreateSchemaResponse { + return &procutilsv1.CreateSchemaResponse{ Schema: &schemav1.Schema{ Subject: in.Schema.Subject, Version: int32(in.Schema.Version), diff --git a/proto/conduit/v1/schema.pb.go b/proto/procutils/v1/schema.pb.go similarity index 53% rename from proto/conduit/v1/schema.pb.go rename to proto/procutils/v1/schema.pb.go index af03fec..a32ae87 100644 --- a/proto/conduit/v1/schema.pb.go +++ b/proto/procutils/v1/schema.pb.go @@ -2,9 +2,9 @@ // versions: // protoc-gen-go v1.32.0 // protoc (unknown) -// source: conduit/v1/schema.proto +// source: procutils/v1/schema.proto -package conduitv1 +package procutilsv1 import ( v1 "github.com/conduitio/conduit-commons/proto/schema/v1" @@ -34,7 +34,7 @@ type CreateSchemaRequest struct { func (x *CreateSchemaRequest) Reset() { *x = CreateSchemaRequest{} if protoimpl.UnsafeEnabled { - mi := &file_conduit_v1_schema_proto_msgTypes[0] + mi := &file_procutils_v1_schema_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -47,7 +47,7 @@ func (x *CreateSchemaRequest) String() string { func (*CreateSchemaRequest) ProtoMessage() {} func (x *CreateSchemaRequest) ProtoReflect() protoreflect.Message { - mi := &file_conduit_v1_schema_proto_msgTypes[0] + mi := &file_procutils_v1_schema_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -60,7 +60,7 @@ func (x *CreateSchemaRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use CreateSchemaRequest.ProtoReflect.Descriptor instead. func (*CreateSchemaRequest) Descriptor() ([]byte, []int) { - return file_conduit_v1_schema_proto_rawDescGZIP(), []int{0} + return file_procutils_v1_schema_proto_rawDescGZIP(), []int{0} } func (x *CreateSchemaRequest) GetSubject() string { @@ -95,7 +95,7 @@ type CreateSchemaResponse struct { func (x *CreateSchemaResponse) Reset() { *x = CreateSchemaResponse{} if protoimpl.UnsafeEnabled { - mi := &file_conduit_v1_schema_proto_msgTypes[1] + mi := &file_procutils_v1_schema_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -108,7 +108,7 @@ func (x *CreateSchemaResponse) String() string { func (*CreateSchemaResponse) ProtoMessage() {} func (x *CreateSchemaResponse) ProtoReflect() protoreflect.Message { - mi := &file_conduit_v1_schema_proto_msgTypes[1] + mi := &file_procutils_v1_schema_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -121,7 +121,7 @@ func (x *CreateSchemaResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use CreateSchemaResponse.ProtoReflect.Descriptor instead. func (*CreateSchemaResponse) Descriptor() ([]byte, []int) { - return file_conduit_v1_schema_proto_rawDescGZIP(), []int{1} + return file_procutils_v1_schema_proto_rawDescGZIP(), []int{1} } func (x *CreateSchemaResponse) GetSchema() *v1.Schema { @@ -143,7 +143,7 @@ type GetSchemaRequest struct { func (x *GetSchemaRequest) Reset() { *x = GetSchemaRequest{} if protoimpl.UnsafeEnabled { - mi := &file_conduit_v1_schema_proto_msgTypes[2] + mi := &file_procutils_v1_schema_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -156,7 +156,7 @@ func (x *GetSchemaRequest) String() string { func (*GetSchemaRequest) ProtoMessage() {} func (x *GetSchemaRequest) ProtoReflect() protoreflect.Message { - mi := &file_conduit_v1_schema_proto_msgTypes[2] + mi := &file_procutils_v1_schema_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -169,7 +169,7 @@ func (x *GetSchemaRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use GetSchemaRequest.ProtoReflect.Descriptor instead. func (*GetSchemaRequest) Descriptor() ([]byte, []int) { - return file_conduit_v1_schema_proto_rawDescGZIP(), []int{2} + return file_procutils_v1_schema_proto_rawDescGZIP(), []int{2} } func (x *GetSchemaRequest) GetSubject() string { @@ -197,7 +197,7 @@ type GetSchemaResponse struct { func (x *GetSchemaResponse) Reset() { *x = GetSchemaResponse{} if protoimpl.UnsafeEnabled { - mi := &file_conduit_v1_schema_proto_msgTypes[3] + mi := &file_procutils_v1_schema_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -210,7 +210,7 @@ func (x *GetSchemaResponse) String() string { func (*GetSchemaResponse) ProtoMessage() {} func (x *GetSchemaResponse) ProtoReflect() protoreflect.Message { - mi := &file_conduit_v1_schema_proto_msgTypes[3] + mi := &file_procutils_v1_schema_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -223,7 +223,7 @@ func (x *GetSchemaResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use GetSchemaResponse.ProtoReflect.Descriptor instead. func (*GetSchemaResponse) Descriptor() ([]byte, []int) { - return file_conduit_v1_schema_proto_rawDescGZIP(), []int{3} + return file_procutils_v1_schema_proto_rawDescGZIP(), []int{3} } func (x *GetSchemaResponse) GetSchema() *v1.Schema { @@ -233,72 +233,73 @@ func (x *GetSchemaResponse) GetSchema() *v1.Schema { return nil } -var File_conduit_v1_schema_proto protoreflect.FileDescriptor - -var file_conduit_v1_schema_proto_rawDesc = []byte{ - 0x0a, 0x17, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x2f, 0x76, 0x31, 0x2f, 0x73, 0x63, 0x68, - 0x65, 0x6d, 0x61, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x63, 0x6f, 0x6e, 0x64, 0x75, - 0x69, 0x74, 0x2e, 0x76, 0x31, 0x1a, 0x16, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2f, 0x76, 0x31, - 0x2f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x71, 0x0a, - 0x13, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x73, 0x75, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x12, 0x2a, - 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x16, 0x2e, 0x73, - 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2e, - 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x62, 0x79, - 0x74, 0x65, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x62, 0x79, 0x74, 0x65, 0x73, - 0x22, 0x41, 0x0a, 0x14, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x29, 0x0a, 0x06, 0x73, 0x63, 0x68, 0x65, - 0x6d, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x73, 0x63, 0x68, 0x65, 0x6d, - 0x61, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x06, 0x73, 0x63, 0x68, - 0x65, 0x6d, 0x61, 0x22, 0x46, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x62, 0x6a, 0x65, - 0x63, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x73, 0x75, 0x62, 0x6a, 0x65, 0x63, - 0x74, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x05, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x3e, 0x0a, 0x11, 0x47, - 0x65, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x29, 0x0a, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x11, 0x2e, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x63, 0x68, - 0x65, 0x6d, 0x61, 0x52, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x42, 0xad, 0x01, 0x0a, 0x0e, - 0x63, 0x6f, 0x6d, 0x2e, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x2e, 0x76, 0x31, 0x42, 0x0b, - 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x45, 0x67, - 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, - 0x74, 0x69, 0x6f, 0x2f, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x2d, 0x70, 0x72, 0x6f, 0x63, - 0x65, 0x73, 0x73, 0x6f, 0x72, 0x2d, 0x73, 0x64, 0x6b, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, - 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x2f, 0x76, 0x31, 0x3b, 0x63, 0x6f, 0x6e, 0x64, 0x75, - 0x69, 0x74, 0x76, 0x31, 0xa2, 0x02, 0x03, 0x43, 0x58, 0x58, 0xaa, 0x02, 0x0a, 0x43, 0x6f, 0x6e, - 0x64, 0x75, 0x69, 0x74, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x0a, 0x43, 0x6f, 0x6e, 0x64, 0x75, 0x69, - 0x74, 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x16, 0x43, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x5c, 0x56, - 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0b, - 0x43, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, +var File_procutils_v1_schema_proto protoreflect.FileDescriptor + +var file_procutils_v1_schema_proto_rawDesc = []byte{ + 0x0a, 0x19, 0x70, 0x72, 0x6f, 0x63, 0x75, 0x74, 0x69, 0x6c, 0x73, 0x2f, 0x76, 0x31, 0x2f, 0x73, + 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0c, 0x70, 0x72, 0x6f, + 0x63, 0x75, 0x74, 0x69, 0x6c, 0x73, 0x2e, 0x76, 0x31, 0x1a, 0x16, 0x73, 0x63, 0x68, 0x65, 0x6d, + 0x61, 0x2f, 0x76, 0x31, 0x2f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x22, 0x71, 0x0a, 0x13, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, + 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x62, 0x6a, + 0x65, 0x63, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x73, 0x75, 0x62, 0x6a, 0x65, + 0x63, 0x74, 0x12, 0x2a, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, + 0x32, 0x16, 0x2e, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x63, 0x68, + 0x65, 0x6d, 0x61, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x14, + 0x0a, 0x05, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x62, + 0x79, 0x74, 0x65, 0x73, 0x22, 0x41, 0x0a, 0x14, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x53, 0x63, + 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x29, 0x0a, 0x06, + 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x73, + 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, + 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x22, 0x46, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x53, 0x63, + 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x73, + 0x75, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x73, 0x75, + 0x62, 0x6a, 0x65, 0x63, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, + 0x3e, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x29, 0x0a, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2e, 0x76, 0x31, + 0x2e, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x42, + 0xbb, 0x01, 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x72, 0x6f, 0x63, 0x75, 0x74, 0x69, 0x6c, + 0x73, 0x2e, 0x76, 0x31, 0x42, 0x0b, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x50, 0x72, 0x6f, 0x74, + 0x6f, 0x50, 0x01, 0x5a, 0x49, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, + 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x69, 0x6f, 0x2f, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, + 0x74, 0x2d, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x2d, 0x73, 0x64, 0x6b, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x72, 0x6f, 0x63, 0x75, 0x74, 0x69, 0x6c, 0x73, 0x2f, + 0x76, 0x31, 0x3b, 0x70, 0x72, 0x6f, 0x63, 0x75, 0x74, 0x69, 0x6c, 0x73, 0x76, 0x31, 0xa2, 0x02, + 0x03, 0x50, 0x58, 0x58, 0xaa, 0x02, 0x0c, 0x50, 0x72, 0x6f, 0x63, 0x75, 0x74, 0x69, 0x6c, 0x73, + 0x2e, 0x56, 0x31, 0xca, 0x02, 0x0c, 0x50, 0x72, 0x6f, 0x63, 0x75, 0x74, 0x69, 0x6c, 0x73, 0x5c, + 0x56, 0x31, 0xe2, 0x02, 0x18, 0x50, 0x72, 0x6f, 0x63, 0x75, 0x74, 0x69, 0x6c, 0x73, 0x5c, 0x56, + 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0d, + 0x50, 0x72, 0x6f, 0x63, 0x75, 0x74, 0x69, 0x6c, 0x73, 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( - file_conduit_v1_schema_proto_rawDescOnce sync.Once - file_conduit_v1_schema_proto_rawDescData = file_conduit_v1_schema_proto_rawDesc + file_procutils_v1_schema_proto_rawDescOnce sync.Once + file_procutils_v1_schema_proto_rawDescData = file_procutils_v1_schema_proto_rawDesc ) -func file_conduit_v1_schema_proto_rawDescGZIP() []byte { - file_conduit_v1_schema_proto_rawDescOnce.Do(func() { - file_conduit_v1_schema_proto_rawDescData = protoimpl.X.CompressGZIP(file_conduit_v1_schema_proto_rawDescData) +func file_procutils_v1_schema_proto_rawDescGZIP() []byte { + file_procutils_v1_schema_proto_rawDescOnce.Do(func() { + file_procutils_v1_schema_proto_rawDescData = protoimpl.X.CompressGZIP(file_procutils_v1_schema_proto_rawDescData) }) - return file_conduit_v1_schema_proto_rawDescData + return file_procutils_v1_schema_proto_rawDescData } -var file_conduit_v1_schema_proto_msgTypes = make([]protoimpl.MessageInfo, 4) -var file_conduit_v1_schema_proto_goTypes = []interface{}{ - (*CreateSchemaRequest)(nil), // 0: conduit.v1.CreateSchemaRequest - (*CreateSchemaResponse)(nil), // 1: conduit.v1.CreateSchemaResponse - (*GetSchemaRequest)(nil), // 2: conduit.v1.GetSchemaRequest - (*GetSchemaResponse)(nil), // 3: conduit.v1.GetSchemaResponse +var file_procutils_v1_schema_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_procutils_v1_schema_proto_goTypes = []interface{}{ + (*CreateSchemaRequest)(nil), // 0: procutils.v1.CreateSchemaRequest + (*CreateSchemaResponse)(nil), // 1: procutils.v1.CreateSchemaResponse + (*GetSchemaRequest)(nil), // 2: procutils.v1.GetSchemaRequest + (*GetSchemaResponse)(nil), // 3: procutils.v1.GetSchemaResponse (v1.Schema_Type)(0), // 4: schema.v1.Schema.Type (*v1.Schema)(nil), // 5: schema.v1.Schema } -var file_conduit_v1_schema_proto_depIdxs = []int32{ - 4, // 0: conduit.v1.CreateSchemaRequest.type:type_name -> schema.v1.Schema.Type - 5, // 1: conduit.v1.CreateSchemaResponse.schema:type_name -> schema.v1.Schema - 5, // 2: conduit.v1.GetSchemaResponse.schema:type_name -> schema.v1.Schema +var file_procutils_v1_schema_proto_depIdxs = []int32{ + 4, // 0: procutils.v1.CreateSchemaRequest.type:type_name -> schema.v1.Schema.Type + 5, // 1: procutils.v1.CreateSchemaResponse.schema:type_name -> schema.v1.Schema + 5, // 2: procutils.v1.GetSchemaResponse.schema:type_name -> schema.v1.Schema 3, // [3:3] is the sub-list for method output_type 3, // [3:3] is the sub-list for method input_type 3, // [3:3] is the sub-list for extension type_name @@ -306,13 +307,13 @@ var file_conduit_v1_schema_proto_depIdxs = []int32{ 0, // [0:3] is the sub-list for field type_name } -func init() { file_conduit_v1_schema_proto_init() } -func file_conduit_v1_schema_proto_init() { - if File_conduit_v1_schema_proto != nil { +func init() { file_procutils_v1_schema_proto_init() } +func file_procutils_v1_schema_proto_init() { + if File_procutils_v1_schema_proto != nil { return } if !protoimpl.UnsafeEnabled { - file_conduit_v1_schema_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + file_procutils_v1_schema_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*CreateSchemaRequest); i { case 0: return &v.state @@ -324,7 +325,7 @@ func file_conduit_v1_schema_proto_init() { return nil } } - file_conduit_v1_schema_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + file_procutils_v1_schema_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*CreateSchemaResponse); i { case 0: return &v.state @@ -336,7 +337,7 @@ func file_conduit_v1_schema_proto_init() { return nil } } - file_conduit_v1_schema_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + file_procutils_v1_schema_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*GetSchemaRequest); i { case 0: return &v.state @@ -348,7 +349,7 @@ func file_conduit_v1_schema_proto_init() { return nil } } - file_conduit_v1_schema_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + file_procutils_v1_schema_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*GetSchemaResponse); i { case 0: return &v.state @@ -365,18 +366,18 @@ func file_conduit_v1_schema_proto_init() { out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_conduit_v1_schema_proto_rawDesc, + RawDescriptor: file_procutils_v1_schema_proto_rawDesc, NumEnums: 0, NumMessages: 4, NumExtensions: 0, NumServices: 0, }, - GoTypes: file_conduit_v1_schema_proto_goTypes, - DependencyIndexes: file_conduit_v1_schema_proto_depIdxs, - MessageInfos: file_conduit_v1_schema_proto_msgTypes, + GoTypes: file_procutils_v1_schema_proto_goTypes, + DependencyIndexes: file_procutils_v1_schema_proto_depIdxs, + MessageInfos: file_procutils_v1_schema_proto_msgTypes, }.Build() - File_conduit_v1_schema_proto = out.File - file_conduit_v1_schema_proto_rawDesc = nil - file_conduit_v1_schema_proto_goTypes = nil - file_conduit_v1_schema_proto_depIdxs = nil + File_procutils_v1_schema_proto = out.File + file_procutils_v1_schema_proto_rawDesc = nil + file_procutils_v1_schema_proto_goTypes = nil + file_procutils_v1_schema_proto_depIdxs = nil } diff --git a/proto/conduit/v1/schema.proto b/proto/procutils/v1/schema.proto similarity index 90% rename from proto/conduit/v1/schema.proto rename to proto/procutils/v1/schema.proto index 69701c0..2d3d1bb 100644 --- a/proto/conduit/v1/schema.proto +++ b/proto/procutils/v1/schema.proto @@ -1,10 +1,10 @@ syntax = "proto3"; -package conduit.v1; +package procutils.v1; import "schema/v1/schema.proto"; -option go_package = "github.com/conduitio/conduit-processor-sdk/proto/conduit/v1"; +option go_package = "github.com/conduitio/conduit-processor-sdk/proto/procutils/v1"; message CreateSchemaRequest { string subject = 1; diff --git a/run.go b/run.go index c014d47..9b1adef 100644 --- a/run.go +++ b/run.go @@ -25,7 +25,7 @@ import ( "github.com/conduitio/conduit-commons/opencdc" configv1 "github.com/conduitio/conduit-commons/proto/config/v1" opencdcv1 "github.com/conduitio/conduit-commons/proto/opencdc/v1" - "github.com/conduitio/conduit-processor-sdk/pconduit" + "github.com/conduitio/conduit-processor-sdk/pprocutils" processorv1 "github.com/conduitio/conduit-processor-sdk/proto/processor/v1" "github.com/conduitio/conduit-processor-sdk/wasm" "github.com/rs/zerolog" @@ -80,7 +80,7 @@ func Run(p Processor) { cmd.Reset() err := wasm.NextCommand(&cmd) if err != nil { - if errors.Is(err, pconduit.ErrNoMoreCommands) { + if errors.Is(err, pprocutils.ErrNoMoreCommands) { os.Exit(0) } _, _ = fmt.Fprintf(os.Stderr, "failed retrieving next command: %v", err) @@ -140,7 +140,7 @@ func (e commandExecutor) Execute(ctx context.Context, p Processor, cmdReq *proce case *processorv1.CommandRequest_Teardown: resp, err = e.executeTeardown(ctx, p, req.Teardown) default: - err = pconduit.ErrUnknownCommandRequest + err = pprocutils.ErrUnknownCommandRequest } if err != nil { @@ -314,7 +314,7 @@ func (c protoConverter) errorRecord(in ErrorRecord) (*processorv1.Process_Proces } func (c protoConverter) error(err error) *processorv1.Error { - var wasmErr *pconduit.Error + var wasmErr *pprocutils.Error var code uint32 if errors.As(err, &wasmErr) { code = wasmErr.ErrCode diff --git a/schema/in_memory.go b/schema/in_memory.go index cc55088..b6a2b57 100644 --- a/schema/in_memory.go +++ b/schema/in_memory.go @@ -20,7 +20,7 @@ import ( "sync" "github.com/conduitio/conduit-commons/schema" - "github.com/conduitio/conduit-processor-sdk/pconduit" + "github.com/conduitio/conduit-processor-sdk/pprocutils" ) type InMemoryService struct { @@ -31,15 +31,15 @@ type InMemoryService struct { m sync.Mutex } -func NewInMemoryService() pconduit.SchemaService { +func NewInMemoryService() pprocutils.SchemaService { return &InMemoryService{ schemas: make(map[string][]schema.Schema), } } -func (s *InMemoryService) CreateSchema(_ context.Context, request pconduit.CreateSchemaRequest) (pconduit.CreateSchemaResponse, error) { +func (s *InMemoryService) CreateSchema(_ context.Context, request pprocutils.CreateSchemaRequest) (pprocutils.CreateSchemaResponse, error) { if request.Type != schema.TypeAvro { - return pconduit.CreateSchemaResponse{}, pconduit.ErrInvalidSchema + return pprocutils.CreateSchemaResponse{}, pprocutils.ErrInvalidSchema } s.m.Lock() @@ -53,21 +53,21 @@ func (s *InMemoryService) CreateSchema(_ context.Context, request pconduit.Creat } s.schemas[request.Subject] = append(s.schemas[request.Subject], inst) - return pconduit.CreateSchemaResponse{Schema: inst}, nil + return pprocutils.CreateSchemaResponse{Schema: inst}, nil } -func (s *InMemoryService) GetSchema(_ context.Context, request pconduit.GetSchemaRequest) (pconduit.GetSchemaResponse, error) { +func (s *InMemoryService) GetSchema(_ context.Context, request pprocutils.GetSchemaRequest) (pprocutils.GetSchemaResponse, error) { s.m.Lock() defer s.m.Unlock() versions, ok := s.schemas[request.Subject] if !ok { - return pconduit.GetSchemaResponse{}, fmt.Errorf("subject %v: %w", request.Subject, pconduit.ErrSubjectNotFound) + return pprocutils.GetSchemaResponse{}, fmt.Errorf("subject %v: %w", request.Subject, pprocutils.ErrSubjectNotFound) } if len(versions) < request.Version { - return pconduit.GetSchemaResponse{}, fmt.Errorf("version %v: %w", request.Version, pconduit.ErrVersionNotFound) + return pprocutils.GetSchemaResponse{}, fmt.Errorf("version %v: %w", request.Version, pprocutils.ErrVersionNotFound) } - return pconduit.GetSchemaResponse{Schema: versions[request.Version-1]}, nil + return pprocutils.GetSchemaResponse{Schema: versions[request.Version-1]}, nil } diff --git a/schema/schema.go b/schema/schema.go index e07204f..01d1b3a 100644 --- a/schema/schema.go +++ b/schema/schema.go @@ -19,7 +19,7 @@ import ( "fmt" "github.com/conduitio/conduit-commons/schema" - "github.com/conduitio/conduit-processor-sdk/pconduit" + "github.com/conduitio/conduit-processor-sdk/pprocutils" ) const TypeAvro = schema.TypeAvro @@ -27,13 +27,13 @@ const TypeAvro = schema.TypeAvro var SchemaService = NewInMemoryService() var ( - ErrSubjectNotFound = pconduit.ErrSubjectNotFound - ErrVersionNotFound = pconduit.ErrVersionNotFound - ErrInvalidSchema = pconduit.ErrInvalidSchema + ErrSubjectNotFound = pprocutils.ErrSubjectNotFound + ErrVersionNotFound = pprocutils.ErrVersionNotFound + ErrInvalidSchema = pprocutils.ErrInvalidSchema ) func Get(ctx context.Context, subject string, version int) (schema.Schema, error) { - resp, err := SchemaService.GetSchema(ctx, pconduit.GetSchemaRequest{ + resp, err := SchemaService.GetSchema(ctx, pprocutils.GetSchemaRequest{ Subject: subject, Version: version, }) @@ -44,7 +44,7 @@ func Get(ctx context.Context, subject string, version int) (schema.Schema, error } func Create(ctx context.Context, typ schema.Type, subject string, bytes []byte) (schema.Schema, error) { - resp, err := SchemaService.CreateSchema(ctx, pconduit.CreateSchemaRequest{ + resp, err := SchemaService.CreateSchema(ctx, pprocutils.CreateSchemaRequest{ Subject: subject, Type: typ, Bytes: bytes, diff --git a/util.go b/util.go index 2a6cac2..a508935 100644 --- a/util.go +++ b/util.go @@ -21,7 +21,7 @@ import ( "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit-processor-sdk/internal/reference" - "github.com/conduitio/conduit-processor-sdk/pconduit" + "github.com/conduitio/conduit-processor-sdk/pprocutils" "github.com/rs/zerolog" ) @@ -29,7 +29,7 @@ import ( // is passed to any of the processor's methods (Configure, Open, Process, // Teardown) to ensure that the log messages include contextual information. func Logger(ctx context.Context) *zerolog.Logger { - l := pconduit.Logger.With().Ctx(ctx).Logger() + l := pprocutils.Logger.With().Ctx(ctx).Logger() return &l } diff --git a/wasm/caller.go b/wasm/caller.go index f553c57..1ffcacf 100644 --- a/wasm/caller.go +++ b/wasm/caller.go @@ -19,7 +19,7 @@ package wasm import ( "unsafe" - "github.com/conduitio/conduit-processor-sdk/pconduit" + "github.com/conduitio/conduit-processor-sdk/pprocutils" ) // HostFunc is the function type for the imported functions from the host. @@ -44,8 +44,8 @@ func hostCall(fn HostFunc, buf []byte) ([]byte, uint32, error) { ptr := unsafe.Pointer(&buf[0]) cmdSize := fn(ptr, uint32(len(buf))) switch { - case cmdSize >= pconduit.ErrorCodeStart: // error codes - return nil, cmdSize, pconduit.NewErrorFromCode(cmdSize) + case cmdSize >= pprocutils.ErrorCodeStart: // error codes + return nil, cmdSize, pprocutils.NewErrorFromCode(cmdSize) case cmdSize > uint32(len(buf)) && i == 0: // not enough memory oldSize := uint32(len(buf)) buf = append(buf, make([]byte, cmdSize-oldSize)...) diff --git a/wasm/schema.go b/wasm/schema.go index 987ed72..28ac4ee 100644 --- a/wasm/schema.go +++ b/wasm/schema.go @@ -20,16 +20,16 @@ import ( "context" "fmt" - "github.com/conduitio/conduit-processor-sdk/pconduit" - "github.com/conduitio/conduit-processor-sdk/pconduit/v1/fromproto" - "github.com/conduitio/conduit-processor-sdk/pconduit/v1/toproto" - conduitv1 "github.com/conduitio/conduit-processor-sdk/proto/conduit/v1" + "github.com/conduitio/conduit-processor-sdk/pprocutils" + "github.com/conduitio/conduit-processor-sdk/pprocutils/v1/fromproto" + "github.com/conduitio/conduit-processor-sdk/pprocutils/v1/toproto" + procutilsv1 "github.com/conduitio/conduit-processor-sdk/proto/procutils/v1" "google.golang.org/protobuf/proto" ) type schemaService struct{} -func (*schemaService) CreateSchema(_ context.Context, req pconduit.CreateSchemaRequest) (pconduit.CreateSchemaResponse, error) { +func (*schemaService) CreateSchema(_ context.Context, req pprocutils.CreateSchemaRequest) (pprocutils.CreateSchemaResponse, error) { protoReq := toproto.CreateSchemaRequest(req) buffer := bufferPool.Get().([]byte) @@ -37,25 +37,24 @@ func (*schemaService) CreateSchema(_ context.Context, req pconduit.CreateSchemaR buffer, err := proto.MarshalOptions{}.MarshalAppend(buffer[:0], protoReq) if err != nil { - return pconduit.CreateSchemaResponse{}, fmt.Errorf("error marshalling request: %w", err) + return pprocutils.CreateSchemaResponse{}, fmt.Errorf("error marshalling request: %w", err) } buffer, cmdSize, err := hostCall(_createSchema, buffer) if err != nil { - return pconduit.CreateSchemaResponse{}, fmt.Errorf("error calling createSchema: %w", err) + return pprocutils.CreateSchemaResponse{}, fmt.Errorf("error calling createSchema: %w", err) } - var resp conduitv1.CreateSchemaResponse + var resp procutilsv1.CreateSchemaResponse err = proto.Unmarshal(buffer[:cmdSize], &resp) if err != nil { - return pconduit.CreateSchemaResponse{}, fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err) + return pprocutils.CreateSchemaResponse{}, fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err) } return fromproto.CreateSchemaResponse(&resp), nil - } -func (*schemaService) GetSchema(_ context.Context, req pconduit.GetSchemaRequest) (pconduit.GetSchemaResponse, error) { +func (*schemaService) GetSchema(_ context.Context, req pprocutils.GetSchemaRequest) (pprocutils.GetSchemaResponse, error) { protoReq := toproto.GetSchemaRequest(req) buffer := bufferPool.Get().([]byte) @@ -63,18 +62,18 @@ func (*schemaService) GetSchema(_ context.Context, req pconduit.GetSchemaRequest buffer, err := proto.MarshalOptions{}.MarshalAppend(buffer[:0], protoReq) if err != nil { - return pconduit.GetSchemaResponse{}, fmt.Errorf("error marshalling request: %w", err) + return pprocutils.GetSchemaResponse{}, fmt.Errorf("error marshalling request: %w", err) } buffer, cmdSize, err := hostCall(_getSchema, buffer) if err != nil { - return pconduit.GetSchemaResponse{}, fmt.Errorf("error calling getSchema: %w", err) + return pprocutils.GetSchemaResponse{}, fmt.Errorf("error calling getSchema: %w", err) } - var resp conduitv1.GetSchemaResponse + var resp procutilsv1.GetSchemaResponse err = proto.Unmarshal(buffer[:cmdSize], &resp) if err != nil { - return pconduit.GetSchemaResponse{}, fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err) + return pprocutils.GetSchemaResponse{}, fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err) } return fromproto.GetSchemaResponse(&resp), nil } diff --git a/wasm/util.go b/wasm/util.go index 1fb973f..f90f22b 100644 --- a/wasm/util.go +++ b/wasm/util.go @@ -19,7 +19,7 @@ package wasm import ( "os" - "github.com/conduitio/conduit-processor-sdk/pconduit" + "github.com/conduitio/conduit-processor-sdk/pprocutils" "github.com/conduitio/conduit-processor-sdk/schema" "github.com/rs/zerolog" ) @@ -39,7 +39,7 @@ func initLogger(logLevel string) { level = zerolog.DebugLevel } logger = logger.Level(level) - pconduit.Logger = logger + pprocutils.Logger = logger } func initSchemaService() {