Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Destination #30

Merged
merged 30 commits into from
Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
ffa5b93
Implement a Destination base
BohdanMyronchuk Nov 25, 2022
e4bc1dd
Implement the Writer for the Destination (#8)
BohdanMyronchuk Nov 29, 2022
d84f466
Merge branch 'source' into destination
oykmnk Nov 29, 2022
eff2ff2
Merge branch 'source' into destination
oykmnk Nov 30, 2022
1dca3e8
Implement acceptance test (#14)
oykmnk Dec 2, 2022
205b099
Merge branch 'source' into destination
oykmnk Dec 2, 2022
2e8bb0a
Merge branch 'source' into destination
oykmnk Dec 6, 2022
cca0262
Add Destination documentation (#17)
oykmnk Dec 6, 2022
fda7114
Destination integration test (#12)
BohdanMyronchuk Dec 8, 2022
aa6ffad
Merge branch 'source' into destination
oykmnk Dec 8, 2022
0419a55
Merge branch 'source' into destination
oykmnk Dec 13, 2022
962b6c4
Merge branch 'source' into destination
oykmnk Dec 19, 2022
f8263a4
Merge branch 'source' into destination
oykmnk Dec 19, 2022
cf10e3e
Merge branch 'source' into destination
oykmnk Jan 18, 2023
37e2417
Merge branch 'source' into destination
oykmnk Jan 20, 2023
e443bef
Merge branch 'source' into destination
oykmnk Feb 10, 2023
4736288
Merge branch 'source' into destination
oykmnk Feb 13, 2023
5324875
Merge branch 'source' into destination
voscob Feb 27, 2023
bc2e322
Merge branch 'destination' of https://github.com/conduitio-labs/condu…
voscob Feb 27, 2023
2cae796
Add error check to the destination_test
voscob Feb 27, 2023
4cbd9a7
Rename the Write return variable
voscob Feb 28, 2023
3abe6a4
Add test-integration command
voscob Feb 28, 2023
95a4bbc
Update Destination Parameters with sdk.Validation
voscob Feb 28, 2023
879b0ac
Add a few data types to acceptance test
voscob Feb 28, 2023
3cce060
Update AuthMechanism Destination Parameters
voscob Feb 28, 2023
7b20436
Merge branch 'source' into destination
voscob Feb 28, 2023
bdd1a6b
Update AuthMechanism Destination Parameters
voscob Mar 1, 2023
73b3939
Add Integration test step to the build github action
voscob Mar 1, 2023
00edb47
Remove test constant from command package
voscob Mar 1, 2023
43b216f
Merge branch 'source' into destination
voscob Mar 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ issues:
- goerr113
- maintidx
- paralleltest # we don't want to run the integration tests in parallel because we want deterministic results
- path: acceptance_test
linters:
- paralleltest
- path: codec
linters:
- wrapcheck # the codec package contains mongo driver specific code and we don't need to wrap errors from there
- wrapcheck # the codec package contains mongo driver specific code and we don't need to wrap errors from there
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ build:

test:
hariso marked this conversation as resolved.
Show resolved Hide resolved
docker run --rm -d -p 27017:27017 --name mongodb mongo --replSet=test
sleep $(MONGODB_STARTUP_TIMEOUT)
sleep $(MONGODB_STARTUP_TIMEOUT)
docker exec mongodb mongosh --eval "rs.initiate();"
go test $(GOTEST_FLAGS) ./...; ret=$$?; \
docker stop mongodb; \
Expand All @@ -18,4 +18,5 @@ lint:
golangci-lint run --config .golangci.yml

mockgen:
mockgen -package mock -source source/source.go -destination source/mock/source.go
mockgen -package mock -source source/source.go -destination source/mock/source.go
mockgen -package mock -source destination/destination.go -destination destination/mock/destination.go
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,31 @@ The connector stores a `resumeToken` of every Change Stream event in a position,
The connector always uses the `_id` field as a key.

If the `_id` field is `bson.ObjectID` the connector converts it to a string when transferring a record to a destination, otherwise, it leaves it unchanged.

## Destination

The MongoDB Destination takes a `sdk.Record` and parses it into a valid MongoDB query. The Destination is designed to handle different payloads and keys. Because of this, each record is individually parsed and written.

### Collection name

If a record contains a `mongo.collection` property in its metadata it will be written in that collection, otherwise it will fall back to use the `collection` configured in the connector. Thus, a Destination can support multiple collections in the same connector, as long as the user has proper access to those collections.

### Configuration

| name | description | required | default |
| ----------------------------- | ----------------------------------------------------------------------------------------------------------------------------------- | -------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `uri` | The connection string. The URI can contain host names, IPv4/IPv6 literals, or an SRV record. | false | `mongodb://localhost:27017` |
| `db` | The name of a database the connector must work with. | **true** | |
| `collection` | The name of a collection the connector must write to. | **true** | |
| `auth.username` | The username. | false | |
| `auth.password` | The user's password. | false | |
| `auth.db` | The name of a database that contains the user's authentication data. | false | `admin` |
| `auth.mechanism` | The authentication mechanism. The available values are `SCRAM-SHA-256`, `SCRAM-SHA-1`, `MONGODB-CR`, `MONGODB-AWS`, `MONGODB-X509`. | false | The default mechanism that [defined depending on your MongoDB server version](https://www.mongodb.com/docs/drivers/go/current/fundamentals/auth/#default). |
| `auth.tls.caFile` | The path to either a single or a bundle of certificate authorities to trust when making a TLS connection. | false | |
| `auth.tls.certificateKeyFile` | The path to the client certificate file or the client private key file. | false | |

### Key handling

The connector uses all keys from an `sdk.Record` when updating and deleting documents.

If the `_id` field can be converted to a `bson.ObjectID`, the connector converts it, otherwise, it uses it as it is.
143 changes: 143 additions & 0 deletions acceptance_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright © 2022 Meroxa, Inc. & Yalantis
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mongo

import (
"context"
"fmt"
"testing"
"time"

"github.com/brianvoe/gofakeit"
"github.com/conduitio-labs/conduit-connector-mongo/config"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/matryer/is"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

const (
// set the directConnection to true in order to avoid the known hostname problem.
testURI = "mongodb://localhost:27017/?directConnection=true"
hariso marked this conversation as resolved.
Show resolved Hide resolved
testDB = "test_acceptance"
testCollectionPrefix = "test_acceptance_coll"
)

type driver struct {
sdk.ConfigurableAcceptanceTestDriver
}

// GenerateRecord overrides the [sdk.ConfigurableAcceptanceTestDriver] GenerateRecord method.
// It generates a MongoDB-specific payload and a random bson.ObjectID key converted to a string.
func (d driver) GenerateRecord(t *testing.T, operation sdk.Operation) sdk.Record {
t.Helper()

id := primitive.NewObjectID().String()

return sdk.Record{
Operation: operation,
Key: sdk.StructuredData{
"_id": id,
},
Payload: sdk.Change{
After: sdk.StructuredData{
"_id": id,
"name": gofakeit.Name(),
"email": gofakeit.Email(),
"created_at": time.Now().Format(time.RFC3339),
},
hariso marked this conversation as resolved.
Show resolved Hide resolved
},
}
}

func TestAcceptance(t *testing.T) {
cfg := map[string]string{
config.KeyURI: testURI,
config.KeyDB: testDB,
}

sdk.AcceptanceTest(t, driver{
sdk.ConfigurableAcceptanceTestDriver{
Config: sdk.ConfigurableAcceptanceTestDriverConfig{
Connector: Connector,
SourceConfig: cfg,
DestinationConfig: cfg,
BeforeTest: beforeTest(cfg),
AfterTest: afterTest(cfg),
},
},
})
}

// beforeTest set the config collection field to a unique name prefixed with the testCollectionPrefix.
func beforeTest(cfg map[string]string) func(*testing.T) {
return func(t *testing.T) {
t.Helper()

is := is.New(t)

// create a test mongo client
mongoClient, err := createTestMongoClient(context.Background(), cfg[config.KeyURI])
is.NoErr(err)
defer func() {
err = mongoClient.Disconnect(context.Background())
is.NoErr(err)
}()

cfg[config.KeyCollection] = fmt.Sprintf("%s_%d", testCollectionPrefix, time.Now().UnixNano())

// connect to the test database and create a collection
testDatabase := mongoClient.Database(cfg[config.KeyDB])
is.NoErr(testDatabase.CreateCollection(context.Background(), cfg[config.KeyCollection]))
hariso marked this conversation as resolved.
Show resolved Hide resolved
}
}

// afterTest connects to a MongoDB instance and drops a test collection.
func afterTest(cfg map[string]string) func(*testing.T) {
return func(t *testing.T) {
t.Helper()

is := is.New(t)

// create a test mongo client
mongoClient, err := createTestMongoClient(context.Background(), cfg[config.KeyURI])
is.NoErr(err)
defer func() {
err = mongoClient.Disconnect(context.Background())
is.NoErr(err)
}()

// connect to the test database and collection
testDatabase := mongoClient.Database(cfg[config.KeyDB])
testCollection := testDatabase.Collection(cfg[config.KeyCollection])

// drop the test collection
err = testCollection.Drop(context.Background())
is.NoErr(err)
}
}

// createTestMongoClient connects to a MongoDB by a provided URI.
func createTestMongoClient(ctx context.Context, uri string) (*mongo.Client, error) {
opts := options.Client().ApplyURI(uri)

mongoClient, err := mongo.Connect(ctx, opts)
if err != nil {
return nil, fmt.Errorf("mongo connect: %w", err)
}

return mongoClient, nil
}
3 changes: 2 additions & 1 deletion cmd/connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
package main

import (
mongo "github.com/conduitio-labs/conduit-connector-mongo"
sdk "github.com/conduitio/conduit-connector-sdk"

mongo "github.com/conduitio-labs/conduit-connector-mongo"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion config/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type InvalidAuthMechanismError struct {
AuthMechanism AuthMechanism
}

// Error returns a formated error message for the [InvalidAuthMechanismError].
// Error returns a formatted error message for the [InvalidAuthMechanismError].
func (e *InvalidAuthMechanismError) Error() string {
return fmt.Sprintf("invalid auth mechanism %q", e.AuthMechanism)
}
4 changes: 3 additions & 1 deletion connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ package mongo
import (
"github.com/conduitio-labs/conduit-connector-mongo/source"
sdk "github.com/conduitio/conduit-connector-sdk"

"github.com/conduitio-labs/conduit-connector-mongo/destination"
)

var Connector = sdk.Connector{
NewSpecification: Specification,
NewSource: source.NewSource,
NewDestination: nil,
NewDestination: destination.NewDestination,
}
Loading