Skip to content

Commit

Permalink
todos+fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
maha-hajja committed Jul 16, 2024
1 parent 0e9997e commit 5316826
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 32 deletions.
1 change: 0 additions & 1 deletion conduit/global/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"os"

"github.com/conduitio/conduit-processor-sdk/conduit"

"github.com/rs/zerolog"
)

Expand Down
30 changes: 13 additions & 17 deletions conduit/v1/toproto/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,42 +21,38 @@ import (
)

func GetSchemaRequest(in conduit.GetSchemaRequest) *conduitv1.GetSchemaRequest {
// TODO implement
return &conduitv1.GetSchemaRequest{
Subject: subject,
Version: int32(version),
Subject: in.Subject,
Version: int32(in.Version),
}
}

func GetSchemaResponse(in conduit.GetSchemaResponse) *conduitv1.GetSchemaResponse {
// TODO implement
return &conduitv1.GetSchemaResponse{
Schema: &schemav1.Schema{
Subject: inst.Subject,
Version: int32(inst.Version),
Type: schemav1.Schema_Type(inst.Type),
Bytes: inst.Bytes,
Subject: in.Schema.Subject,
Version: int32(in.Schema.Version),
Type: schemav1.Schema_Type(in.Schema.Type),
Bytes: in.Schema.Bytes,
},
}
}

func CreateSchemaRequest(in conduit.CreateSchemaRequest) *conduitv1.CreateSchemaRequest {
// TODO implement
return &conduitv1.CreateSchemaRequest{
Subject: subject,
Type: schemav1.Schema_Type(typ),
Bytes: bytes,
Subject: in.Subject,
Type: schemav1.Schema_Type(in.Type),
Bytes: in.Bytes,
}
}

func CreateSchemaResponse(in conduit.CreateSchemaResponse) *conduitv1.CreateSchemaResponse {
// TODO implement
return &conduitv1.CreateSchemaResponse{
Schema: &schemav1.Schema{
Subject: inst.Subject,
Version: int32(inst.Version),
Type: schemav1.Schema_Type(inst.Type),
Bytes: inst.Bytes,
Subject: in.Schema.Subject,
Version: int32(in.Schema.Version),
Type: schemav1.Schema_Type(in.Schema.Type),
Bytes: in.Schema.Bytes,
},
}
}
27 changes: 13 additions & 14 deletions wasm/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/conduitio/conduit-processor-sdk/conduit"
"github.com/conduitio/conduit-processor-sdk/conduit/v1/fromproto"
"github.com/conduitio/conduit-processor-sdk/conduit/v1/toproto"

conduitv1 "github.com/conduitio/conduit-processor-sdk/proto/conduit/v1"
"google.golang.org/protobuf/proto"
)
Expand All @@ -42,39 +41,39 @@ func (*schemaService) CreateSchema(_ context.Context, req conduit.CreateSchemaRe
}

buffer, cmdSize, err := hostCall(_createSchema, buffer)
if cmdSize >= ErrorCodeStart {
return conduit.CreateSchemaResponse{}, NewErrorFromCode(cmdSize)
if err != nil {
return conduit.CreateSchemaResponse{}, err
}

var resp *conduitv1.CreateSchemaResponse
err = proto.Unmarshal(buffer[:cmdSize], resp)
var resp conduitv1.CreateSchemaResponse
err = proto.Unmarshal(buffer[:cmdSize], &resp)
if err != nil {
return conduit.CreateSchemaResponse{}, fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err)
}

return fromproto.CreateSchemaResponse(resp), nil
return fromproto.CreateSchemaResponse(&resp), nil

}

// TODO update the same as CreateSchema
func (*schemaService) GetSchema(_ context.Context, req conduit.GetSchemaRequest) (conduit.GetSchemaResponse, error) {
protoReq := toproto.GetSchemaRequest(req)

buffer := bufferPool.Get().([]byte)
defer bufferPool.Put(buffer)

buffer, err := proto.MarshalOptions{}.MarshalAppend(buffer[:0], req)
buffer, err := proto.MarshalOptions{}.MarshalAppend(buffer[:0], protoReq)
if err != nil {
return nil, fmt.Errorf("error marshalling request: %w", err)
return conduit.GetSchemaResponse{}, fmt.Errorf("error marshalling request: %w", err)
}

buffer, cmdSize, err := hostCall(_getSchema, buffer)
if cmdSize >= ErrorCodeStart {
return nil, NewErrorFromCode(cmdSize)
if err != nil {
return conduit.GetSchemaResponse{}, err
}

var resp conduitv1.GetSchemaResponse
err = proto.Unmarshal(buffer[:cmdSize], &resp)
if err != nil {
return nil, fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err)
return conduit.GetSchemaResponse{}, fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err)
}
return &resp, nil
return fromproto.GetSchemaResponse(&resp), nil
}

0 comments on commit 5316826

Please sign in to comment.