Skip to content

Commit

Permalink
fixup! feat: sqlconnect library
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Feb 16, 2024
1 parent 55718a9 commit c8217cf
Show file tree
Hide file tree
Showing 29 changed files with 537 additions and 41 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ jobs:
run: |
go install github.com/wadey/gocovmerge@latest
gocovmerge */profile.out > profile.out
- uses: codecov/codecov-action@v3
- uses: codecov/codecov-action@v4
with:
fail_ci_if_error: true
files: ./profile.out
Expand Down
35 changes: 9 additions & 26 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: help default test test-run test-teardown generate lint fmt
.PHONY: help default test test-run generate lint fmt

GO=go
LDFLAGS?=-s -w
Expand All @@ -9,47 +9,30 @@ default: lint
generate: install-tools
$(GO) generate ./...

test: install-tools test-run test-teardown
test: install-tools test-run

test-run: ## Run all unit tests
ifeq ($(filter 1,$(debug) $(RUNNER_DEBUG)),)
$(eval TEST_CMD = SLOW=0 gotestsum --format pkgname-and-test-fails --)
$(eval TEST_OPTIONS = -p=1 -v -failfast -shuffle=on -coverprofile=profile.out -covermode=count -coverpkg=./... -vet=all --timeout=15m)
$(eval TEST_CMD = gotestsum --format pkgname-and-test-fails --)
$(eval TEST_OPTIONS = -p=1 -v -failfast -shuffle=on -coverprofile=profile.out -covermode=atomic -coverpkg=./... -vet=all --timeout=30m)
else
$(eval TEST_CMD = SLOW=0 go test)
$(eval TEST_OPTIONS = -p=1 -v -failfast -shuffle=on -coverprofile=profile.out -covermode=count -coverpkg=./... -vet=all --timeout=15m)
$(eval TEST_OPTIONS = -p=1 -v -failfast -shuffle=on -coverprofile=profile.out -covermode=atomic -coverpkg=./... -vet=all --timeout=30m)
endif
ifdef package
ifdef exclude
$(eval FILES = `go list ./$(package)/... | egrep -iv '$(exclude)'`)
$(TEST_CMD) -count=1 $(TEST_OPTIONS) $(FILES) && touch $(TESTFILE) || true
$(TEST_CMD) -count=1 $(TEST_OPTIONS) $(FILES) && touch $(TESTFILE)
else
$(TEST_CMD) $(TEST_OPTIONS) ./$(package)/... && touch $(TESTFILE) || true
$(TEST_CMD) $(TEST_OPTIONS) ./$(package)/... && touch $(TESTFILE)
endif
else ifdef exclude
$(eval FILES = `go list ./... | egrep -iv '$(exclude)'`)
$(TEST_CMD) -count=1 $(TEST_OPTIONS) $(FILES) && touch $(TESTFILE) || true
$(TEST_CMD) -count=1 $(TEST_OPTIONS) $(FILES) && touch $(TESTFILE)
else
$(TEST_CMD) -count=1 $(TEST_OPTIONS) ./... && touch $(TESTFILE) || true
$(TEST_CMD) -count=1 $(TEST_OPTIONS) ./... && touch $(TESTFILE)
endif

test-teardown:
@if [ -f "$(TESTFILE)" ]; then \
echo "Tests passed, tearing down..." ;\
rm -f $(TESTFILE) ;\
echo "mode: atomic" > coverage.txt ;\
find . -name "profile.out" | while read file; do grep -v 'mode: atomic' $${file} >> coverage.txt; rm -f $${file}; done ;\
else \
rm -f coverage.txt coverage.html ; find . -name "profile.out" | xargs rm -f ;\
echo "Tests failed :-(" ;\
exit 1 ;\
fi

coverage:
go tool cover -html=coverage.txt -o coverage.html

test-with-coverage: test coverage

help: ## Show the available commands
@grep -E '^[0-9a-zA-Z_-]+:.*?## .*$$' ./Makefile | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'

Expand Down
64 changes: 64 additions & 0 deletions sqlconnect/def_query_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package sqlconnect_test

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"

"github.com/rudderlabs/sqlconnect-go/sqlconnect"
)

