From 5cef8f032fb0d7196d861dcac8973f7c895f6194 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Willy=20Rom=C3=A3o?= Date: Sat, 14 Nov 2020 14:07:57 +0000 Subject: [PATCH] add transaction to sync mode --- repository/repository_pg.go | 4 +++- syncer/repository_pg.go | 4 ++-- syncer/service.go | 16 ++++++++++++++++ 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/repository/repository_pg.go b/repository/repository_pg.go index 478651d..f03a5cb 100644 --- a/repository/repository_pg.go +++ b/repository/repository_pg.go @@ -4,6 +4,7 @@ import ( "context" // postgres driver + "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" log "github.com/sirupsen/logrus" ) @@ -13,9 +14,10 @@ type Postgres struct { pool *pgxpool.Pool } -// PostgresConn postgres conn repo +// PostgresConn postgres conn repo and a tx transaction type PostgresConn struct { Conn *pgxpool.Conn + Tx pgx.Tx } // NewPostgres return a new postgres repository diff --git a/syncer/repository_pg.go b/syncer/repository_pg.go index 3a254da..d89c6a0 100644 --- a/syncer/repository_pg.go +++ b/syncer/repository_pg.go @@ -50,7 +50,7 @@ func (s *Service) getTableColumns(ctx context.Context, sourceConn *repository.Po // truncateTable func (s *Service) truncateTable(ctx context.Context, conn *repository.PostgresConn, schema, table string) (err error) { query := fmt.Sprintf("truncate table %s.%s", schema, table) - _, err = conn.Conn.Exec(ctx, query) + _, err = conn.Tx.Exec(ctx, query) return } @@ -92,7 +92,7 @@ func (s *Service) copyFromSelect(ctx context.Context, sourceConn *repository.Pos destinationIdentifier := pgx.Identifier{s.Access.DestinationSchema, s.Access.DestinationTable} - copyCount, err := destinationConn.Conn.CopyFrom(ctx, destinationIdentifier, sourceColumns, rows) + copyCount, err := destinationConn.Tx.CopyFrom(ctx, destinationIdentifier, sourceColumns, rows) if err != nil { log.Errorf("service.copyFromSelect(): sourceConn.Conn.CopyFrom() error=%w", err) return diff --git a/syncer/service.go b/syncer/service.go index be05f46..75f7c43 100644 --- a/syncer/service.go +++ b/syncer/service.go @@ -46,6 +46,15 @@ func (s *Service) Run(ctx context.Context) error { switch s.Access.SyncMode { case FullSync: log.Debugf("sync_mode selected: %s", FullSync) + + // open the transaction to this mode + destinationConn.Tx, err = destinationConn.Conn.Conn().Begin(ctx) + if err != nil { + log.Errorf("service.Run(): destinationConn.Conn.Conn().Begin() error=%w", err) + return err + } + defer destinationConn.Tx.Rollback(ctx) + err = s.truncateTable(ctx, destinationConn, s.Access.DestinationSchema, s.Access.DestinationTable) if err != nil { log.Errorf("service.Run(): s.truncateTable() error=%w", err) @@ -57,6 +66,13 @@ func (s *Service) Run(ctx context.Context) error { log.Errorf("service.Run(): s.copyFromSelect() error=%w", err) return err } + + err = destinationConn.Tx.Commit(ctx) + if err != nil { + log.Errorf("service.Run(): destinationConn.Tx.Commit() error=%w", err) + return err + } + log.Debugf("finish the job, sync_mode selected: %s", FullSync) default: