diff --git a/README.md b/README.md index a1732f8..8cd8bb4 100644 --- a/README.md +++ b/README.md @@ -5,4 +5,191 @@ [![Go Report Card](https://goreportcard.com/badge/github.com/conduitio/conduit-processor-sdk)](https://goreportcard.com/report/github.com/conduitio/conduit-processor-sdk) [![Go Reference](https://pkg.go.dev/badge/github.com/conduitio/conduit-processor-sdk.svg)](https://pkg.go.dev/github.com/conduitio/conduit-processor-sdk) -:construction: **This repository is under construction, check back in the end of February** :construction: +This repository contains the Go software development kit for implementing a +processor for [Conduit](https://github.com/conduitio/conduit). + +## Get started + +Create a new folder and initialize a fresh go module: + +```sh +go mod init example.com/conduit-processor-demo +``` + +Add the processor SDK dependency: + +```sh +go get github.com/conduitio/conduit-processor-sdk +``` + +You can now create a new processor by implementing the +[`Processor`](https://pkg.go.dev/github.com/conduitio/conduit-processor-sdk#Processor) +interface. Here is an example of a simple processor that adds a field to the +record: + +```go +package example + +import ( + "context" + + "github.com/conduitio/conduit-commons/config" + "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-processor-sdk" +) + +type AddFieldProcessor struct { + sdk.UnimplementedProcessor + Field string + Value string +} + +// Specification returns metadata about the processor. +func (p *AddFieldProcessor) Specification() (sdk.Specification, error) { + return sdk.Specification{ + Name: "myAddFieldProcessor", + Summary: "Add a field to the record.", + Description: `This processor lets you configure a static field that will +be added to the record into field .Payload.After. 
If the payload is not +structured data, this processor will panic.`, + Version: "v1.0.0", + Author: "John Doe", + Parameters: map[string]config.Parameter{ + "field": {Type: config.ParameterTypeString, Description: "The name of the field to add"}, + "value": {Type: config.ParameterTypeString, Description: "The value of the field to add"}, + }, + }, nil +} + +// Configure is called by Conduit to configure the processor. It receives a map +// with the parameters defined in the Specification method. +func (p *AddFieldProcessor) Configure(ctx context.Context, config map[string]string) error { + p.Field = config["field"] + p.Value = config["value"] + return nil +} + +// Process is called by Conduit to process records. It receives a slice of +// records and should return a slice of processed records. +func (p *AddFieldProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord { + out := make([]sdk.ProcessedRecord, 0, len(records)) + for _, record := range records { + record.Payload.After.(opencdc.StructuredData)[p.Field] = p.Value + out = append(out, sdk.SingleRecord(record)) + } + return out +} +``` + +You also need to add an entrypoint to your processor, since it will be run as a +standalone WASM plugin: + +```go +//go:build wasm + +package main + +import ( + example "example.com/conduit-processor-demo" + sdk "github.com/conduitio/conduit-processor-sdk" +) + +func main() { + sdk.Run(&example.AddFieldProcessor{}) +} +``` + +If the processor is very simple and can be reduced to a single function (e.g. 
+no configuration needed), then we can use `sdk.NewProcessorFunc()`, as below: + +```go +//go:build wasm + +package main + +import ( + sdk "github.com/conduitio/conduit-processor-sdk" +) + +func main() { + sdk.Run(sdk.NewProcessorFunc( + sdk.Specification{Name: "simple-processor"}, + func(ctx context.Context, rec opencdc.Record) (opencdc.Record, error) { + // do something with the record + return rec, nil + }, + )) +} +``` + +With this you are set to build your processor. Note that we are building the +processor as a WebAssembly module, so you need to set `GOARCH` and `GOOS`: + +```sh +GOARCH=wasm GOOS=wasip1 go build -o add-field-processor.wasm cmd/main.go +``` + +The produced `add-field-processor.wasm` file can be used as a processor in +Conduit. Copy it to the `processors` directory of your Conduit instance and +configure it in the `processors` section of the pipeline configuration file. + +## FAQ + +### Why do I need to specify `GOARCH` and `GOOS`? + +Conduit uses [WebAssembly](https://webassembly.org) to run standalone processors. +This means that you need to build your processor as a WebAssembly module. You can +do this by setting the environment variables `GOARCH=wasm` and `GOOS=wasip1` when +running `go build`. This will produce a WebAssembly module that can be used as a +processor in Conduit. + +### How do I use a processor? + +To use a standalone WASM processor in Conduit, the following two steps need to be +done: + +1. Copy the WebAssembly module to the processors directory of your Conduit + instance. By default, that's a directory called `processors` that is in the same + directory as Conduit. The directory can be changed with the `processors.path` flag. + + An example directory structure would be: + ```shell + tree . + . + ├── conduit + └── processors + └── add-field-processor.wasm + ``` +2. Use the processor in the `processors` section of the pipeline configuration file, + using the name the processor defines in its specification. 
An example configuration + for the processor above would be: + ```yaml + processors: + - id: add-foo-field + plugin: myAddFieldProcessor + settings: + field: 'foo' + value: 'bar' + ``` + +### How do I log from a processor? + +You can get a `zerolog.Logger` instance from the context using the +[`sdk.Logger`](https://pkg.go.dev/github.com/conduitio/conduit-processor-sdk#Logger) +function. This logger is pre-configured to append logs in the format expected by +Conduit. + +Keep in mind that logging in the hot path (i.e. in the `Process` method) can have +a significant impact on the performance of your processor; therefore we recommend +using the `Trace` level for logs that are not essential for the operation of the +processor. + +Example: + +```go +func (p *AddFieldProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord { +logger := sdk.Logger(ctx) +logger.Trace().Msg("Processing records") +// ... +} +``` \ No newline at end of file diff --git a/processor_func.go b/processor_func.go index f268192..b414413 100644 --- a/processor_func.go +++ b/processor_func.go @@ -41,11 +41,7 @@ func NewProcessorFunc(specs Specification, f func(context.Context, opencdc.Recor } } -func (f ProcessorFunc) Specification() (Specification, error) { return f.specs, nil } -func (ProcessorFunc) Configure(context.Context, map[string]string) error { return nil } -func (ProcessorFunc) Open(context.Context) error { - return nil -} +func (f ProcessorFunc) Specification() (Specification, error) { return f.specs, nil } func (f ProcessorFunc) Process(ctx context.Context, records []opencdc.Record) []ProcessedRecord { outRecs := make([]ProcessedRecord, len(records)) @@ -63,7 +59,3 @@ func (f ProcessorFunc) Process(ctx context.Context, records []opencdc.Record) [] } return outRecs } - -func (ProcessorFunc) Teardown(context.Context) error { - return nil -} diff --git a/unimplemented.go b/unimplemented.go index f134e12..7cc9475 100644 --- a/unimplemented.go +++ 
b/unimplemented.go @@ -29,24 +29,24 @@ func (UnimplementedProcessor) Specification() (Specification, error) { return Specification{}, fmt.Errorf("action \"Specification\": %w", ErrUnimplemented) } -// Configure needs to be overridden in the actual implementation. +// Configure is optional and can be overridden in the actual implementation. func (UnimplementedProcessor) Configure(context.Context, map[string]string) error { - return fmt.Errorf("action \"Configure\": %w", ErrUnimplemented) + return nil } -// Open needs to be overridden in the actual implementation. +// Open is optional and can be overridden in the actual implementation. func (UnimplementedProcessor) Open(context.Context) error { - return fmt.Errorf("action \"Open\": %w", ErrUnimplemented) + return nil } // Process needs to be overridden in the actual implementation. func (UnimplementedProcessor) Process(context.Context, []opencdc.Record) []ProcessedRecord { - return nil + return []ProcessedRecord{ErrorRecord{Error: ErrUnimplemented}} } -// Teardown needs to be overridden in the actual implementation. +// Teardown is optional and can be overridden in the actual implementation. func (UnimplementedProcessor) Teardown(context.Context) error { - return fmt.Errorf("action \"Teardown\": %w", ErrUnimplemented) + return nil } func (UnimplementedProcessor) mustEmbedUnimplementedProcessor() {}