func TestQueryDef(t *testing.T) {
t.Run("with columns", func(t *testing.T) {
table := sqlconnect.NewRelationRef("table")
q := sqlconnect.QueryDef{
Table: &table,
Columns: []string{"col1", "col2"},
Conditions: []*sqlconnect.QueryCondition{
{Column: "col1", Operator: "=", Value: "'1'"},
{Column: "col2", Operator: ">", Value: "2"},
},
OrderBy: &sqlconnect.QueryOrder{
Column: "col1",
Order: "ASC",
},
}

sql := q.ToSQL(testDialect{})
expected := `SELECT "col1","col2" FROM "table" WHERE "col1" = '1' AND "col2" > 2 ORDER BY "col1" ASC`
require.Equal(t, expected, sql, "query should be formatted correctly")
})

t.Run("without columns", func(t *testing.T) {
table := sqlconnect.NewRelationRef("table")
q := sqlconnect.QueryDef{
Table: &table,
Conditions: []*sqlconnect.QueryCondition{
{Column: "col1", Operator: "=", Value: "'1'"},
{Column: "col2", Operator: ">", Value: "2"},
},
}

sql := q.ToSQL(testDialect{})
expected := `SELECT * FROM "table" WHERE "col1" = '1' AND "col2" > 2`
require.Equal(t, expected, sql, "query should be formatted correctly")
})
}

type testDialect struct{}

func (d testDialect) FormatTableName(name string) string {
return name
}

func (d testDialect) QuoteIdentifier(name string) string {
return fmt.Sprintf(`"%s"`, name)
}

func (d testDialect) QuoteTable(relation sqlconnect.RelationRef) string {
if relation.Schema != "" {
return fmt.Sprintf(`"%s"."%s"`, relation.Schema, relation.Name)
}
return fmt.Sprintf(`"%s"`, relation.Name)
}
2 changes: 1 addition & 1 deletion sqlconnect/internal/base/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func NewDB(db *sql.DB, rudderSchema string, opts ...Option) *DB {
return "SELECT schema_name FROM information_schema.schemata", "schema_name"
},
SchemaExists: func(schema string) string {
return fmt.Sprintf("SELECT EXISTS (SELECT schema_name FROM information_schema.schemata where schema_name = '%[1]s')", schema)
return fmt.Sprintf("SELECT schema_name FROM information_schema.schemata where schema_name = '%[1]s'", schema)
},
DropSchema: func(schema string) string { return fmt.Sprintf("DROP SCHEMA %[1]s CASCADE", schema) },
CreateTestTable: func(table string) string {
Expand Down
30 changes: 30 additions & 0 deletions sqlconnect/internal/base/dialect_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package base

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/rudderlabs/sqlconnect-go/sqlconnect"
)

