Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metamorph records clearing #217

Merged
merged 17 commits into from
Jan 5, 2024
Merged
3 changes: 2 additions & 1 deletion background_worker/jobs/clear_block_transactions_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import (
_ "github.com/lib/pq"
)

func (c ClearJob) ClearBlockTransactionsMap(params ClearRecrodsParams) error {
func (c ClearJob) ClearBlockTransactionsMap(params ClearRecordsParams) error {
Log(INFO, "Connecting to database ...")
conn, err := sqlx.Open(params.Scheme(), params.String())

if err != nil {
Log(ERROR, fmt.Sprintf("unable to create connection %s", err))
return err
Expand Down
4 changes: 2 additions & 2 deletions background_worker/jobs/clear_block_transactions_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (
)

type ClearBlockTransactionsMapSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *ClearBlockTransactionsMapSuite) Test() {
params := ClearRecrodsParams{
params := ClearRecordsParams{
DBConnectionParams: DefaultParams,
RecordRetentionDays: 10,
}
Expand Down
4 changes: 2 additions & 2 deletions background_worker/jobs/clear_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const (
numericalDateHourLayout = "2006010215"
)

type ClearRecrodsParams struct {
type ClearRecordsParams struct {
dbconn.DBConnectionParams
RecordRetentionDays int
}
Expand All @@ -41,7 +41,7 @@ func NewClearJob(opts ...func(job *ClearJob)) *ClearJob {
return c
}

func (c ClearJob) ClearBlocks(params ClearRecrodsParams) error {
func (c ClearJob) ClearBlocks(params ClearRecordsParams) error {
Log(INFO, "Connecting to database ...")

conn, err := sqlx.Open(params.Scheme(), params.String())
Expand Down
4 changes: 2 additions & 2 deletions background_worker/jobs/clear_blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (
)

type ClearJobSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *ClearJobSuite) Test() {
params := ClearRecrodsParams{
params := ClearRecordsParams{
DBConnectionParams: DefaultParams,
RecordRetentionDays: 10,
}
Expand Down
42 changes: 42 additions & 0 deletions background_worker/jobs/clear_metamorph.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package jobs

import (
"fmt"
"github.com/jmoiron/sqlx"
)

func runDelete(table string, params ClearRecordsParams) error {
Log(INFO, "Connecting to database ...")
conn, err := sqlx.Open(params.Scheme(), params.String())
if err != nil {
Log(ERROR, fmt.Sprintf("unable to create connection %s", err))
return err
}
interval := fmt.Sprintf("%d days", params.RecordRetentionDays)

stmt, err := conn.Preparex(fmt.Sprintf("DELETE FROM %s WHERE inserted_at <= (CURRENT_DATE - $1::interval)", table))
if err != nil {
Log(ERROR, fmt.Sprintf("unable to prepare statement %s", err))
return err
}

res, err := stmt.Exec(interval)
if err != nil {
Log(ERROR, "unable to delete block rows")
return err
}
rows, _ := res.RowsAffected()
Log(INFO, fmt.Sprintf("Successfully deleted %d rows", rows))
return nil
}

func ClearMetamorph(params ClearRecordsParams) error {
if err := runDelete("blocks", params); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After adding the schema the from clause would be metamorph.blocks like here

q := `INSERT INTO metamorph.blocks (

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure it's a good idea to hardcode database name inside the query. Because if the name inside db connection can be different from the one inside the query.

return err
}
if err := runDelete("transactions", params); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same here

q := `UPDATE metamorph.transactions SET locked_by = 'NONE' WHERE hash = ANY($1);`

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, why do we need to hardcode the database name?

return err
}

return nil
}
70 changes: 70 additions & 0 deletions background_worker/jobs/clear_metamorph_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package jobs

import (
. "github.com/bitcoin-sv/arc/database_testing"
"github.com/bitcoin-sv/arc/metamorph/store"
"github.com/jmoiron/sqlx"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"testing"
"time"
)

type ClearMetamorphSuite struct {
MetamorphDBTestSuite
}

func (s *ClearMetamorphSuite) Test() {

for i := 0; i < 5; i++ {
blk := GetTestMMBlock()
blk.InsertedAt = time.Now().Add(-20 * 24 * time.Hour)
s.InsertBlock(blk)
}

for i := 0; i < 5; i++ {
blk := GetTestMMBlock()
blk.InsertedAt = time.Now().Add(-1 * 24 * time.Hour)
s.InsertBlock(blk)
}

for i := 0; i < 5; i++ {
tx := GetTestMMTransaction()
tx.InsertedAt = time.Now().Add(-20 * 24 * time.Hour)

s.InsertTransaction(tx)
}

for i := 0; i < 5; i++ {
tx := GetTestMMTransaction()
tx.InsertedAt = time.Now().Add(-1 * 24 * time.Hour)

s.InsertTransaction(tx)
}

err := ClearMetamorph(ClearRecordsParams{
DBConnectionParams: DefaultMMParams,
RecordRetentionDays: 14,
})

require.NoError(s.T(), err)

db, err := sqlx.Open("postgres", DefaultMMParams.String())
require.NoError(s.T(), err)

var blks []store.Block
require.NoError(s.T(), db.Select(&blks, "SELECT * from blocks"))

assert.Len(s.T(), blks, 5)

var stx []store.Transaction
require.NoError(s.T(), db.Select(&stx, "SELECT * from transactions"))
assert.Len(s.T(), stx, 5)

}

func TestRunClearMM(t *testing.T) {
s := new(ClearMetamorphSuite)
suite.Run(t, s)
}
2 changes: 1 addition & 1 deletion background_worker/jobs/clear_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
_ "github.com/lib/pq"
)

func (c ClearJob) ClearTransactions(params ClearRecrodsParams) error {
func (c ClearJob) ClearTransactions(params ClearRecordsParams) error {
Log(INFO, "Connecting to database ...")

conn, err := sqlx.Open(params.Scheme(), params.String())
Expand Down
4 changes: 2 additions & 2 deletions background_worker/jobs/clear_transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (
)

type ClearTransactionsSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *ClearTransactionsSuite) Test() {
params := ClearRecrodsParams{
params := ClearRecordsParams{
DBConnectionParams: DefaultParams,
RecordRetentionDays: 10,
}
Expand Down
4 changes: 2 additions & 2 deletions background_worker/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
type ARCScheduler struct {
Scheduler *gocron.Scheduler
IntervalInHours int
Params jobs.ClearRecrodsParams
Params jobs.ClearRecordsParams
}

func (sched *ARCScheduler) RunJob(table string, job func(params jobs.ClearRecrodsParams) error) {
func (sched *ARCScheduler) RunJob(table string, job func(params jobs.ClearRecordsParams) error) {
_, err := sched.Scheduler.Every(sched.IntervalInHours).Hours().Do(func() {
jobs.Log(jobs.INFO, fmt.Sprintf("Clearing expired %s...", table))
err := job(sched.Params)
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/get_block_for_height_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type GetBlockByHeightTestSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *GetBlockByHeightTestSuite) Test() {
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/get_block_gaps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

type GetBlockGapTestSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *GetBlockGapTestSuite) Test() {
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/get_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

type GetBlockTestSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *GetBlockTestSuite) Test() {
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/get_block_transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

type GetBlockTransactionsSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *GetBlockTransactionsSuite) Test() {
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/get_last_processed_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

type GetLastProcessedBlockSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *GetLastProcessedBlockSuite) Test() {
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/get_mined_transaction_for_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type MinedTransactionForBlockSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *MinedTransactionForBlockSuite) Test() {
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/get_transaction_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type GetTransactionBlockSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *GetTransactionBlockSuite) Test() {
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/get_transaction_blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

type GetTransactionBlocksSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *GetTransactionBlocksSuite) Test() {
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/get_transaction_merkle_path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type GetTransactionMerklePathSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *GetTransactionMerklePathSuite) Test() {
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/get_transaction_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type GetTransactionSourceSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *GetTransactionSourceSuite) Test() {
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/insert_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

type InsertBlockSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *InsertBlockSuite) Test() {
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/insert_block_transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

type InsertBlockTransactionsSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

type Tx struct {
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/orphan_height_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type OrphanHeightSutie struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *OrphanHeightSutie) Test() {
Expand Down
2 changes: 1 addition & 1 deletion cmd/background_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func StartBackGroundWorker(logger *slog.Logger) (func(), error) {
return nil, err
}

params := jobs.ClearRecrodsParams{
params := jobs.ClearRecordsParams{
DBConnectionParams: dbconn.New(
dbHost,
dbPort,
Expand Down
2 changes: 1 addition & 1 deletion cmd/background_worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func main() {
}
}

params := jobs.ClearRecrodsParams{
params := jobs.ClearRecordsParams{
DBConnectionParams: dbconn.New(
viper.GetString("cleanBlocks.host"),
viper.GetInt("cleanBlocks.port"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE transactions;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you copy these files from this path: metamorph/store/postgresql/migrations
then they should be exactly the same as they are the migration files which are actually used in the deployment. Right now they're not the same

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The migrations are still not exaclty the same. This schema is missing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does it have to have custom schema?

Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
CREATE TABLE transactions
(
hash BYTEA PRIMARY KEY,
stored_at TIMESTAMPTZ,
announced_at TIMESTAMPTZ,
mined_at TIMESTAMPTZ,
status INTEGER,
block_height BIGINT,
block_hash BYTEA,
callback_url TEXT,
callback_token TEXT,
merkle_proof TEXT,
reject_reason TEXT,
raw_tx BYTEA,
locked_by TEXT,
inserted_at TIMESTAMPTZ
);

CREATE INDEX ix_metamorph_transactions_locked_by ON transactions (locked_by);
CREATE INDEX ix_metamorph_transactions_inserted_at_num ON transactions (inserted_at);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE blocks;
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
CREATE TABLE blocks
(
hash BYTEA PRIMARY KEY,
processed_at TIMESTAMPTZ,
inserted_at TIMESTAMPTZ
);

CREATE INDEX ix_metamorph_blocks_inserted_at ON blocks (inserted_at);
Loading