diff --git a/wasm/caller.go b/wasm/caller.go new file mode 100644 index 0000000..5f735ee --- /dev/null +++ b/wasm/caller.go @@ -0,0 +1,59 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build wasm + +package wasm + +import ( + "unsafe" +) + +// HostFunc is the function type for the imported functions from the host. +// +// The arguments are: +// (1) a pointer to the address where the command should be written +// (2) the size of allocated memory. +// +// The return value indicates the size of the allocated response in bytes. If the +// response is larger than the allocated memory, the caller should reallocate the +// memory and call the function again. +type HostFunc func(ptr unsafe.Pointer, size uint32) uint32 + +type HostCaller struct { + Func HostFunc +} + +// Call calls the function from the host 2 times max, is the buffer size is not +// enough the first time its called, it will be resized the second call. +// returns the buffer, the command size, and the error. +func (fc *HostCaller) Call(buf []byte, size int) ([]byte, uint32, error) { + // 2 tries, 1st try is with the current buffer size, if that's not enough, + // then resize the buffer and try again + for i := 0; i < 2; i++ { + // request the host to write the response to the given buffer address + ptr := unsafe.Pointer(&buf[0]) + cmdSize := fc.Func(ptr, uint32(size)) + switch { + case cmdSize >= ErrorCodeStart: // error codes + return nil, cmdSize, NewErrorFromCode(cmdSize) + case cmdSize > uint32(cap(buf)) && i == 0: // not enough memory + oldSize := uint32(len(buf)) + buf = append(buf, make([]byte, cmdSize-oldSize)...) + continue // try again + } + return buf, cmdSize, nil + } + panic("if this is reached, then the buffer was not resized correctly and we are in an infinite loop") +} diff --git a/wasm/command.go b/wasm/command.go index b40d0f3..af75bf8 100644 --- a/wasm/command.go +++ b/wasm/command.go @@ -18,8 +18,6 @@ package wasm import ( "fmt" - "unsafe" - processorv1 "github.com/conduitio/conduit-processor-sdk/proto/processor/v1" "google.golang.org/protobuf/proto" ) @@ -28,30 +26,17 @@ const defaultCommandSize = 1024 // 1kB var buffer = make([]byte, defaultCommandSize) -// NextCommand retrieves the next command from Conduit. func NextCommand(cmdReq *processorv1.CommandRequest) error { - // 2 tries, 1st try is with the current buffer size, if that's not enough, - // then resize the buffer and try again - for i := 0; i < 2; i++ { - // request Conduit to write the command to the given buffer - ptr := unsafe.Pointer(&buffer[0]) - cmdSize := _commandRequest(ptr, uint32(cap(buffer))) - - switch { - case cmdSize >= ErrorCodeStart: // error codes - return NewErrorFromCode(cmdSize) - case cmdSize > uint32(cap(buffer)): // not enough memory - buffer = make([]byte, cmdSize) // resize buffer - continue // try again - } - - // parse the command - if err := proto.Unmarshal(buffer[:cmdSize], cmdReq); err != nil { - return fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err) - } - return nil + commandRequestCaller := HostCaller{Func: _commandRequest} + buf, cmdSize, err := commandRequestCaller.Call(buffer, cap(buffer)) + if err != nil { + return err + } + // parse the command + if err := proto.Unmarshal(buf[:cmdSize], cmdReq); err != nil { + return fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err) } - panic("if this is reached, then the buffer was not resized correctly and we are in an infinite loop") + return nil } func Reply(resp *processorv1.CommandResponse) error { @@ -60,12 +45,7 @@ func Reply(resp *processorv1.CommandResponse) error { if err != nil { return fmt.Errorf("failed marshalling proto type into bytes: %w", err) } - - ptr := unsafe.Pointer(&buffer[0]) - errCode := _commandResponse(ptr, uint32(len(buffer))) - if errCode != 0 { - return NewErrorFromCode(errCode) - } - - return nil + commandResponseCaller := HostCaller{Func: _commandResponse} + _, _, err = commandResponseCaller.Call(buffer, len(buffer)) + return err } diff --git a/wasm/schema.go b/wasm/schema.go index a1711d3..4028b27 100644 --- a/wasm/schema.go +++ b/wasm/schema.go @@ -18,7 +18,6 @@ package wasm import ( "fmt" - "unsafe" conduitv1 "github.com/conduitio/conduit-processor-sdk/proto/conduit/v1" "google.golang.org/protobuf/proto" @@ -29,28 +28,20 @@ func CreateSchema(req *conduitv1.CreateSchemaRequest) (*conduitv1.CreateSchemaRe if err != nil { return nil, fmt.Errorf("error marshalling request: %w", err) } - // 2 tries, 1st try is with the current buffer size, if that's not enough, - // then resize the buffer and try again - for i := 0; i < 2; i++ { - // request Conduit to write the command to the given buffer - ptr := unsafe.Pointer(&buf[0]) - cmdSize := _createSchema(ptr, uint32(cap(buf))) - switch { - case cmdSize >= ErrorCodeStart: // error codes - return nil, NewErrorFromCode(cmdSize) - case cmdSize > uint32(cap(buf)) || i == 0: // not enough memory - oldSize := uint32(len(buf)) - buf = append(buf, make([]byte, cmdSize-oldSize)...) - continue // try again - } - var resp conduitv1.CreateSchemaResponse - err = proto.Unmarshal(buf[:cmdSize], &resp) - if err != nil { - return nil, fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err) - } - return &resp, nil + + createSchemaCaller := HostCaller{Func: _createSchema} + buf, cmdSize, err := createSchemaCaller.Call(buf, cap(buf)) + if err != nil { + return nil, err + } + + var resp conduitv1.CreateSchemaResponse + err = proto.Unmarshal(buf[:cmdSize], &resp) + if err != nil { + return nil, fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err) } - panic("if this is reached, then the buffer was not resized correctly and we are in an infinite loop") + return &resp, nil + } func GetSchema(req *conduitv1.GetSchemaRequest) (*conduitv1.GetSchemaResponse, error) { @@ -58,26 +49,17 @@ func GetSchema(req *conduitv1.GetSchemaRequest) (*conduitv1.GetSchemaResponse, e if err != nil { return nil, fmt.Errorf("error marshalling request: %w", err) } - // 2 tries, 1st try is with the current buffer size, if that's not enough, - // then resize the buffer and try again - for i := 0; i < 2; i++ { - // request Conduit to write the command to the given buffer - ptr := unsafe.Pointer(&buf[0]) - cmdSize := _getSchema(ptr, uint32(cap(buf))) - switch { - case cmdSize >= ErrorCodeStart: // error codes - return nil, NewErrorFromCode(cmdSize) - case cmdSize > uint32(cap(buf)) || i == 0: // not enough memory - oldSize := uint32(len(buf)) - buf = append(buf, make([]byte, cmdSize-oldSize)...) - continue // try again - } - var resp conduitv1.GetSchemaResponse - err = proto.Unmarshal(buf[:cmdSize], &resp) - if err != nil { - return nil, fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err) - } - return &resp, nil + + getSchemaCaller := HostCaller{Func: _getSchema} + buf, cmdSize, err := getSchemaCaller.Call(buf, cap(buf)) + if err != nil { + return nil, err + } + + var resp conduitv1.GetSchemaResponse + err = proto.Unmarshal(buf[:cmdSize], &resp) + if err != nil { + return nil, fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err) } - panic("if this is reached, then the buffer was not resized correctly and we are in an infinite loop") + return &resp, nil }