Skip to content
This repository was archived by the owner on Oct 17, 2023. It is now read-only.

Commit

Permalink
add rethinkdb -> postgresql integration test, fix connection leak in …
Browse files Browse the repository at this point in the history
…pg client (#293)
  • Loading branch information
jipperinbham authored Mar 7, 2017
1 parent 5ca7b95 commit 9460a03
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 8 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ env:
- TESTDIR=integration_tests/mongo_to_mongo
- TESTDIR=integration_tests/mongo_to_es
- TESTDIR=integration_tests/mongo_to_rethink
- TESTDIR=integration_tests/rethink_to_postgres
before_install:
- "./scripts/before_install.sh"
cache:
Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@

### Bugfixes

## v0.2.1 [2017-03-07]

### Features
- added RethinkDB -> PostgreSQL integration test

### Bugfixes
- fixed connection leak in PostgreSQL client

## v0.2.0 [2017-02-28]

Expand Down
11 changes: 7 additions & 4 deletions adaptor/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@ func (c *Client) Close() {

// Connect initializes the Postgres connection
func (c *Client) Connect() (client.Session, error) {
// there's really no way for this to error because we know the driver we're passing is
// available.
c.pqSession, _ = sql.Open("postgres", c.uri)
return &Session{c.pqSession}, nil
if c.pqSession == nil {
// there's really no way for this to error because we know the driver we're passing is
// available.
c.pqSession, _ = sql.Open("postgres", c.uri)
}
err := c.pqSession.Ping()
return &Session{c.pqSession}, err
}
6 changes: 3 additions & 3 deletions adaptor/postgres/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (w *Writer) Write(msg message.Msg) func(client.Session) error {
}

func insertMsg(m message.Msg, s *sql.DB) error {
log.Infof("Write INSERT to Postgres %v", m.Namespace())
log.With("table", m.Namespace()).Debugln("INSERT")
var (
keys []string
placeholders []string
Expand Down Expand Up @@ -71,7 +71,7 @@ func insertMsg(m message.Msg, s *sql.DB) error {
}

func deleteMsg(m message.Msg, s *sql.DB) error {
fmt.Printf("Write DELETE to Postgres %v values %v\n", m.Namespace(), m.Data())
log.With("table", m.Namespace()).With("values", m.Data()).Debugln("DELETE")
var (
ckeys []string
vals []interface{}
Expand Down Expand Up @@ -103,7 +103,7 @@ func deleteMsg(m message.Msg, s *sql.DB) error {
}

func updateMsg(m message.Msg, s *sql.DB) error {
fmt.Printf("Write UPDATE to Postgres %v\n", m.Namespace())
log.With("table", m.Namespace()).Debugln("UPDATE")
var (
ckeys []string
ukeys []string
Expand Down
9 changes: 8 additions & 1 deletion integration_tests/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@ nodes:
enron_sink_es:
type: elasticsearch
uri: https://${ES_ENRON_SINK_USER}:${ES_ENRON_SINK_PASSWORD}@${ES_ENRON_SINK_URI}
enron_source_rethink:
type: rethinkdb
uri: rethink://admin:${RETHINKDB_ENRON_SOURCE_PASSWORD}@${RETHINKDB_ENRON_SOURCE_URI}/enron
ssl: true
enron_sink_rethink:
type: rethinkdb
uri: rethink://${RETHINKDB_ENRON_SINK_USER}:${RETHINKDB_ENRON_SINK_PASSWORD}@${RETHINKDB_ENRON_SINK_URI}/enron
uri: rethink://admin:${RETHINKDB_ENRON_SINK_PASSWORD}@${RETHINKDB_ENRON_SINK_URI}/enron
ssl: true
enron_sink_postgres:
type: postgres
uri: postgres://${POSTGRES_ENRON_SINK_USER}:${POSTGRES_ENRON_SINK_PASSWORD}@${POSTGRES_ENRON_SINK_URI}
4 changes: 4 additions & 0 deletions integration_tests/mongo_to_es/transporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"strings"
"testing"
"time"
)

func TestMongoToElasticsearchDocCount(t *testing.T) {
Expand All @@ -22,6 +23,9 @@ func TestMongoToElasticsearchDocCount(t *testing.T) {
t.Fatalf("unexpected error, %s", err)
}

// give the cluster a bit to breathe
time.Sleep(30 * time.Second)

req, _ = http.NewRequest(
http.MethodGet,
fmt.Sprintf("https://%s/enron/emails/_count", strings.Split(os.Getenv("ES_ENRON_SINK_URI"), ",")[0]),
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/rethink_to_postgres/app.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Source({name:"enron_source_rethink", namespace: 'enron.emails'})
.save({name:"enron_sink_postgres", namespace: 'enron.emails'});
72 changes: 72 additions & 0 deletions integration_tests/rethink_to_postgres/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// +build integration

package integration_test

import (
"database/sql"
"flag"
"fmt"
"os"
"testing"

"github.com/compose/transporter/log"

_ "github.com/lib/pq" // import pq driver
)

const (
emailsSchema = `id varchar(24),
body TEXT,
filename varchar(255),
headers jsonb,
mailbox varchar(255),
subFolder varchar(255),
PRIMARY KEY (id)`
)

var (
postgresSourceSession *sql.DB
cleanup = flag.Bool("cleanup", false, "used to determine whether or not to run cleanup function")
)

func setup() {
log.Infoln("setting up tests")
u := fmt.Sprintf("postgres://%s:%s@%s",
os.Getenv("POSTGRES_ENRON_SINK_USER"),
os.Getenv("POSTGRES_ENRON_SINK_PASSWORD"),
os.Getenv("POSTGRES_ENRON_SINK_URI"))
postgresSourceSession, _ = sql.Open("postgres", u)
}

func cleanupData() {
log.Infoln("cleaning up data")

if _, err := postgresSourceSession.Exec("DROP TABLE IF EXISTS emails;"); err != nil {
log.Errorf("unable to drop table, could affect tests, %s", err)
}

_, err := postgresSourceSession.Exec(fmt.Sprintf("CREATE TABLE emails ( %s );", emailsSchema))
if err != nil {
log.Errorf("unable to create table, could affect tests, %s", err)
}
}

func TestMain(m *testing.M) {
flag.Parse()
setup()
if *cleanup {
cleanupData()
shutdown()
os.Exit(0)
return
}
code := m.Run()
shutdown()
os.Exit(code)
}

func shutdown() {
log.Infoln("shutting down tests")
log.Infoln("tests shutdown complete")
}
18 changes: 18 additions & 0 deletions integration_tests/rethink_to_postgres/transporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// +build integration

package integration_test

import (
"testing"
)

func TestRethinkToPostgresDocCount(t *testing.T) {
var count int
err := postgresSourceSession.QueryRow("SELECT COUNT(id) FROM emails;").Scan(&count)
if err != nil {
t.Errorf("unable to count table, %s", err)
}
if count != 5477 {
t.Errorf("bad emailCount, expected 5477, got %d", count)
}
}

0 comments on commit 9460a03

Please sign in to comment.