func TestDialect(t *testing.T) {
var d dialect
t.Run("format table", func(t *testing.T) {
formatted := d.FormatTableName("TaBle")
require.Equal(t, "table", formatted, "table name should be lowercased")
})

t.Run("quote identifier", func(t *testing.T) {
quoted := d.QuoteIdentifier("column")
require.Equal(t, `"column"`, quoted, "column name should be quoted with double quotes")
})

t.Run("quote table", func(t *testing.T) {
quoted := d.QuoteTable(sqlconnect.NewRelationRef("table"))
require.Equal(t, `"table"`, quoted, "table name should be quoted with double quotes")

quoted = d.QuoteTable(sqlconnect.NewRelationRef("table", sqlconnect.WithSchema("schema")))
require.Equal(t, `"schema"."table"`, quoted, "schema and table name should be quoted with double quotes")
})
}
9 changes: 7 additions & 2 deletions sqlconnect/internal/base/schemaadmin.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,15 @@ func (db *DB) ListSchemas(ctx context.Context) ([]sqlconnect.SchemaRef, error) {

// SchemaExists returns true if the schema exists
func (db *DB) SchemaExists(ctx context.Context, schemaRef sqlconnect.SchemaRef) (bool, error) {
var exists bool
if err := db.QueryRowContext(ctx, db.sqlCommands.SchemaExists(schemaRef.Name)).Scan(&exists); err != nil {
rows, err := db.QueryContext(ctx, db.sqlCommands.SchemaExists(schemaRef.Name))
if err != nil {
return false, fmt.Errorf("querying schema exists: %w", err)
}
defer func() { _ = rows.Close() }()
exists := rows.Next()
if err := rows.Err(); err != nil {
return false, fmt.Errorf("iterating schema exists: %w", err)

Check warning on line 78 in sqlconnect/internal/base/schemaadmin.go

View check run for this annotation

Codecov / codecov/patch

sqlconnect/internal/base/schemaadmin.go#L78

Added line #L78 was not covered by tests
}
return exists, nil
}

Expand Down
22 changes: 21 additions & 1 deletion sqlconnect/internal/bigquery/db.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package bigquery

import (
"context"
"database/sql"
"encoding/json"
"fmt"

"cloud.google.com/go/bigquery"
"github.com/samber/lo"
"google.golang.org/api/option"

Expand Down Expand Up @@ -49,7 +51,7 @@ func NewDB(configJSON json.RawMessage) (*DB, error) {
}
}
cmds.TableExists = func(schema, table string) string {
return fmt.Sprintf("SELECT EXISTS (SELECT table_name FROM `%[1]s`.INFORMATION_SCHEMA.TABLES WHERE table_name = '%[1]s'", schema, table)
return fmt.Sprintf("SELECT EXISTS (SELECT table_name FROM `%[1]s`.INFORMATION_SCHEMA.TABLES WHERE table_name = '%[2]s'", schema, table)

Check warning on line 54 in sqlconnect/internal/bigquery/db.go

View check run for this annotation

Codecov / codecov/patch

sqlconnect/internal/bigquery/db.go#L54

Added line #L54 was not covered by tests
}

return cmds
Expand All @@ -67,3 +69,21 @@ func init() {
type DB struct {
*base.DB
}

func (db *DB) WithBigqueryClient(ctx context.Context, f func(*bigquery.Client) error) error {
sqlconn, err := db.Conn(ctx)
if err != nil {
return err
}
defer func() { _ = sqlconn.Close() }()
return sqlconn.Raw(func(driverConn any) error {
if c, ok := driverConn.(bqclient); ok {
return f(c.BigqueryClient())
}
return fmt.Errorf("invalid driver connection")

Check warning on line 83 in sqlconnect/internal/bigquery/db.go

View check run for this annotation

Codecov / codecov/patch

sqlconnect/internal/bigquery/db.go#L83

Added line #L83 was not covered by tests
})
}

type bqclient interface {
BigqueryClient() *bigquery.Client
}
30 changes: 30 additions & 0 deletions sqlconnect/internal/bigquery/dialect_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package bigquery

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/rudderlabs/sqlconnect-go/sqlconnect"
)

func TestDialect(t *testing.T) {
var d dialect
t.Run("format table", func(t *testing.T) {
formatted := d.FormatTableName("TaBle")
require.Equal(t, "table", formatted, "table name should be lowercased")
})

t.Run("quote identifier", func(t *testing.T) {
quoted := d.QuoteIdentifier("column")
require.Equal(t, "`column`", quoted, "column name should be quoted with backticks")
})

t.Run("quote table", func(t *testing.T) {
quoted := d.QuoteTable(sqlconnect.NewRelationRef("table"))
require.Equal(t, "`table`", quoted, "table name should be quoted with backticks")

quoted = d.QuoteTable(sqlconnect.NewRelationRef("table", sqlconnect.WithSchema("schema")))
require.Equal(t, "`schema.table`", quoted, "schema and table name should be quoted with backticks")
})
}
18 changes: 18 additions & 0 deletions sqlconnect/internal/bigquery/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package bigquery_test

import (
"os"
"strings"
"testing"

"github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/bigquery"
integrationtest "github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/integration_test"
)

func TestBigqueryDB(t *testing.T) {
configJSON, ok := os.LookupEnv("BIGQUERY_TEST_ENVIRONMENT_CREDENTIALS")
if !ok {
t.Skip("skipping bigquery integration test due to lack of a test environment")
}
integrationtest.TestDatabaseScenarios(t, bigquery.DatabaseType, []byte(configJSON), strings.ToLower)
}
57 changes: 57 additions & 0 deletions sqlconnect/internal/bigquery/schemaadmin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package bigquery

import (
"context"
"errors"

"cloud.google.com/go/bigquery"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"

"github.com/rudderlabs/sqlconnect-go/sqlconnect"
)

// SchemaExists uses the bigquery client instead of [INFORMATION_SCHEMA.SCHEMATA] due to absence of a region qualifier
// https://cloud.google.com/bigquery/docs/information-schema-datasets-schemata#scope_and_syntax
func (db *DB) SchemaExists(ctx context.Context, schemaRef sqlconnect.SchemaRef) (bool, error) {
var exists bool
if err := db.WithBigqueryClient(ctx, func(c *bigquery.Client) error {
if _, err := c.Dataset(schemaRef.Name).Metadata(ctx); err != nil {
var e *googleapi.Error
if ok := errors.As(err, &e); ok {
if e.Code == 404 { // not found
return nil
}
}
return err

Check warning on line 26 in sqlconnect/internal/bigquery/schemaadmin.go

View check run for this annotation

Codecov / codecov/patch

sqlconnect/internal/bigquery/schemaadmin.go#L26

Added line #L26 was not covered by tests
}
exists = true
return nil
}); err != nil {
return false, err
}
return exists, nil
}

// ListSchemas uses the bigquery client instead of [INFORMATION_SCHEMA.SCHEMATA] due to absence of a region qualifier
// https://cloud.google.com/bigquery/docs/information-schema-datasets-schemata#scope_and_syntax
func (db *DB) ListSchemas(ctx context.Context) ([]sqlconnect.SchemaRef, error) {
var schemas []sqlconnect.SchemaRef
if err := db.WithBigqueryClient(ctx, func(c *bigquery.Client) error {
datasets := c.Datasets(ctx)
for {
var dataset *bigquery.Dataset
dataset, err := datasets.Next()
if err != nil {
if err == iterator.Done {
return nil
}
return err

Check warning on line 49 in sqlconnect/internal/bigquery/schemaadmin.go

View check run for this annotation

Codecov / codecov/patch

sqlconnect/internal/bigquery/schemaadmin.go#L49

Added line #L49 was not covered by tests
}
schemas = append(schemas, sqlconnect.SchemaRef{Name: dataset.DatasetID})
}
}); err != nil {
return nil, err
}
return schemas, nil
}
1 change: 1 addition & 0 deletions sqlconnect/internal/databricks/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Config struct {
RetryAttempts int `json:"retryAttempts"`
MinRetryWaitTime time.Duration `json:"minRetryWaitTime"`
MaxRetryWaitTime time.Duration `json:"maxRetryWaitTime"`
MaxConnIdleTime time.Duration `json:"maxConnIdleTime"`

// RudderSchema is used to override the default rudder schema name during tests
RudderSchema string `json:"rudderSchema"`
Expand Down
3 changes: 3 additions & 0 deletions sqlconnect/internal/databricks/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package databricks
import (
"database/sql"
"encoding/json"
"fmt"

databricks "github.com/databricks/databricks-sql-go"
"github.com/samber/lo"
Expand Down Expand Up @@ -45,6 +46,7 @@ func NewDB(configJson json.RawMessage) (*DB, error) {
if err != nil {
return nil, err

Check warning on line 47 in sqlconnect/internal/databricks/db.go

View check run for this annotation

Codecov / codecov/patch

sqlconnect/internal/databricks/db.go#L47

Added line #L47 was not covered by tests
}
db.SetConnMaxIdleTime(config.MaxConnIdleTime)

return &DB{
DB: base.NewDB(
Expand All @@ -55,6 +57,7 @@ func NewDB(configJson json.RawMessage) (*DB, error) {
base.WithJsonRowMapper(jsonRowMapper),
base.WithSQLCommandsOverride(func(cmds base.SQLCommands) base.SQLCommands {
cmds.ListSchemas = func() (string, string) { return "SHOW SCHEMAS", "schema_name" }
cmds.SchemaExists = func(schema string) string { return fmt.Sprintf(`SHOW SCHEMAS LIKE '%s'`, schema) }
cmds.ListTables = func(schema string) []lo.Tuple2[string, string] {
return []lo.Tuple2[string, string]{
{A: "SHOW TABLES IN " + schema, B: "tableName"},

Check warning on line 63 in sqlconnect/internal/databricks/db.go

View check run for this annotation

Codecov / codecov/patch

sqlconnect/internal/databricks/db.go#L62-L63

Added lines #L62 - L63 were not covered by tests
Expand Down
Loading

0 comments on commit c8217cf

Please sign in to comment.