Skip to content

Commit

Permalink
create HostCaller utility to call conduit functions
Browse files Browse the repository at this point in the history
  • Loading branch information
maha-hajja committed Jul 11, 2024
1 parent 58bb54e commit 05fa27a
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 75 deletions.
59 changes: 59 additions & 0 deletions wasm/caller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build wasm

package wasm

import (
"unsafe"
)

// HostFunc is the function type for the imported functions from the host.
//
// The arguments are:
// (1) a pointer to the address where the command should be written
// (2) the size of allocated memory.
//
// The return value indicates the size of the allocated response in bytes. If the
// response is larger than the allocated memory, the caller should reallocate the
// memory and call the function again.
type HostFunc func(ptr unsafe.Pointer, size uint32) uint32

type HostCaller struct {
Func HostFunc
}

// Call calls the function from the host 2 times max, is the buffer size is not
// enough the first time its called, it will be resized the second call.
// returns the buffer, the command size, and the error.
func (fc *HostCaller) Call(buf []byte, size int) ([]byte, uint32, error) {
// 2 tries, 1st try is with the current buffer size, if that's not enough,
// then resize the buffer and try again
for i := 0; i < 2; i++ {
// request the host to write the response to the given buffer address
ptr := unsafe.Pointer(&buf[0])
cmdSize := fc.Func(ptr, uint32(size))
switch {
case cmdSize >= ErrorCodeStart: // error codes
return nil, cmdSize, NewErrorFromCode(cmdSize)
case cmdSize > uint32(cap(buf)) && i == 0: // not enough memory
oldSize := uint32(len(buf))
buf = append(buf, make([]byte, cmdSize-oldSize)...)
continue // try again
}
return buf, cmdSize, nil
}
panic("if this is reached, then the buffer was not resized correctly and we are in an infinite loop")
}
44 changes: 12 additions & 32 deletions wasm/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package wasm

import (
"fmt"
"unsafe"

processorv1 "github.com/conduitio/conduit-processor-sdk/proto/processor/v1"
"google.golang.org/protobuf/proto"
)
Expand All @@ -28,30 +26,17 @@ const defaultCommandSize = 1024 // 1kB

var buffer = make([]byte, defaultCommandSize)

// NextCommand retrieves the next command from Conduit.
func NextCommand(cmdReq *processorv1.CommandRequest) error {
// 2 tries, 1st try is with the current buffer size, if that's not enough,
// then resize the buffer and try again
for i := 0; i < 2; i++ {
// request Conduit to write the command to the given buffer
ptr := unsafe.Pointer(&buffer[0])
cmdSize := _commandRequest(ptr, uint32(cap(buffer)))

switch {
case cmdSize >= ErrorCodeStart: // error codes
return NewErrorFromCode(cmdSize)
case cmdSize > uint32(cap(buffer)): // not enough memory
buffer = make([]byte, cmdSize) // resize buffer
continue // try again
}

// parse the command
if err := proto.Unmarshal(buffer[:cmdSize], cmdReq); err != nil {
return fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err)
}
return nil
commandRequestCaller := HostCaller{Func: _commandRequest}
buf, cmdSize, err := commandRequestCaller.Call(buffer, cap(buffer))
if err != nil {
return err
}
// parse the command
if err := proto.Unmarshal(buf[:cmdSize], cmdReq); err != nil {
return fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err)
}
panic("if this is reached, then the buffer was not resized correctly and we are in an infinite loop")
return nil
}

func Reply(resp *processorv1.CommandResponse) error {
Expand All @@ -60,12 +45,7 @@ func Reply(resp *processorv1.CommandResponse) error {
if err != nil {
return fmt.Errorf("failed marshalling proto type into bytes: %w", err)
}

ptr := unsafe.Pointer(&buffer[0])
errCode := _commandResponse(ptr, uint32(len(buffer)))
if errCode != 0 {
return NewErrorFromCode(errCode)
}

return nil
commandResponseCaller := HostCaller{Func: _commandResponse}
_, _, err = commandResponseCaller.Call(buffer, len(buffer))
return err
}
68 changes: 25 additions & 43 deletions wasm/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package wasm

import (
"fmt"
"unsafe"

conduitv1 "github.com/conduitio/conduit-processor-sdk/proto/conduit/v1"
"google.golang.org/protobuf/proto"
Expand All @@ -29,55 +28,38 @@ func CreateSchema(req *conduitv1.CreateSchemaRequest) (*conduitv1.CreateSchemaRe
if err != nil {
return nil, fmt.Errorf("error marshalling request: %w", err)
}
// 2 tries, 1st try is with the current buffer size, if that's not enough,
// then resize the buffer and try again
for i := 0; i < 2; i++ {
// request Conduit to write the command to the given buffer
ptr := unsafe.Pointer(&buf[0])
cmdSize := _createSchema(ptr, uint32(cap(buf)))
switch {
case cmdSize >= ErrorCodeStart: // error codes
return nil, NewErrorFromCode(cmdSize)
case cmdSize > uint32(cap(buf)) || i == 0: // not enough memory
oldSize := uint32(len(buf))
buf = append(buf, make([]byte, cmdSize-oldSize)...)
continue // try again
}
var resp conduitv1.CreateSchemaResponse
err = proto.Unmarshal(buf[:cmdSize], &resp)
if err != nil {
return nil, fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err)
}
return &resp, nil

createSchemaCaller := HostCaller{Func: _createSchema}
buf, cmdSize, err := createSchemaCaller.Call(buf, cap(buf))
if err != nil {
return nil, err
}

var resp conduitv1.CreateSchemaResponse
err = proto.Unmarshal(buf[:cmdSize], &resp)
if err != nil {
return nil, fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err)
}
panic("if this is reached, then the buffer was not resized correctly and we are in an infinite loop")
return &resp, nil

}

func GetSchema(req *conduitv1.GetSchemaRequest) (*conduitv1.GetSchemaResponse, error) {
buf, err := proto.Marshal(req)
if err != nil {
return nil, fmt.Errorf("error marshalling request: %w", err)
}
// 2 tries, 1st try is with the current buffer size, if that's not enough,
// then resize the buffer and try again
for i := 0; i < 2; i++ {
// request Conduit to write the command to the given buffer
ptr := unsafe.Pointer(&buf[0])
cmdSize := _getSchema(ptr, uint32(cap(buf)))
switch {
case cmdSize >= ErrorCodeStart: // error codes
return nil, NewErrorFromCode(cmdSize)
case cmdSize > uint32(cap(buf)) || i == 0: // not enough memory
oldSize := uint32(len(buf))
buf = append(buf, make([]byte, cmdSize-oldSize)...)
continue // try again
}
var resp conduitv1.GetSchemaResponse
err = proto.Unmarshal(buf[:cmdSize], &resp)
if err != nil {
return nil, fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err)
}
return &resp, nil

getSchemaCaller := HostCaller{Func: _getSchema}
buf, cmdSize, err := getSchemaCaller.Call(buf, cap(buf))
if err != nil {
return nil, err
}

var resp conduitv1.GetSchemaResponse
err = proto.Unmarshal(buf[:cmdSize], &resp)
if err != nil {
return nil, fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err)
}
panic("if this is reached, then the buffer was not resized correctly and we are in an infinite loop")
return &resp, nil
}

0 comments on commit 05fa27a

Please sign in to comment.