Skip to content

Commit

Permalink
Extend functional query builders
Browse files Browse the repository at this point in the history
  • Loading branch information
yhabteab committed Nov 25, 2024
1 parent 2a133da commit 279942d
Showing 1 changed file with 201 additions and 92 deletions.
293 changes: 201 additions & 92 deletions database/optionally.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,134 +2,243 @@ package database

import (
"context"
"fmt"
"github.com/icinga/icinga-go-library/com"
"github.com/pkg/errors"
)

// Upsert inserts new rows into a table or updates rows of a table if the primary key already exists.
type Upsert interface {
// Stream bulk upserts the specified entities via NamedBulkExec.
// If not explicitly specified, the upsert statement is created using
// BuildUpsertStmt with the first entity from the entities stream.
Stream(ctx context.Context, entities <-chan Entity) error
}
// QueryType represents the type of database query, expressed as an enum-like integer value.
type QueryType int

// UpsertOption is a functional option for NewUpsert.
type UpsertOption func(u *upsert)
const (
// SelectQuery represents a SQL SELECT query type, used for retrieving data from a database.
SelectQuery QueryType = iota

// WithOnUpsert adds callback(s) to bulk upserts. Entities for which the
// operation was performed successfully are passed to the callbacks.
func WithOnUpsert(onUpsert ...OnSuccess[Entity]) UpsertOption {
return func(u *upsert) {
u.onUpsert = onUpsert
}
// InsertQuery represents the constant value for an INSERT database query.
InsertQuery

// UpsertQuery represents the constant value used for an UPSERT (INSERT or UPDATE) database query.
UpsertQuery

// UpdateQuery represents the constant value for an UPDATE database query.
UpdateQuery

// DeleteQuery represents the constant value for a DELETE query.
DeleteQuery
)

// Queryable defines methods for bulk executing database entities such as upsert, insert, and update.
type Queryable interface {
// Stream bulk executes database Entity(ies) for the following three database query types.
// * Upsert - Stream consumes from the provided entities channel and bulk upserts them via DB.NamedBulkExec.
// If not explicitly specified via WithStatement, the upsert statement is generated dynamically via the
// QueryBuilder. The bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency
// via the Options.MaxConnectionsPerTable.
// * Insert(Ignore) - Stream does likewise for insert statement and bulk inserts the entities via DB.NamedBulkExec.
// If not explicitly specified via WithStatement, the insert statement is generated dynamically via the
// QueryBuilder. The bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency
// via the Options.MaxConnectionsPerTable.
// * Update - Stream bulk updates the entities via DB.NamedBulkExecTx. If not explicitly specified via
// WithStatement, the update statement is generated dynamically via the QueryBuilder. The bulk size is
// controlled via Options.MaxRowsPerTransaction and concurrency via the Options.MaxConnectionsPerTable.
// Entities for which the query ran successfully will be passed to the onSuccess handlers (if provided).
Stream(ctx context.Context, entities <-chan Entity, onSuccess ...OnSuccess[Entity]) error

// StreamAny bulk executes the streamed items of type any using the [DB.BulkExec] method.
StreamAny(ctx context.Context, args <-chan any, onSuccess ...OnSuccess[any]) error
}

// WithStatement uses the specified statement for bulk upserts instead of automatically creating one.
func WithStatement(stmt string, placeholders int) UpsertOption {
return func(u *upsert) {
u.stmt = stmt
u.placeholders = placeholders
}
// NewSelect initializes a new Queryable object of type SelectQuery for a given [DB], subject.
func NewSelect(db *DB, subject any, options ...QueryableOption) Queryable {
return newQuery(db, subject, append([]QueryableOption{withSetQueryType(SelectQuery)}, options...)...)
}

// NewUpsert creates a new Upsert initalized with a database.
func NewUpsert(db *DB, options ...UpsertOption) Upsert {
u := &upsert{db: db}
// NewInsert initializes a new Queryable object of type InsertQuery for a given [DB], subject.
func NewInsert(db *DB, subject any, options ...QueryableOption) Queryable {
return newQuery(db, subject, append([]QueryableOption{withSetQueryType(InsertQuery)}, options...)...)
}

for _, option := range options {
option(u)
}
// NewUpsert initializes a new Queryable object of type UpsertQuery for a given [DB], subject.
func NewUpsert(db *DB, subject any, options ...QueryableOption) Queryable {
return newQuery(db, subject, append([]QueryableOption{withSetQueryType(UpsertQuery)}, options...)...)
}

return u
// NewUpdate initializes a new Queryable object of type UpdateQuery for a given [DB], subject.
func NewUpdate(db *DB, subject any, options ...QueryableOption) Queryable {
return newQuery(db, subject, append([]QueryableOption{withSetQueryType(UpdateQuery)}, options...)...)
}

type upsert struct {
db *DB
onUpsert []OnSuccess[Entity]
stmt string
placeholders int
// NewDelete initializes a new Queryable object of type DeleteQuery for a given [DB], subject.
func NewDelete(db *DB, subject any, options ...QueryableOption) Queryable {
return newQuery(db, subject, append([]QueryableOption{withSetQueryType(DeleteQuery)}, options...)...)
}

func (u *upsert) Stream(ctx context.Context, entities <-chan Entity) error {
first, forward, err := com.CopyFirst(ctx, entities)
if err != nil {
return errors.Wrap(err, "can't copy first entity")
}
// queryable represents a database query type with customizable behavior for dynamic and static SQL statements.
type queryable struct {
db *DB

sem := u.db.GetSemaphoreForTable(TableName(first))
var stmt string
var placeholders int
// qb is the query builder used to construct SQL statements for various database
// statements if, and only if stmt is not set.
qb *QueryBuilder

if u.stmt != "" {
stmt = u.stmt
placeholders = u.placeholders
} else {
stmt, placeholders = u.db.BuildUpsertStmt(first)
}
// qtype defines the type of database query (e.g., SELECT, INSERT) to perform, influencing query construction behavior.
qtype QueryType

// scoper is used to dynamically generate scoped database queries if, and only if stmt is not set.
scoper any

// stmt is used to cache statically provided database statements.
stmt string

return u.db.NamedBulkExec(
ctx, stmt, u.db.BatchSizeByPlaceholders(placeholders), sem,
forward, SplitOnDupId[Entity], u.onUpsert...,
)
// placeholders is used to determine the entities bulk/chunk size for statically provided statements.
placeholders int

// ignoreOnError is only used to generate special insert statements that silently suppress duplicate key errors.
ignoreOnError bool
}

// Delete deletes rows of a table.
type Delete interface {
// Stream bulk deletes rows from the table specified in from using the given args stream via BulkExec.
// Unless explicitly specified, the DELETE statement is created using BuildDeleteStmt.
Stream(ctx context.Context, from any, args <-chan any) error
// Assert that *queryable type satisfies the Queryable interface.
var _ Queryable = (*queryable)(nil)

// Stream implements the [Queryable.Stream] method.
func (q *queryable) Stream(ctx context.Context, entities <-chan Entity, onSuccess ...OnSuccess[Entity]) error {
sem := q.db.GetSemaphoreForTable(TableName(q.qb.subject))
stmt, placeholders := q.buildStmt()
batchSize := q.db.BatchSizeByPlaceholders(placeholders)

switch q.qtype {
case SelectQuery: // TODO: support select statements?
case InsertQuery:
return q.db.NamedBulkExec(ctx, stmt, batchSize, sem, entities, com.NeverSplit[Entity], onSuccess...)
case UpsertQuery:
return q.db.NamedBulkExec(ctx, stmt, batchSize, sem, entities, SplitOnDupId[Entity], onSuccess...)
case UpdateQuery:
return q.db.NamedBulkExecTx(ctx, stmt, q.db.Options.MaxRowsPerTransaction, sem, entities)
case DeleteQuery:
return errors.Errorf("can't stream entities for 'DELETE' query")
}

return errors.Errorf("unsupported query type: %v", q.qtype)
}

// DeleteOption is a functional option for NewDelete.
type DeleteOption func(options *delete)
// StreamAny implements the [Queryable.StreamAny] method.
func (q *queryable) StreamAny(ctx context.Context, args <-chan any, onSuccess ...OnSuccess[any]) error {
stmt, _ := q.buildStmt()
sem := q.db.GetSemaphoreForTable(TableName(q.qb.subject))

// WithOnDelete adds callback(s) to bulk deletes. Arguments for which the
// operation was performed successfully are passed to the callbacks.
func WithOnDelete(onDelete ...OnSuccess[any]) DeleteOption {
return func(d *delete) {
d.onDelete = onDelete
}
return q.db.BulkExec(ctx, stmt, q.db.Options.MaxPlaceholdersPerStatement, sem, args, onSuccess...)
}

// ByColumn uses the given column for the WHERE clause that the rows must
// satisfy in order to be deleted, instead of automatically using ID.
func ByColumn(column string) DeleteOption {
return func(d *delete) {
d.column = column
// buildStmt constructs the SQL statement based on the type of query (Select, Insert, Upsert, Update, Delete).
// It also determines the number of placeholders to be used in the statement.
func (q *queryable) buildStmt() (string, int) {
if q.stmt != "" {
return q.stmt, q.placeholders
}
}

// NewDelete creates a new Delete initalized with a database.
func NewDelete(db *DB, options ...DeleteOption) Delete {
d := &delete{db: db}
var stmt string
var placeholders int

for _, option := range options {
option(d)
switch q.qtype {
case SelectQuery: // TODO: support select statements?
case InsertQuery:
if q.ignoreOnError {
stmt, placeholders = q.qb.InsertIgnore(q.db)
} else {
stmt, placeholders = q.qb.Insert(q.db)
}
case UpsertQuery:
if q.stmt != "" {
stmt, placeholders = q.stmt, q.placeholders
} else {
stmt, placeholders = q.qb.Upsert(q.db)
}
case UpdateQuery:
stmt = q.stmt
if stmt == "" {
if q.scoper != nil && q.scoper.(string) != "" {
stmt, _ = q.qb.UpdateScoped(q.db, q.scoper)
} else {
stmt, _ = q.qb.Update(q.db)
}
}
case DeleteQuery:
if q.stmt != "" {
stmt, placeholders = q.stmt, q.placeholders
} else if q.scoper != "" {
stmt = q.qb.DeleteBy(q.scoper.(string))
} else {
stmt = q.qb.Delete()
}
}

return d
return stmt, placeholders
}

type delete struct {
db *DB
column string
onDelete []OnSuccess[any]
}
// newQuery initializes a new Queryable object for a given [DB], subject, and query type.
// It also applies optional query options to the just created queryable object.
//
// Note: If the query type is not explicitly set using WithSetQueryType, it will default to SELECT queries.
func newQuery(db *DB, subject any, options ...QueryableOption) Queryable {
q := &queryable{db: db, qb: &QueryBuilder{subject: subject}}
for _, option := range options {
option(q)
}

func (d *delete) Stream(ctx context.Context, from any, args <-chan any) error {
var stmt string
return q
}

if d.column != "" {
stmt = fmt.Sprintf(`DELETE FROM "%s" WHERE %s IN (?)`, TableName(from), d.column)
} else {
stmt = d.db.BuildDeleteStmt(from)
// QueryableOption describes the base functional specification for all the queryable types.
type QueryableOption func(*queryable)

// withSetQueryType sets the type of database query to be executed/generated.
func withSetQueryType(qtype QueryType) QueryableOption { return func(q *queryable) { q.qtype = qtype } }

// WithStatement configures a static SQL statement and its associated placeholders for a queryable entity.
//
// Note that using WithStatement always suppresses all other available queryable options and unlike
// some other options, this can be used to explicitly provide a custom query for all kinds of DB stmts.
//
// Returns a function that lazily modifies a given queryable type by setting its stmt and placeholders fields.
func WithStatement(stmt string, placeholders int) QueryableOption {
return func(q *queryable) {
q.stmt = stmt
q.placeholders = placeholders
}
}

// WithColumns statically configures the DB columns to be used for building the database statements.
//
// Setting the queryable columns while using WithStatement has no behavioural effects, thus these columns are never
// used. Additionally, for upsert statements, WithColumns not only defines the columns to be actually inserted but
// the columns to be updated when a duplicate key error occurs as well. However, to maintain the compatibility with
// legacy implementations, a query subject that implements the Upserter interface takes a higher precedence over
// those explicitly set columns for the "update on duplicate key error" part.
//
// Note that using this option for Delete statements has no effect as well, hence its usage is discouraged.
//
// Returns a function that lazily modifies a given queryable type by setting its columns.
func WithColumns(columns ...string) QueryableOption {
return func(q *queryable) { q.qb.SetColumns(columns...) }
}

sem := d.db.GetSemaphoreForTable(TableName(from))
// WithoutColumns returns a QueryableOption callback that excludes the DB columns from the generated DB statements.
//
// Setting the excludable columns while using WithStatement has no behavioural effects, so these columns may or may
// not be excluded depending on the explicitly set statement. Also, note that using this option for Delete statements
// has no effect as well, hence its usage is prohibited.
func WithoutColumns(columns ...string) QueryableOption {
return func(q *queryable) { q.qb.SetExcludedColumns(columns...) }
}

return d.db.BulkExec(
ctx, stmt, d.db.Options.MaxPlaceholdersPerStatement, sem, args, d.onDelete...,
)
// WithIgnoreOnError returns a InsertOption callback that sets the ignoreOnError flag DB insert statements.
//
// When this flag is set, the dynamically generated insert statement will cause to suppress all duplicate key errors.
//
// Setting this flag while using WithStatement has no behavioural effects, so the final database statement
// may or may not silently suppress "duplicate key errors" depending on the explicitly set statement.
func WithIgnoreOnError() QueryableOption { return func(q *queryable) { q.ignoreOnError = true } }

// WithByColumn returns a functional option for DeleteOption or UpdateOption, setting the scoper to the provided column.
func WithByColumn(column string) QueryableOption {
return func(q *queryable) { q.scoper = column }
}

0 comments on commit 279942d

Please sign in to comment.