Skip to content

Commit

Permalink
wip: streamed statements
Browse files Browse the repository at this point in the history
  • Loading branch information
jrauh01 committed Dec 3, 2024
1 parent 82da1f9 commit d6423af
Show file tree
Hide file tree
Showing 2 changed files with 331 additions and 17 deletions.
153 changes: 153 additions & 0 deletions database/delete.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
package database

import (
"context"
"fmt"
"github.com/icinga/icinga-go-library/backoff"
"github.com/icinga/icinga-go-library/com"
"github.com/icinga/icinga-go-library/retry"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"reflect"
"time"
)

type DeleteStatement interface {
From(table string) DeleteStatement

Expand All @@ -10,6 +24,8 @@ type DeleteStatement interface {
Table() string

Where() string

apply(do *deleteOptions)
}

func NewDeleteStatement(entity Entity) DeleteStatement {
Expand Down Expand Up @@ -47,3 +63,140 @@ func (d *deleteStatement) Table() string {
func (d *deleteStatement) Where() string {
return d.where
}

func (d *deleteStatement) apply(opts *deleteOptions) {
opts.stmt = d
}

type DeleteOption interface {
apply(*deleteOptions)
}

type DeleteOptionFunc func(opts *deleteOptions)

func (f DeleteOptionFunc) apply(opts *deleteOptions) {
f(opts)
}

func WithOnDelete(onDelete ...OnSuccess[any]) DeleteOption {
return DeleteOptionFunc(func(opts *deleteOptions) {
opts.onDelete = onDelete
})
}

type deleteOptions struct {
stmt DeleteStatement
onDelete []OnSuccess[any]
}

func DeleteStreamed(
ctx context.Context,
db *DB,
entityType Entity,
entities <-chan any,
options ...DeleteOption,
) error {
opts := &deleteOptions{}
for _, option := range options {
option.apply(opts)
}

first, forward, err := com.CopyFirst(ctx, entities)
if err != nil {
return errors.Wrap(err, "can't copy first entity")
}

sem := db.GetSemaphoreForTable(TableName(entityType))

var stmt string

if opts.stmt != nil {
stmt, _ = BuildDeleteStatement(db, opts.stmt)
} else {
stmt = db.BuildDeleteStmt(entityType)
}

switch reflect.TypeOf(first).Kind() {
case reflect.Struct, reflect.Map:
return namedBulkExec(ctx, db, stmt, db.Options.MaxPlaceholdersPerStatement, sem, forward, com.NeverSplit[any], opts.onDelete...)
default:
return bulkExec(ctx, db, stmt, db.Options.MaxPlaceholdersPerStatement, sem, forward, opts.onDelete...)
}
}

func bulkExec(
ctx context.Context, db *DB, query string, count int, sem *semaphore.Weighted, arg <-chan any, onSuccess ...OnSuccess[any],
) error {
var counter com.Counter
defer db.Log(ctx, query, &counter).Stop()

g, ctx := errgroup.WithContext(ctx)
// Use context from group.
bulk := com.Bulk(ctx, arg, count, com.NeverSplit[any])

g.Go(func() error {
g, ctx := errgroup.WithContext(ctx)

for b := range bulk {
if err := sem.Acquire(ctx, 1); err != nil {
return errors.Wrap(err, "can't acquire semaphore")
}

g.Go(func(b []any) func() error {
return func() error {
defer sem.Release(1)

return retry.WithBackoff(
ctx,
func(context.Context) error {
var valCollection []any

for _, v := range b {
val := reflect.ValueOf(v)
if val.Kind() == reflect.Slice {
for i := 0; i < val.Len(); i++ {
valCollection = append(valCollection, val.Index(i).Interface())
}
} else {
valCollection = append(valCollection, val.Interface())
}
}

stmt, args, err := sqlx.In(query, valCollection)
if err != nil {
return fmt.Errorf(
"%w: %w",
retry.ErrNotRetryable,
errors.Wrapf(err, "can't build placeholders for %q", query),
)
}

stmt = db.Rebind(stmt)
_, err = db.ExecContext(ctx, stmt, args...)
if err != nil {
return CantPerformQuery(err, query)
}

counter.Add(uint64(len(b)))

for _, onSuccess := range onSuccess {
if err := onSuccess(ctx, b); err != nil {
return err
}
}

return nil
},
retry.Retryable,
backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second),
db.GetDefaultRetrySettings(),
)
}
}(b))
}

return g.Wait()
})

return g.Wait()
}
195 changes: 178 additions & 17 deletions database/upsert.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
package database

import (
"context"
"github.com/icinga/icinga-go-library/backoff"
"github.com/icinga/icinga-go-library/com"
"github.com/icinga/icinga-go-library/retry"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"time"
)

type UpsertStatement interface {
Into(table string) UpsertStatement

Expand All @@ -14,6 +25,8 @@ type UpsertStatement interface {
Columns() []string

ExcludedColumns() []string

apply(opts *upsertOptions)
}

func NewUpsertStatement(entity Entity) UpsertStatement {
Expand All @@ -29,36 +42,184 @@ type upsertStatement struct {
excludedColumns []string
}

func (i *upsertStatement) Into(table string) UpsertStatement {
i.table = table
func (u *upsertStatement) Into(table string) UpsertStatement {
u.table = table

return u
}

func (u *upsertStatement) SetColumns(columns ...string) UpsertStatement {
u.columns = columns

return u
}

func (u *upsertStatement) SetExcludedColumns(columns ...string) UpsertStatement {
u.excludedColumns = columns

return u
}

func (u *upsertStatement) Entity() Entity {
return u.entity
}

func (u *upsertStatement) Table() string {
return u.table
}

func (u *upsertStatement) Columns() []string {
return u.columns
}

return i
func (u *upsertStatement) ExcludedColumns() []string {
return u.excludedColumns
}

func (i *upsertStatement) SetColumns(columns ...string) UpsertStatement {
i.columns = columns
func (u *upsertStatement) apply(opts *upsertOptions) {
opts.stmt = u
}

return i
type UpsertOption interface {
apply(opts *upsertOptions)
}

func (i *upsertStatement) SetExcludedColumns(columns ...string) UpsertStatement {
i.excludedColumns = columns
type UpsertOptionFunc func(opts *upsertOptions)

return i
func (f UpsertOptionFunc) apply(opts *upsertOptions) {
f(opts)
}

func (i *upsertStatement) Entity() Entity {
return i.entity
func WithOnUpsert(onUpsert ...OnSuccess[any]) UpsertOption {
return UpsertOptionFunc(func(opts *upsertOptions) {
opts.onUpsert = onUpsert
})
}

func (i *upsertStatement) Table() string {
return i.table
type upsertOptions struct {
stmt UpsertStatement
onUpsert []OnSuccess[any]
}

func (i *upsertStatement) Columns() []string {
return i.columns
func UpsertStreamed[T any, V EntityConstraint[T]](
ctx context.Context,
db *DB,
entities <-chan T,
options ...UpsertOption,
) error {
opts := &upsertOptions{}
for _, option := range options {
option.apply(opts)
}

entityType := V(new(T))
sem := db.GetSemaphoreForTable(TableName(entityType))

var stmt string
var placeholders int

if opts.stmt != nil {
stmt, placeholders, _ = BuildUpsertStatement(db, opts.stmt)
} else {
stmt, placeholders = db.BuildUpsertStmt(entityType)
}

return namedBulkExec[T](
ctx, db, stmt, db.BatchSizeByPlaceholders(placeholders), sem,
entities, splitOnDupId[T], opts.onUpsert...,
)
}

func (i *upsertStatement) ExcludedColumns() []string {
return i.excludedColumns
func namedBulkExec[T any](
ctx context.Context,
db *DB,
query string,
count int,
sem *semaphore.Weighted,
arg <-chan T,
splitPolicyFactory com.BulkChunkSplitPolicyFactory[T],
onSuccess ...OnSuccess[any],
) error {
var counter com.Counter
defer db.Log(ctx, query, &counter).Stop()

g, ctx := errgroup.WithContext(ctx)
bulk := com.Bulk(ctx, arg, count, splitPolicyFactory)

g.Go(func() error {
for {
select {
case b, ok := <-bulk:
if !ok {
return nil
}

if err := sem.Acquire(ctx, 1); err != nil {
return errors.Wrap(err, "can't acquire semaphore")
}

g.Go(func(b []T) func() error {
return func() error {
defer sem.Release(1)

return retry.WithBackoff(
ctx,
func(ctx context.Context) error {
_, err := db.NamedExecContext(ctx, query, b)
if err != nil {
return CantPerformQuery(err, query)
}

counter.Add(uint64(len(b)))

for _, onSuccess := range onSuccess {
// TODO (jr): remove -> workaround vvvv
var items []any
for _, item := range b {
items = append(items, any(item))
}
// TODO ---- workaround end ---- ^^^^

if err := onSuccess(ctx, items); err != nil {
return err
}
}

return nil
},
retry.Retryable,
backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second),
db.GetDefaultRetrySettings(),
)
}
}(b))
case <-ctx.Done():
return ctx.Err()
}
}
})

return g.Wait()
}

func splitOnDupId[T any]() com.BulkChunkSplitPolicy[T] {
seenIds := map[string]struct{}{}

return func(ider T) bool {
entity, ok := any(ider).(IDer)
if !ok {
panic("Type T does not implement IDer")
}

id := entity.ID().String()

_, ok = seenIds[id]
if ok {
seenIds = map[string]struct{}{id: {}}
} else {
seenIds[id] = struct{}{}
}

return ok
}
}

0 comments on commit d6423af

Please sign in to comment.