Skip to content

Commit

Permalink
Structs for upsert and delete
Browse files Browse the repository at this point in the history
  • Loading branch information
jrauh01 committed May 24, 2024
1 parent 48af1a6 commit 7048f8f
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 89 deletions.
95 changes: 6 additions & 89 deletions database/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,67 +691,25 @@ func (db *DB) CreateIgnoreStreamed(
)
}

func WithOnSuccessUpsert(onSuccess ...OnSuccess[Entity]) ExecOption {
return func(options *ExecOptions) {
options.onSuccess = onSuccess
}
}

func WithStatement(stmt string, placeholders int) ExecOption {
return func(options *ExecOptions) {
options.stmt = stmt
options.placeholders = placeholders
}
}

type ExecOption func(options *ExecOptions)

type ExecOptions struct {
onSuccess []OnSuccess[Entity]
stmt string
placeholders int
}

func NewExecOptions(execOpts ...ExecOption) *ExecOptions {
execOptions := &ExecOptions{}

for _, option := range execOpts {
option(execOptions)
}

return execOptions
}

// UpsertStreamed bulk upserts the specified entities via NamedBulkExec.
// The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream.
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
// concurrency is controlled via Options.MaxConnectionsPerTable.
// Entities for which the query ran successfully will be passed to onSuccess.
func (db *DB) UpsertStreamed(
ctx context.Context, entities <-chan Entity, execOpts ...ExecOption,
ctx context.Context, entities <-chan Entity, onSuccess ...OnSuccess[Entity],
) error {

execOptions := NewExecOptions(execOpts...)

first, forward, err := com.CopyFirst(ctx, entities)
if err != nil {
return errors.Wrap(err, "can't copy first entity")
}

sem := db.GetSemaphoreForTable(TableName(first))
var stmt string
var placeholders int

if execOptions.stmt != "" {
stmt = execOptions.stmt
placeholders = execOptions.placeholders
} else {
stmt, placeholders = db.BuildUpsertStmt(first)
}
stmt, placeholders := db.BuildUpsertStmt(first)

return db.NamedBulkExec(
ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem,
forward, SplitOnDupId[Entity], execOptions.onSuccess...,
forward, SplitOnDupId[Entity], onSuccess...,
)
}

Expand All @@ -770,58 +728,17 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan Entity) error
return db.NamedBulkExecTx(ctx, stmt, db.Options.MaxRowsPerTransaction, sem, forward)
}

func WithOnSuccessDelete(onSuccess ...OnSuccess[any]) DeleteOption {
return func(options *DeleteOptions) {
options.onSuccess = onSuccess
}
}

func ByColumn(column string) DeleteOption {
return func(options *DeleteOptions) {
options.column = column
}
}

type DeleteOption func(options *DeleteOptions)

type DeleteOptions struct {
onSuccess []OnSuccess[any]
column string
}

func NewDeleteOptions(execOpts ...DeleteOption) *DeleteOptions {
deleteOptions := &DeleteOptions{}

for _, option := range execOpts {
option(deleteOptions)
}

return deleteOptions
}

// DeleteStreamed bulk deletes the specified ids via BulkExec.
// The delete statement is created using BuildDeleteStmt with the passed entityType.
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
// concurrency is controlled via Options.MaxConnectionsPerTable.
// IDs for which the query ran successfully will be passed to onSuccess.
func (db *DB) DeleteStreamed(
ctx context.Context, entityType Entity, ids <-chan interface{}, deleteOpts ...DeleteOption,
ctx context.Context, entityType Entity, ids <-chan interface{}, onSuccess ...OnSuccess[any],
) error {

deleteOptions := NewDeleteOptions(deleteOpts...)

sem := db.GetSemaphoreForTable(TableName(entityType))

var stmt string

if deleteOptions.column != "" {
stmt = fmt.Sprintf("DELETE FROM %s WHERE %s IN (?)", TableName(entityType), deleteOptions.column)
} else {
stmt = db.BuildDeleteStmt(entityType)
}

return db.BulkExec(
ctx, stmt, db.Options.MaxPlaceholdersPerStatement, sem, ids, deleteOptions.onSuccess...,
ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids, onSuccess...,
)
}

