From d6423af690cd7336ae5f6aaae51510191390350f Mon Sep 17 00:00:00 2001
From: Johannes Rauh
Date: Tue, 3 Dec 2024 15:06:44 +0100
Subject: [PATCH] wip: streamed statements

---
 database/delete.go | 153 +++++++++++++++++++++++++++++++++++
 database/upsert.go | 195 +++++++++++++++++++++++++++++++++++++++++----
 2 files changed, 331 insertions(+), 17 deletions(-)

diff --git a/database/delete.go b/database/delete.go
index 7534354..286bcb1 100644
--- a/database/delete.go
+++ b/database/delete.go
@@ -1,5 +1,19 @@
 package database
 
+import (
+	"context"
+	"fmt"
+	"github.com/icinga/icinga-go-library/backoff"
+	"github.com/icinga/icinga-go-library/com"
+	"github.com/icinga/icinga-go-library/retry"
+	"github.com/jmoiron/sqlx"
+	"github.com/pkg/errors"
+	"golang.org/x/sync/errgroup"
+	"golang.org/x/sync/semaphore"
+	"reflect"
+	"time"
+)
+
 type DeleteStatement interface {
 	From(table string) DeleteStatement
 
@@ -10,6 +24,8 @@ type DeleteStatement interface {
 	Table() string
 
 	Where() string
+
+	apply(do *deleteOptions)
 }
 
 func NewDeleteStatement(entity Entity) DeleteStatement {
@@ -47,3 +63,140 @@ func (d *deleteStatement) Table() string {
 func (d *deleteStatement) Where() string {
 	return d.where
 }
+
+func (d *deleteStatement) apply(opts *deleteOptions) {
+	opts.stmt = d
+}
+
+type DeleteOption interface {
+	apply(*deleteOptions)
+}
+
+type DeleteOptionFunc func(opts *deleteOptions)
+
+func (f DeleteOptionFunc) apply(opts *deleteOptions) {
+	f(opts)
+}
+
+func WithOnDelete(onDelete ...OnSuccess[any]) DeleteOption {
+	return DeleteOptionFunc(func(opts *deleteOptions) {
+		opts.onDelete = onDelete
+	})
+}
+
+type deleteOptions struct {
+	stmt     DeleteStatement
+	onDelete []OnSuccess[any]
+}
+
+func DeleteStreamed(
+	ctx context.Context,
+	db *DB,
+	entityType Entity,
+	entities <-chan any,
+	options ...DeleteOption,
+) error {
+	opts := &deleteOptions{}
+	for _, option := range options {
+		option.apply(opts)
+	}
+
+	first, forward, err := com.CopyFirst(ctx, entities)
+	if err != nil {
+		return errors.Wrap(err, "can't copy first entity")
+	}
+
+	sem := db.GetSemaphoreForTable(TableName(entityType))
+
+	var stmt string
+
+	if opts.stmt != nil {
+		stmt, _ = BuildDeleteStatement(db, opts.stmt)
+	} else {
+		stmt = db.BuildDeleteStmt(entityType)
+	}
+
+	switch reflect.TypeOf(first).Kind() {
+	case reflect.Struct, reflect.Map:
+		return namedBulkExec(ctx, db, stmt, db.Options.MaxPlaceholdersPerStatement, sem, forward, com.NeverSplit[any], opts.onDelete...)
+	default:
+		return bulkExec(ctx, db, stmt, db.Options.MaxPlaceholdersPerStatement, sem, forward, opts.onDelete...)
+	}
+}
+
+func bulkExec(
+	ctx context.Context, db *DB, query string, count int, sem *semaphore.Weighted, arg <-chan any, onSuccess ...OnSuccess[any],
+) error {
+	var counter com.Counter
+	defer db.Log(ctx, query, &counter).Stop()
+
+	g, ctx := errgroup.WithContext(ctx)
+	// Use context from group.
+	bulk := com.Bulk(ctx, arg, count, com.NeverSplit[any])
+
+	g.Go(func() error {
+		g, ctx := errgroup.WithContext(ctx)
+
+		for b := range bulk {
+			if err := sem.Acquire(ctx, 1); err != nil {
+				return errors.Wrap(err, "can't acquire semaphore")
+			}
+
+			g.Go(func(b []any) func() error {
+				return func() error {
+					defer sem.Release(1)
+
+					return retry.WithBackoff(
+						ctx,
+						func(context.Context) error {
+							var valCollection []any
+
+							for _, v := range b {
+								val := reflect.ValueOf(v)
+								if val.Kind() == reflect.Slice {
+									for i := 0; i < val.Len(); i++ {
+										valCollection = append(valCollection, val.Index(i).Interface())
+									}
+								} else {
+									valCollection = append(valCollection, val.Interface())
+								}
+							}
+
+							stmt, args, err := sqlx.In(query, valCollection)
+							if err != nil {
+								return fmt.Errorf(
+									"%w: %w",
+									retry.ErrNotRetryable,
+									errors.Wrapf(err, "can't build placeholders for %q", query),
+								)
+							}
+
+							stmt = db.Rebind(stmt)
+							_, err = db.ExecContext(ctx, stmt, args...)
+							if err != nil {
+								return CantPerformQuery(err, query)
+							}
+
+							counter.Add(uint64(len(b)))
+
+							for _, onSuccess := range onSuccess {
+								if err := onSuccess(ctx, b); err != nil {
+									return err
+								}
+							}
+
+							return nil
+						},
+						retry.Retryable,
+						backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second),
+						db.GetDefaultRetrySettings(),
+					)
+				}
+			}(b))
+		}
+
+		return g.Wait()
+	})
+
+	return g.Wait()
+}
diff --git a/database/upsert.go b/database/upsert.go
index 290617f..f1600a5 100644
--- a/database/upsert.go
+++ b/database/upsert.go
@@ -1,5 +1,16 @@
 package database
 
+import (
+	"context"
+	"github.com/icinga/icinga-go-library/backoff"
+	"github.com/icinga/icinga-go-library/com"
+	"github.com/icinga/icinga-go-library/retry"
+	"github.com/pkg/errors"
+	"golang.org/x/sync/errgroup"
+	"golang.org/x/sync/semaphore"
+	"time"
+)
+
 type UpsertStatement interface {
 	Into(table string) UpsertStatement
 
@@ -14,6 +25,8 @@
 	Columns() []string
 
 	ExcludedColumns() []string
+
+	apply(opts *upsertOptions)
 }
 
 func NewUpsertStatement(entity Entity) UpsertStatement {
@@ -29,36 +42,184 @@ type upsertStatement struct {
 	excludedColumns []string
 }
 
-func (i *upsertStatement) Into(table string) UpsertStatement {
-	i.table = table
+func (u *upsertStatement) Into(table string) UpsertStatement {
+	u.table = table
 
-	return i
+	return u
 }
 
-func (i *upsertStatement) SetColumns(columns ...string) UpsertStatement {
-	i.columns = columns
+func (u *upsertStatement) SetColumns(columns ...string) UpsertStatement {
+	u.columns = columns
 
-	return i
+	return u
 }
 
-func (i *upsertStatement) SetExcludedColumns(columns ...string) UpsertStatement {
-	i.excludedColumns = columns
+func (u *upsertStatement) SetExcludedColumns(columns ...string) UpsertStatement {
+	u.excludedColumns = columns
 
-	return i
+	return u
 }
 
-func (i *upsertStatement) Entity() Entity {
-	return i.entity
+func (u *upsertStatement) Entity() Entity {
+	return u.entity
 }
 
-func (i *upsertStatement) Table() string {
-	return i.table
+func (u *upsertStatement) Table() string {
+	return u.table
 }
 
-func (i *upsertStatement) Columns() []string {
-	return i.columns
+func (u *upsertStatement) Columns() []string {
+	return u.columns
 }
 
-func (i *upsertStatement) ExcludedColumns() []string {
-	return i.excludedColumns
+func (u *upsertStatement) ExcludedColumns() []string {
+	return u.excludedColumns
 }
+
+func (u *upsertStatement) apply(opts *upsertOptions) {
+	opts.stmt = u
+}
+
+type UpsertOption interface {
+	apply(opts *upsertOptions)
+}
+
+type UpsertOptionFunc func(opts *upsertOptions)
+
+func (f UpsertOptionFunc) apply(opts *upsertOptions) {
+	f(opts)
+}
+
+func WithOnUpsert(onUpsert ...OnSuccess[any]) UpsertOption {
+	return UpsertOptionFunc(func(opts *upsertOptions) {
+		opts.onUpsert = onUpsert
+	})
+}
+
+type upsertOptions struct {
+	stmt     UpsertStatement
+	onUpsert []OnSuccess[any]
+}
+
+func UpsertStreamed[T any, V EntityConstraint[T]](
+	ctx context.Context,
+	db *DB,
+	entities <-chan T,
+	options ...UpsertOption,
+) error {
+	opts := &upsertOptions{}
+	for _, option := range options {
+		option.apply(opts)
+	}
+
+	entityType := V(new(T))
+	sem := db.GetSemaphoreForTable(TableName(entityType))
+
+	var stmt string
+	var placeholders int
+
+	if opts.stmt != nil {
+		stmt, placeholders, _ = BuildUpsertStatement(db, opts.stmt)
+	} else {
+		stmt, placeholders = db.BuildUpsertStmt(entityType)
+	}
+
+	return namedBulkExec[T](
+		ctx, db, stmt, db.BatchSizeByPlaceholders(placeholders), sem,
+		entities, splitOnDupId[T], opts.onUpsert...,
+	)
+}
+
+func namedBulkExec[T any](
+	ctx context.Context,
+	db *DB,
+	query string,
+	count int,
+	sem *semaphore.Weighted,
+	arg <-chan T,
+	splitPolicyFactory com.BulkChunkSplitPolicyFactory[T],
+	onSuccess ...OnSuccess[any],
+) error {
+	var counter com.Counter
+	defer db.Log(ctx, query, &counter).Stop()
+
+	g, ctx := errgroup.WithContext(ctx)
+	bulk := com.Bulk(ctx, arg, count, splitPolicyFactory)
+
+	g.Go(func() error {
+		for {
+			select {
+			case b, ok := <-bulk:
+				if !ok {
+					return nil
+				}
+
+				if err := sem.Acquire(ctx, 1); err != nil {
+					return errors.Wrap(err, "can't acquire semaphore")
+				}
+
+				g.Go(func(b []T) func() error {
+					return func() error {
+						defer sem.Release(1)
+
+						return retry.WithBackoff(
+							ctx,
+							func(ctx context.Context) error {
+								_, err := db.NamedExecContext(ctx, query, b)
+								if err != nil {
+									return CantPerformQuery(err, query)
+								}
+
+								counter.Add(uint64(len(b)))
+
+								for _, onSuccess := range onSuccess {
+									// TODO (jr): remove -> workaround vvvv
+									var items []any
+									for _, item := range b {
+										items = append(items, any(item))
+									}
+									// TODO ---- workaround end ---- ^^^^
+
+									if err := onSuccess(ctx, items); err != nil {
+										return err
+									}
+								}
+
+								return nil
+							},
+							retry.Retryable,
+							backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second),
+							db.GetDefaultRetrySettings(),
+						)
+					}
+				}(b))
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+		}
+	})
+
+	return g.Wait()
+}
+
+func splitOnDupId[T any]() com.BulkChunkSplitPolicy[T] {
+	seenIds := map[string]struct{}{}
+
+	return func(ider T) bool {
+		entity, ok := any(ider).(IDer)
+		if !ok {
+			panic("Type T does not implement IDer")
+		}
+
+		id := entity.ID().String()
+
+		_, ok = seenIds[id]
+		if ok {
+			seenIds = map[string]struct{}{id: {}}
+		} else {
+			seenIds[id] = struct{}{}
+		}
+
+		return ok
+	}
+}
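Usage sketch (illustrative only, not part of the patch): the generic wrappers below show one way application code could drive the new UpsertStreamed and DeleteStreamed helpers. The wrapper names and the example package are invented for illustration, and the OnSuccess callback shape (context plus the successfully written chunk) is assumed from how the callbacks are invoked above. The option mechanics come directly from the patch: WithOnUpsert/WithOnDelete attach callbacks, and a statement built with NewUpsertStatement can be passed straight into the options list because UpsertStatement itself implements the unexported apply method.

package example

import (
	"context"

	"github.com/icinga/icinga-go-library/database"
)

// upsertAll streams entities of type T into their table. V mirrors the
// V EntityConstraint[T] type parameter of UpsertStreamed and therefore has to
// be spelled out at the call site, e.g. upsertAll[Host, *Host](...).
func upsertAll[T any, V database.EntityConstraint[T]](
	ctx context.Context,
	db *database.DB,
	entities <-chan T,
	stmt database.UpsertStatement, // optional custom statement; may be nil
) error {
	opts := []database.UpsertOption{
		// Assumed OnSuccess shape: called once per successfully written chunk.
		database.WithOnUpsert(func(ctx context.Context, rows []any) error {
			return nil // e.g. hand the rows over to the next pipeline stage
		}),
	}
	if stmt != nil {
		// An UpsertStatement doubles as an UpsertOption (it implements apply),
		// so it can simply be appended to the option list.
		opts = append(opts, stmt)
	}

	return database.UpsertStreamed[T, V](ctx, db, entities, opts...)
}

// deleteByID streams IDs (or slices of IDs) and deletes the matching rows from
// entityType's table; scalar values end up in the sqlx.In-based bulkExec path,
// structs and maps in the named-parameter path.
func deleteByID(ctx context.Context, db *database.DB, entityType database.Entity, ids <-chan any) error {
	return database.DeleteStreamed(ctx, db, entityType, ids)
}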