Readme + Processor simplification (#24)
* update readme

* allow processors to not implement Open, Configure and Teardown

* Update README.md

Co-authored-by: Maha Hajja <[email protected]>

* pr feedback

* add example for sdk.NewProcessorFunc

---------

Co-authored-by: Haris Osmanagić <[email protected]>
Co-authored-by: Maha Hajja <[email protected]>
3 people authored Mar 7, 2024
1 parent cf2a761 commit 02b60dc
Showing 3 changed files with 196 additions and 17 deletions.
189 changes: 188 additions & 1 deletion README.md
@@ -5,4 +5,191 @@
[![Go Report Card](https://goreportcard.com/badge/github.com/conduitio/conduit-processor-sdk)](https://goreportcard.com/report/github.com/conduitio/conduit-processor-sdk)
[![Go Reference](https://pkg.go.dev/badge/github.com/conduitio/conduit-processor-sdk.svg)](https://pkg.go.dev/github.com/conduitio/conduit-processor-sdk)

:construction: **This repository is under construction, check back in the end of February** :construction:
This repository contains the Go software development kit for implementing a
processor for [Conduit](https://github.com/conduitio/conduit).

## Get started

Create a new folder and initialize a fresh Go module:

```sh
go mod init example.com/conduit-processor-demo
```

Add the processor SDK dependency:

```sh
go get github.com/conduitio/conduit-processor-sdk
```

You can now create a new processor by implementing the
[`Processor`](https://pkg.go.dev/github.com/conduitio/conduit-processor-sdk#Processor)
interface. Here is an example of a simple processor that adds a field to the
record:

```go
package example

import (
	"context"

	"github.com/conduitio/conduit-commons/config"
	"github.com/conduitio/conduit-commons/opencdc"
	sdk "github.com/conduitio/conduit-processor-sdk"
)

// AddFieldProcessor adds a statically configured field to each record.
type AddFieldProcessor struct {
	sdk.UnimplementedProcessor
	Field string
	Value string
}

// Specification returns metadata about the processor.
func (p *AddFieldProcessor) Specification() (sdk.Specification, error) {
	return sdk.Specification{
		Name:    "myAddFieldProcessor",
		Summary: "Add a field to the record.",
		Description: `This processor lets you configure a static field that will
be added to the record into field .Payload.After. If the payload is not
structured data, this processor will panic.`,
		Version: "v1.0.0",
		Author:  "John Doe",
		Parameters: map[string]config.Parameter{
			"field": {Type: config.ParameterTypeString, Description: "The name of the field to add"},
			"value": {Type: config.ParameterTypeString, Description: "The value of the field to add"},
		},
	}, nil
}

// Configure is called by Conduit to configure the processor. It receives a map
// with the parameters defined in the Specification method.
func (p *AddFieldProcessor) Configure(ctx context.Context, cfg map[string]string) error {
	// cfg is used instead of config to avoid shadowing the imported package.
	p.Field = cfg["field"]
	p.Value = cfg["value"]
	return nil
}

// Process is called by Conduit to process records. It receives a slice of
// records and should return a slice of processed records.
func (p *AddFieldProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord {
	out := make([]sdk.ProcessedRecord, 0, len(records))
	for _, record := range records {
		// This type assertion panics if the payload is not structured data.
		record.Payload.After.(opencdc.StructuredData)[p.Field] = p.Value
		out = append(out, sdk.SingleRecord(record))
	}
	return out
}
```

You also need to add an entrypoint to your processor, since it will be run as a
standalone WASM plugin:

```go
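//go:build wasm

package main

import (
	// The import path must match the module path of the package that
	// contains AddFieldProcessor (here, the module created with go mod init).
	example "example.com/conduit-processor-demo"
	sdk "github.com/conduitio/conduit-processor-sdk"
)

func main() {
	sdk.Run(&example.AddFieldProcessor{})
}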
```

If the processor is very simple and can be reduced to a single function (e.g.
no configuration needed), then we can use `sdk.NewProcessorFunc()`, as below:

```go
//go:build wasm

package main

import (
	"context"

	"github.com/conduitio/conduit-commons/opencdc"
	sdk "github.com/conduitio/conduit-processor-sdk"
)

func main() {
	sdk.Run(sdk.NewProcessorFunc(
		sdk.Specification{Name: "simple-processor"},
		func(ctx context.Context, rec opencdc.Record) (opencdc.Record, error) {
			// do something with the record
			return rec, nil
		},
	))
}
```

With this you are set to build your processor. Note that we are building the
processor as a WebAssembly module, so you need to set `GOARCH` and `GOOS`:

```sh
GOARCH=wasm GOOS=wasip1 go build -o add-field-processor.wasm cmd/main.go
```

The produced `add-field-processor.wasm` file can be used as a processor in
Conduit. Copy it to the `processors` directory of your Conduit instance and
configure it in the `processors` section of the pipeline configuration file.

## FAQ

### Why do I need to specify `GOARCH` and `GOOS`?

Conduit uses [WebAssembly](https://webassembly.org) to run standalone processors.
This means that you need to build your processor as a WebAssembly module. You can
do this by setting the environment variables `GOARCH=wasm` and `GOOS=wasip1` when
running `go build`. This will produce a WebAssembly module that can be used as a
processor in Conduit.

### How do I use a processor?

To use a standalone WASM processor in Conduit, follow these two steps:

1. Copy the WebAssembly module to the processors directory of your Conduit
   instance. By default, that's a directory called `processors` located in the
   same directory as the Conduit binary. The directory can be changed with the
   `processors.path` flag.

   An example directory structure would be:
   ```shell
   tree .
   .
   ├── conduit
   └── processors
       └── add-field-processor.wasm
   ```
2. Use the processor in the `processors` section of the pipeline configuration
   file, referring to it by the name defined in its specification. An example
   configuration for the processor above would be:
   ```yaml
   processors:
     - id: add-foo-field
       plugin: myAddFieldProcessor
       settings:
         field: 'foo'
         value: 'bar'
   ```

### How do I log from a processor?

You can get a `zerolog.Logger` instance from the context using the
[`sdk.Logger`](https://pkg.go.dev/github.com/conduitio/conduit-processor-sdk#Logger)
function. This logger is pre-configured to append logs in the format expected by
Conduit.

Keep in mind that logging in the hot path (i.e. in the `Process` method) can have
a significant impact on the performance of your processor. We therefore recommend
using the `Trace` level for logs that are not essential for the operation of the
processor.

Example:

```go
func (p *AddFieldProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord {
	logger := sdk.Logger(ctx)
	logger.Trace().Msg("Processing records")
	// ...
}
```
10 changes: 1 addition & 9 deletions processor_func.go
@@ -41,11 +41,7 @@ func NewProcessorFunc(specs Specification, f func(context.Context, opencdc.Recor
 	}
 }

-func (f ProcessorFunc) Specification() (Specification, error) { return f.specs, nil }
-func (ProcessorFunc) Configure(context.Context, map[string]string) error { return nil }
-func (ProcessorFunc) Open(context.Context) error {
-	return nil
-}
+func (f ProcessorFunc) Specification() (Specification, error) { return f.specs, nil }

 func (f ProcessorFunc) Process(ctx context.Context, records []opencdc.Record) []ProcessedRecord {
 	outRecs := make([]ProcessedRecord, len(records))
@@ -63,7 +59,3 @@ func (f ProcessorFunc) Process(ctx context.Context, records []opencdc.Record) []
 	}
 	return outRecs
 }
-
-func (ProcessorFunc) Teardown(context.Context) error {
-	return nil
-}
14 changes: 7 additions & 7 deletions unimplemented.go
@@ -29,24 +29,24 @@ func (UnimplementedProcessor) Specification() (Specification, error) {
 	return Specification{}, fmt.Errorf("action \"Specification\": %w", ErrUnimplemented)
 }

-// Configure needs to be overridden in the actual implementation.
+// Configure is optional and can be overridden in the actual implementation.
 func (UnimplementedProcessor) Configure(context.Context, map[string]string) error {
-	return fmt.Errorf("action \"Configure\": %w", ErrUnimplemented)
+	return nil
 }

-// Open needs to be overridden in the actual implementation.
+// Open is optional and can be overridden in the actual implementation.
 func (UnimplementedProcessor) Open(context.Context) error {
-	return fmt.Errorf("action \"Open\": %w", ErrUnimplemented)
+	return nil
 }

 // Process needs to be overridden in the actual implementation.
 func (UnimplementedProcessor) Process(context.Context, []opencdc.Record) []ProcessedRecord {
-	return nil
+	return []ProcessedRecord{ErrorRecord{Error: ErrUnimplemented}}
 }

-// Teardown needs to be overridden in the actual implementation.
+// Teardown is optional and can be overridden in the actual implementation.
 func (UnimplementedProcessor) Teardown(context.Context) error {
-	return fmt.Errorf("action \"Teardown\": %w", ErrUnimplemented)
+	return nil
 }

 func (UnimplementedProcessor) mustEmbedUnimplementedProcessor() {}