Expand All @@ -837,7 +754,7 @@ func (db *DB) Delete(
}
close(idsCh)

return db.DeleteStreamed(ctx, entityType, idsCh, WithOnSuccessDelete(onSuccess...))
return db.DeleteStreamed(ctx, entityType, idsCh, onSuccess...)
}

func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted {
Expand Down
135 changes: 135 additions & 0 deletions database/optionally.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package database

import (
"context"
"fmt"
"github.com/icinga/icinga-go-library/com"
"github.com/pkg/errors"
)

// Upsert inserts new rows into a table or updates rows of a table if the primary key already exists.
type Upsert interface {
// Stream bulk upserts the specified entities via NamedBulkExec.
// If not explicitly specified, the upsert statement is created using
// BuildUpsertStmt with the first entity from the entities stream.
Stream(ctx context.Context, entities <-chan Entity) error
}

// UpsertOption is a functional option for NewUpsert.
type UpsertOption func(u *upsert)

// WithOnUpsert adds callback(s) to bulk upserts. Entities for which the
// operation was performed successfully are passed to the callbacks.
func WithOnUpsert(onUpsert ...OnSuccess[Entity]) UpsertOption {
return func(u *upsert) {
u.onUpsert = onUpsert
}
}

// WithStatement uses the specified statement for bulk upserts instead of automatically creating one.
func WithStatement(stmt string, placeholders int) UpsertOption {
return func(u *upsert) {
u.stmt = stmt
u.placeholders = placeholders
}
}

// NewUpsert creates a new Upsert initalized with a database.
func NewUpsert(db *DB, options ...UpsertOption) Upsert {
u := &upsert{db: db}

for _, option := range options {
option(u)
}

return u
}

type upsert struct {
db *DB
onUpsert []OnSuccess[Entity]
stmt string
placeholders int
}

func (u *upsert) Stream(ctx context.Context, entities <-chan Entity) error {
first, forward, err := com.CopyFirst(ctx, entities)
if err != nil {
return errors.Wrap(err, "can't copy first entity")
}

sem := u.db.GetSemaphoreForTable(TableName(first))
var stmt string
var placeholders int

if u.stmt != "" {
stmt = u.stmt
placeholders = u.placeholders
} else {
stmt, placeholders = u.db.BuildUpsertStmt(first)
}

return u.db.NamedBulkExec(
ctx, stmt, u.db.BatchSizeByPlaceholders(placeholders), sem,
forward, SplitOnDupId[Entity], u.onUpsert...,
)
}

// Delete deletes rows of a table.
type Delete interface {
// Stream bulk deletes rows from the table specified in from using the given args stream via BulkExec.
// Unless explicitly specified, the DELETE statement is created using BuildDeleteStmt.
Stream(ctx context.Context, from any, args <-chan any) error
}

// DeleteOption is a functional option for NewDelete.
type DeleteOption func(options *delete)

// WithOnDelete adds callback(s) to bulk deletes. Arguments for which the
// operation was performed successfully are passed to the callbacks.
func WithOnDelete(onDelete ...OnSuccess[any]) DeleteOption {
return func(d *delete) {
d.onDelete = onDelete
}
}

// ByColumn uses the given column for the WHERE clause that the rows must
// satisfy in order to be deleted, instead of automatically using ID.
func ByColumn(column string) DeleteOption {
return func(d *delete) {
d.column = column
}
}

// NewDelete creates a new Delete initalized with a database.
func NewDelete(db *DB, options ...DeleteOption) Delete {
d := &delete{db: db}

for _, option := range options {
option(d)
}

return d
}

type delete struct {
db *DB
column string
onDelete []OnSuccess[any]
}

func (d *delete) Stream(ctx context.Context, from any, args <-chan any) error {
var stmt string

if d.column != "" {
stmt = fmt.Sprintf(`DELETE FROM "%s" WHERE %s IN (?)`, TableName(from), d.column)
} else {
stmt = d.db.BuildDeleteStmt(from)
}

sem := d.db.GetSemaphoreForTable(TableName(from))

return d.db.BulkExec(
ctx, stmt, d.db.Options.MaxPlaceholdersPerStatement, sem, args, d.onDelete...,
)
}

0 comments on commit 7048f8f

Please sign in to comment.