Skip to content

Commit

Permalink
update SDK to v0.10.0
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon committed Sep 6, 2024
1 parent 8461512 commit ccb49b6
Show file tree
Hide file tree
Showing 18 changed files with 378 additions and 749 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ linters:
# - forbidigo
# - forcetypeassert # TODO enable
# - funlen # TODO enable
# - gci # TODO enable
- gci
- ginkgolinter
- gocheckcompilerdirectives
- gochecknoinits
Expand Down
17 changes: 9 additions & 8 deletions acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ import (
"testing"

"github.com/brianvoe/gofakeit"
"github.com/conduitio-labs/conduit-connector-nats-pubsub/config"
"github.com/conduitio-labs/conduit-connector-nats-pubsub/common"
"github.com/conduitio-labs/conduit-connector-nats-pubsub/test"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/google/uuid"
)
Expand All @@ -29,19 +30,19 @@ type driver struct {
sdk.ConfigurableAcceptanceTestDriver
}

func (d driver) ReadFromDestination(_ *testing.T, records []sdk.Record) []sdk.Record {
func (d driver) ReadFromDestination(_ *testing.T, records []opencdc.Record) []opencdc.Record {
return records
}

func (d driver) GenerateRecord(_ *testing.T, operation sdk.Operation) sdk.Record {
func (d driver) GenerateRecord(_ *testing.T, operation opencdc.Operation) opencdc.Record {
id := gofakeit.Int32()

return sdk.Record{
return opencdc.Record{
Position: nil,
Operation: operation,
Metadata: nil,
Payload: sdk.Change{
After: sdk.RawData([]byte(
Payload: opencdc.Change{
After: opencdc.RawData([]byte(
fmt.Sprintf(`"id":%d,"name":"%s"`, id, gofakeit.FirstName()),
)),
},
Expand All @@ -51,7 +52,7 @@ func (d driver) GenerateRecord(_ *testing.T, operation sdk.Operation) sdk.Record
//nolint:paralleltest // we don't need the paralleltest here
func TestAcceptance(t *testing.T) {
cfg := map[string]string{
config.KeyURLs: test.TestURL,
common.KeyURLs: test.TestURL,
}

sdk.AcceptanceTest(t, driver{
Expand All @@ -62,7 +63,7 @@ func TestAcceptance(t *testing.T) {
DestinationConfig: cfg,
BeforeTest: func(t *testing.T) {
subject := t.Name() + uuid.New().String()
cfg[config.KeySubject] = subject
cfg[common.KeySubject] = subject
},
Skip: []string{
// NATS PubSub doesn't handle position
Expand Down
2 changes: 1 addition & 1 deletion config/config.go → common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package config
package common

import (
"fmt"
Expand Down
2 changes: 1 addition & 1 deletion config/config_test.go → common/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package config
package common

import (
"reflect"
Expand Down
3 changes: 1 addition & 2 deletions common/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ package common
import (
"fmt"

"github.com/conduitio-labs/conduit-connector-nats-pubsub/config"
"github.com/nats-io/nats.go"
)

// GetConnectionOptions returns connection options based on the provided config.
func GetConnectionOptions(config config.Config) ([]nats.Option, error) {
func GetConnectionOptions(config Config) ([]nats.Option, error) {
var opts []nats.Option

if config.ConnectionName != "" {
Expand Down
67 changes: 32 additions & 35 deletions destination/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,107 +20,104 @@ import (
"strings"

"github.com/conduitio-labs/conduit-connector-nats-pubsub/common"
"github.com/conduitio-labs/conduit-connector-nats-pubsub/config"
"github.com/conduitio-labs/conduit-connector-nats-pubsub/destination/pubsub"
"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/nats-io/nats.go"
)

// Writer defines a writer interface needed for the Destination.
type Writer interface {
Write(record sdk.Record) error
Write(record opencdc.Record) error
Close() error
}

// Destination NATS Connector sends records to a NATS subject.
type Destination struct {
sdk.UnimplementedDestination

config config.Config
config Config
writer Writer
}

type Config struct {
common.Config
}

// NewDestination creates new instance of the Destination.
func NewDestination() sdk.Destination {
return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware()...)
}

// Parameters returns a map of named sdk.Parameters that describe how to configure the Destination.
func (d *Destination) Parameters() map[string]sdk.Parameter {
return map[string]sdk.Parameter{
config.KeyURLs: {
// Parameters returns a map of named config.Parameters that describe how to configure the Destination.
func (d *Destination) Parameters() config.Parameters {
return map[string]config.Parameter{
common.KeyURLs: {
Default: "",
Required: true,
Description: "The connection URLs pointed to NATS instances.",
Validations: []config.Validation{config.ValidationRequired{}},
},
config.KeySubject: {
common.KeySubject: {
Default: "",
Required: true,
Description: "A name of a subject to which the connector should write.",
Validations: []config.Validation{config.ValidationRequired{}},
},
config.KeyConnectionName: {
common.KeyConnectionName: {
Default: "conduit-connection-<uuid>",
Required: false,
Description: "Optional connection name which will come in handy when it comes to monitoring.",
},
config.KeyNKeyPath: {
common.KeyNKeyPath: {
Default: "",
Required: false,
Description: "A path pointed to a NKey pair.",
},
config.KeyCredentialsFilePath: {
common.KeyCredentialsFilePath: {
Default: "",
Required: false,
Description: "A path pointed to a credentials file.",
},
config.KeyTLSClientCertPath: {
Default: "",
Required: false,
common.KeyTLSClientCertPath: {
Default: "",
Description: "A path pointed to a TLS client certificate, must be present " +
"if tls.clientPrivateKeyPath field is also present.",
},
config.KeyTLSClientPrivateKeyPath: {
Default: "",
Required: false,
common.KeyTLSClientPrivateKeyPath: {
Default: "",
Description: "A path pointed to a TLS client private key, must be present " +
"if tls.clientCertPath field is also present.",
},
config.KeyTLSRootCACertPath: {
common.KeyTLSRootCACertPath: {
Default: "",
Required: false,
Description: "A path pointed to a TLS root certificate, provide if you want to verify server’s identity.",
},
config.KeyMaxReconnects: {
Default: "5",
Required: false,
common.KeyMaxReconnects: {
Default: "5",
Description: "Sets the number of reconnect attempts " +
"that will be tried before giving up. If negative, " +
"then it will never give up trying to reconnect.",
},
config.KeyReconnectWait: {
Default: "5s",
Required: false,
common.KeyReconnectWait: {
Default: "5s",
Description: "Sets the time to backoff after attempting a reconnect " +
"to a server that we were already connected to previously.",
},
}
}

// Configure parses and initializes the config.
func (d *Destination) Configure(_ context.Context, cfg map[string]string) error {
config, err := config.Parse(cfg)
func (d *Destination) Configure(_ context.Context, cfg config.Config) error {
commonCfg, err := common.Parse(cfg)
if err != nil {
return fmt.Errorf("parse config: %w", err)
}

d.config = config
d.config.Config = commonCfg

return nil
}

// Open makes sure everything is prepared to receive records.
func (d *Destination) Open(context.Context) error {
opts, err := common.GetConnectionOptions(d.config)
opts, err := common.GetConnectionOptions(d.config.Config)
if err != nil {
return fmt.Errorf("get connection options: %s", err)
}
Expand All @@ -142,7 +139,7 @@ func (d *Destination) Open(context.Context) error {
}

// Write writes a record into a Destination.
func (d *Destination) Write(_ context.Context, records []sdk.Record) (int, error) {
func (d *Destination) Write(_ context.Context, records []opencdc.Record) (int, error) {
for i, record := range records {
err := d.writer.Write(record)
if err != nil {
Expand Down
38 changes: 19 additions & 19 deletions destination/destination_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"testing"
"time"

"github.com/conduitio-labs/conduit-connector-nats-pubsub/config"
"github.com/conduitio-labs/conduit-connector-nats-pubsub/common"
"github.com/conduitio-labs/conduit-connector-nats-pubsub/test"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/matryer/is"
"github.com/nats-io/nats.go"
)
Expand All @@ -35,8 +35,8 @@ func TestDestination_OpenSuccess(t *testing.T) {
destination := NewDestination()

err := destination.Configure(context.Background(), map[string]string{
config.KeyURLs: test.TestURL,
config.KeySubject: "foo_destination",
common.KeyURLs: test.TestURL,
common.KeySubject: "foo_destination",
})
is.NoErr(err)

Expand All @@ -55,8 +55,8 @@ func TestDestination_OpenFail(t *testing.T) {
destination := NewDestination()

err := destination.Configure(context.Background(), map[string]string{
config.KeyURLs: "nats://localhost:6666",
config.KeySubject: "foo_destination",
common.KeyURLs: "nats://localhost:6666",
common.KeySubject: "foo_destination",
})
is.NoErr(err)

Expand Down Expand Up @@ -88,20 +88,20 @@ func TestDestination_WriteOneMessage(t *testing.T) {
destination := NewDestination()

err = destination.Configure(context.Background(), map[string]string{
config.KeyURLs: test.TestURL,
config.KeySubject: subject,
common.KeyURLs: test.TestURL,
common.KeySubject: subject,
})
is.NoErr(err)

err = destination.Open(context.Background())
is.NoErr(err)

var count int
count, err = destination.Write(context.Background(), []sdk.Record{
count, err = destination.Write(context.Background(), []opencdc.Record{
{
Operation: sdk.OperationCreate,
Payload: sdk.Change{
After: sdk.RawData([]byte("hello")),
Operation: opencdc.OperationCreate,
Payload: opencdc.Change{
After: opencdc.RawData([]byte("hello")),
},
},
})
Expand Down Expand Up @@ -138,20 +138,20 @@ func TestDestination_WriteManyMessages(t *testing.T) {
destination := NewDestination()

err = destination.Configure(context.Background(), map[string]string{
config.KeyURLs: test.TestURL,
config.KeySubject: subject,
common.KeyURLs: test.TestURL,
common.KeySubject: subject,
})
is.NoErr(err)

err = destination.Open(context.Background())
is.NoErr(err)

records := make([]sdk.Record, 1000)
records := make([]opencdc.Record, 1000)
for i := 0; i < 1000; i++ {
records[i] = sdk.Record{
Operation: sdk.OperationCreate,
Payload: sdk.Change{
After: sdk.RawData([]byte(fmt.Sprintf("message #%d", i))),
records[i] = opencdc.Record{
Operation: opencdc.OperationCreate,
Payload: opencdc.Change{
After: opencdc.RawData([]byte(fmt.Sprintf("message #%d", i))),
},
}
}
Expand Down
8 changes: 4 additions & 4 deletions destination/destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"context"
"testing"

"github.com/conduitio-labs/conduit-connector-nats-pubsub/config"
"github.com/conduitio-labs/conduit-connector-nats-pubsub/common"
)

func TestDestination_Configure(t *testing.T) {
Expand All @@ -39,8 +39,8 @@ func TestDestination_Configure(t *testing.T) {
args: args{
ctx: context.Background(),
cfg: map[string]string{
config.KeyURLs: "nats://127.0.0.1:4222",
config.KeySubject: "foo",
common.KeyURLs: "nats://127.0.0.1:4222",
common.KeySubject: "foo",
},
},
wantErr: false,
Expand All @@ -58,7 +58,7 @@ func TestDestination_Configure(t *testing.T) {
args: args{
ctx: context.Background(),
cfg: map[string]string{
config.KeyURLs: "nats://127.0.0.1:4222",
common.KeyURLs: "nats://127.0.0.1:4222",
},
},
wantErr: true,
Expand Down
4 changes: 2 additions & 2 deletions destination/pubsub/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package pubsub

import (
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/nats-io/nats.go"
)

Expand All @@ -41,7 +41,7 @@ func NewWriter(params WriterParams) (*Writer, error) {
}

// Write writes directly and synchronously a record to a subject.
func (w *Writer) Write(record sdk.Record) error {
func (w *Writer) Write(record opencdc.Record) error {
return w.conn.Publish(w.subject, record.Payload.After.Bytes())
}

Expand Down
Loading

0 comments on commit ccb49b6

Please sign in to comment.