Skip to content

Commit

Permalink
Implement Destination (#30)
Browse files Browse the repository at this point in the history
* Implement a Destination base

* Implement the Writer for the Destination (#8)

* Implement acceptance test (#14)

* Add Destination documentation (#17)

* Destination integration test (#12)

* Add error check to the destination_test

* Rename the Write return variable

* Add test-integration command

* Update Destination Parameters with sdk.Validation

* Add a few data types to acceptance test

* Update AuthMechanism Destination Parameters

* Update AuthMechanism Destination Parameters

* Add Integration test step to the build github action

* Remove test constant from command package

---------

Co-authored-by: BohdanMyronchuk <[email protected]>
Co-authored-by: Yurii Voskoboinikov <[email protected]>
  • Loading branch information
3 people authored Mar 2, 2023
1 parent f083973 commit e9c219a
Show file tree
Hide file tree
Showing 14 changed files with 1,032 additions and 11 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@ jobs:
go-version: 1.18

- name: Test
run: make test MONGODB_STARTUP_TIMEOUT=8 GOTEST_FLAGS="-v -count=1 -race"
run: make test GOTEST_FLAGS="-v -count=1 -race"

- name: Integration test
run: make test-integration MONGODB_STARTUP_TIMEOUT=8 GOTEST_FLAGS="-v -count=1 -race"
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ issues:
- goerr113
- maintidx
- paralleltest # we don't want to run the integration tests in parallel because we want deterministic results
- path: acceptance_test
linters:
- paralleltest
- path: codec
linters:
- wrapcheck # the codec package contains mongo driver specific code and we don't need to wrap errors from there
11 changes: 8 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: build test lint
.PHONY: build test test-integration lint mockgen

VERSION = $(shell git describe --tags --dirty --always)
MONGODB_STARTUP_TIMEOUT ?= 4
Expand All @@ -7,9 +7,13 @@ build:
go build -ldflags "-X 'github.com/conduitio-labs/conduit-connector-mongo.version=${VERSION}'" -o conduit-connector-mongo cmd/connector/main.go

test:
go test $(GOTEST_FLAGS) ./...

test-integration:
docker run --rm -d -p 27017:27017 --name mongodb mongo --replSet=test
sleep $(MONGODB_STARTUP_TIMEOUT)
sleep $(MONGODB_STARTUP_TIMEOUT)
docker exec mongodb mongosh --eval "rs.initiate();"
export CONNECTION_URI=mongodb://localhost:27017/?directConnection=true && \
go test $(GOTEST_FLAGS) ./...; ret=$$?; \
docker stop mongodb; \
exit $$ret
Expand All @@ -18,4 +22,5 @@ lint:
golangci-lint run --config .golangci.yml

mockgen:
mockgen -package mock -source source/source.go -destination source/mock/source.go
mockgen -package mock -source source/source.go -destination source/mock/source.go
mockgen -package mock -source destination/destination.go -destination destination/mock/destination.go
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,31 @@ The connector stores a `resumeToken` of every Change Stream event in a position,
The connector always uses the `_id` field as a key.

If the `_id` field is `bson.ObjectID` the connector converts it to a string when transferring a record to a destination, otherwise, it leaves it unchanged.

## Destination

The MongoDB Destination takes a `sdk.Record` and parses it into a valid MongoDB query. The Destination is designed to handle different payloads and keys. Because of this, each record is individually parsed and written.

### Collection name

If a record contains a `mongo.collection` property in its metadata it will be written in that collection, otherwise it will fall back to use the `collection` configured in the connector. Thus, a Destination can support multiple collections in the same connector, as long as the user has proper access to those collections.

### Configuration

| name | description | required | default |
| ----------------------------- | ----------------------------------------------------------------------------------------------------------------------------------- | -------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `uri` | The connection string. The URI can contain host names, IPv4/IPv6 literals, or an SRV record. | false | `mongodb://localhost:27017` |
| `db` | The name of a database the connector must work with. | **true** | |
| `collection` | The name of a collection the connector must write to. | **true** | |
| `auth.username` | The username. | false | |
| `auth.password` | The user's password. | false | |
| `auth.db` | The name of a database that contains the user's authentication data. | false | `admin` |
| `auth.mechanism` | The authentication mechanism. The available values are `SCRAM-SHA-256`, `SCRAM-SHA-1`, `MONGODB-CR`, `MONGODB-AWS`, `MONGODB-X509`. | false | The default mechanism that [defined depending on your MongoDB server version](https://www.mongodb.com/docs/drivers/go/current/fundamentals/auth/#default). |
| `auth.tls.caFile` | The path to either a single or a bundle of certificate authorities to trust when making a TLS connection. | false | |
| `auth.tls.certificateKeyFile` | The path to the client certificate file or the client private key file. | false | |

### Key handling

The connector uses all keys from an `sdk.Record` when updating and deleting documents.

If the `_id` field can be converted to a `bson.ObjectID`, the connector converts it, otherwise, it uses it as it is.
151 changes: 151 additions & 0 deletions acceptance_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright © 2022 Meroxa, Inc. & Yalantis
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mongo

import (
"context"
"fmt"
"os"
"testing"
"time"

"github.com/brianvoe/gofakeit"
"github.com/conduitio-labs/conduit-connector-mongo/config"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/matryer/is"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

const (
testEnvNameURI = "CONNECTION_URI"
testDB = "test_acceptance"
testCollectionPrefix = "test_acceptance_coll"
)

type driver struct {
sdk.ConfigurableAcceptanceTestDriver
}

// GenerateRecord overrides the [sdk.ConfigurableAcceptanceTestDriver] GenerateRecord method.
// It generates a MongoDB-specific payload and a random bson.ObjectID key converted to a string.
func (d driver) GenerateRecord(t *testing.T, operation sdk.Operation) sdk.Record {
t.Helper()

id := primitive.NewObjectID().String()

return sdk.Record{
Operation: operation,
Key: sdk.StructuredData{
"_id": id,
},
Payload: sdk.Change{
After: sdk.StructuredData{
"_id": id,
"name": gofakeit.Name(),
"email": gofakeit.Email(),
"created_at": time.Now().Format(time.RFC3339),
"float64": gofakeit.Float64(),
"map": map[string]any{"key1": gofakeit.Name(), "key2": gofakeit.Float64()},
"slice": []any{gofakeit.Name(), gofakeit.Float64()},
},
},
}
}

func TestAcceptance(t *testing.T) {
uri := os.Getenv(testEnvNameURI)
if uri == "" {
t.Skipf("%s env var must be set", testEnvNameURI)
}

cfg := map[string]string{
config.KeyURI: uri,
config.KeyDB: testDB,
}

sdk.AcceptanceTest(t, driver{
sdk.ConfigurableAcceptanceTestDriver{
Config: sdk.ConfigurableAcceptanceTestDriverConfig{
Connector: Connector,
SourceConfig: cfg,
DestinationConfig: cfg,
BeforeTest: beforeTest(cfg),
AfterTest: afterTest(cfg),
},
},
})
}

// beforeTest set the config collection field to a unique name prefixed with the testCollectionPrefix.
func beforeTest(cfg map[string]string) func(*testing.T) {
return func(t *testing.T) {
t.Helper()

is := is.New(t)

// create a test mongo client
mongoClient, err := createTestMongoClient(context.Background(), cfg[config.KeyURI])
is.NoErr(err)
defer func() {
err = mongoClient.Disconnect(context.Background())
is.NoErr(err)
}()

cfg[config.KeyCollection] = fmt.Sprintf("%s_%d", testCollectionPrefix, time.Now().UnixNano())

// connect to the test database and create a collection
testDatabase := mongoClient.Database(cfg[config.KeyDB])
is.NoErr(testDatabase.CreateCollection(context.Background(), cfg[config.KeyCollection]))
}
}

// afterTest connects to a MongoDB instance and drops a test collection.
func afterTest(cfg map[string]string) func(*testing.T) {
return func(t *testing.T) {
t.Helper()

is := is.New(t)

// create a test mongo client
mongoClient, err := createTestMongoClient(context.Background(), cfg[config.KeyURI])
is.NoErr(err)
defer func() {
err = mongoClient.Disconnect(context.Background())
is.NoErr(err)
}()

// connect to the test database and collection
testDatabase := mongoClient.Database(cfg[config.KeyDB])
testCollection := testDatabase.Collection(cfg[config.KeyCollection])

// drop the test collection
err = testCollection.Drop(context.Background())
is.NoErr(err)
}
}

// createTestMongoClient connects to a MongoDB by a provided URI.
func createTestMongoClient(ctx context.Context, uri string) (*mongo.Client, error) {
opts := options.Client().ApplyURI(uri)

mongoClient, err := mongo.Connect(ctx, opts)
if err != nil {
return nil, fmt.Errorf("mongo connect: %w", err)
}

return mongoClient, nil
}
3 changes: 2 additions & 1 deletion cmd/connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
package main

import (
mongo "github.com/conduitio-labs/conduit-connector-mongo"
sdk "github.com/conduitio/conduit-connector-sdk"

mongo "github.com/conduitio-labs/conduit-connector-mongo"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion config/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type InvalidAuthMechanismError struct {
AuthMechanism AuthMechanism
}

// Error returns a formated error message for the [InvalidAuthMechanismError].
// Error returns a formatted error message for the [InvalidAuthMechanismError].
func (e *InvalidAuthMechanismError) Error() string {
return fmt.Sprintf("invalid auth mechanism %q", e.AuthMechanism)
}
4 changes: 3 additions & 1 deletion connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ package mongo
import (
"github.com/conduitio-labs/conduit-connector-mongo/source"
sdk "github.com/conduitio/conduit-connector-sdk"

"github.com/conduitio-labs/conduit-connector-mongo/destination"
)

var Connector = sdk.Connector{
NewSpecification: Specification,
NewSource: source.NewSource,
NewDestination: nil,
NewDestination: destination.NewDestination,
}
Loading

0 comments on commit e9c219a

Please sign in to comment.