Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metamorph records clearing #217

Merged
merged 17 commits into from
Jan 5, 2024
Merged
3 changes: 2 additions & 1 deletion background_worker/jobs/clear_block_transactions_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import (
_ "github.com/lib/pq"
)

func (c ClearJob) ClearBlockTransactionsMap(params ClearRecrodsParams) error {
func (c ClearJob) ClearBlockTransactionsMap(params ClearRecordsParams) error {
Log(INFO, "Connecting to database ...")
conn, err := sqlx.Open(params.Scheme(), params.String())

if err != nil {
Log(ERROR, fmt.Sprintf("unable to create connection %s", err))
return err
Expand Down
4 changes: 2 additions & 2 deletions background_worker/jobs/clear_block_transactions_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (
)

type ClearBlockTransactionsMapSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *ClearBlockTransactionsMapSuite) Test() {
params := ClearRecrodsParams{
params := ClearRecordsParams{
DBConnectionParams: DefaultParams,
RecordRetentionDays: 10,
}
Expand Down
4 changes: 2 additions & 2 deletions background_worker/jobs/clear_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const (
numericalDateHourLayout = "2006010215"
)

type ClearRecrodsParams struct {
type ClearRecordsParams struct {
dbconn.DBConnectionParams
RecordRetentionDays int
}
Expand All @@ -41,7 +41,7 @@ func NewClearJob(opts ...func(job *ClearJob)) *ClearJob {
return c
}

func (c ClearJob) ClearBlocks(params ClearRecrodsParams) error {
func (c ClearJob) ClearBlocks(params ClearRecordsParams) error {
Log(INFO, "Connecting to database ...")

conn, err := sqlx.Open(params.Scheme(), params.String())
Expand Down
4 changes: 2 additions & 2 deletions background_worker/jobs/clear_blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (
)

type ClearJobSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *ClearJobSuite) Test() {
params := ClearRecrodsParams{
params := ClearRecordsParams{
DBConnectionParams: DefaultParams,
RecordRetentionDays: 10,
}
Expand Down
43 changes: 43 additions & 0 deletions background_worker/jobs/clear_metamorph.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package jobs

import (
"fmt"

"github.com/jmoiron/sqlx"
)

func runDelete(table string, params ClearRecordsParams) error {
Log(INFO, "Connecting to database ...")
conn, err := sqlx.Open(params.Scheme(), params.String())
if err != nil {
Log(ERROR, fmt.Sprintf("unable to create connection %s", err))
return err
}
interval := fmt.Sprintf("%d days", params.RecordRetentionDays)

stmt, err := conn.Preparex(fmt.Sprintf("DELETE FROM %s WHERE inserted_at <= (CURRENT_DATE - $1::interval)", table))
if err != nil {
Log(ERROR, fmt.Sprintf("unable to prepare statement %s", err))
return err
}

res, err := stmt.Exec(interval)
if err != nil {
Log(ERROR, "unable to delete block rows")
return err
}
rows, _ := res.RowsAffected()
Log(INFO, fmt.Sprintf("Successfully deleted %d rows", rows))
return nil
}

func ClearMetamorph(params ClearRecordsParams) error {
if err := runDelete("metamorph.blocks", params); err != nil {
return err
}
if err := runDelete("metamorph.transactions", params); err != nil {
return err
}

return nil
}
71 changes: 71 additions & 0 deletions background_worker/jobs/clear_metamorph_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package jobs

import (
"testing"
"time"

. "github.com/bitcoin-sv/arc/database_testing"
"github.com/bitcoin-sv/arc/metamorph/store"
"github.com/jmoiron/sqlx"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

type ClearMetamorphSuite struct {
MetamorphDBTestSuite
}

func (s *ClearMetamorphSuite) Test() {

for i := 0; i < 5; i++ {
blk := GetTestMMBlock()
blk.InsertedAt = time.Now().Add(-20 * 24 * time.Hour)
s.InsertBlock(blk)
}

for i := 0; i < 5; i++ {
blk := GetTestMMBlock()
blk.InsertedAt = time.Now().Add(-1 * 24 * time.Hour)
s.InsertBlock(blk)
}

for i := 0; i < 5; i++ {
tx := GetTestMMTransaction()
tx.InsertedAt = time.Now().Add(-20 * 24 * time.Hour)

s.InsertTransaction(tx)
}

for i := 0; i < 5; i++ {
tx := GetTestMMTransaction()
tx.InsertedAt = time.Now().Add(-1 * 24 * time.Hour)

s.InsertTransaction(tx)
}

err := ClearMetamorph(ClearRecordsParams{
DBConnectionParams: DefaultMMParams,
RecordRetentionDays: 14,
})

require.NoError(s.T(), err)

db, err := sqlx.Open("postgres", DefaultMMParams.String())
require.NoError(s.T(), err)

var blks []store.Block
require.NoError(s.T(), db.Select(&blks, "SELECT * from metamorph.blocks"))

assert.Len(s.T(), blks, 5)

var stx []store.Transaction
require.NoError(s.T(), db.Select(&stx, "SELECT * from metamorph.transactions"))
assert.Len(s.T(), stx, 5)

}

func TestRunClearMM(t *testing.T) {
s := new(ClearMetamorphSuite)
suite.Run(t, s)
}
2 changes: 1 addition & 1 deletion background_worker/jobs/clear_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
_ "github.com/lib/pq"
)

func (c ClearJob) ClearTransactions(params ClearRecrodsParams) error {
func (c ClearJob) ClearTransactions(params ClearRecordsParams) error {
Log(INFO, "Connecting to database ...")

conn, err := sqlx.Open(params.Scheme(), params.String())
Expand Down
4 changes: 2 additions & 2 deletions background_worker/jobs/clear_transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (
)

type ClearTransactionsSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *ClearTransactionsSuite) Test() {
params := ClearRecrodsParams{
params := ClearRecordsParams{
DBConnectionParams: DefaultParams,
RecordRetentionDays: 10,
}
Expand Down
4 changes: 2 additions & 2 deletions background_worker/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
type ARCScheduler struct {
Scheduler *gocron.Scheduler
IntervalInHours int
Params jobs.ClearRecrodsParams
Params jobs.ClearRecordsParams
}

func (sched *ARCScheduler) RunJob(table string, job func(params jobs.ClearRecrodsParams) error) {
func (sched *ARCScheduler) RunJob(table string, job func(params jobs.ClearRecordsParams) error) {
_, err := sched.Scheduler.Every(sched.IntervalInHours).Hours().Do(func() {
jobs.Log(jobs.INFO, fmt.Sprintf("Clearing expired %s...", table))
err := job(sched.Params)
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/get_block_for_height_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type GetBlockByHeightTestSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *GetBlockByHeightTestSuite) Test() {
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/get_block_gaps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

type GetBlockGapTestSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *GetBlockGapTestSuite) Test() {
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/get_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

type GetBlockTestSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *GetBlockTestSuite) Test() {
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/get_block_transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

type GetBlockTransactionsSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *GetBlockTransactionsSuite) Test() {
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/get_last_processed_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

type GetLastProcessedBlockSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *GetLastProcessedBlockSuite) Test() {
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/get_mined_transaction_for_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type MinedTransactionForBlockSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *MinedTransactionForBlockSuite) Test() {
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/get_transaction_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type GetTransactionBlockSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *GetTransactionBlockSuite) Test() {
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/get_transaction_blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

type GetTransactionBlocksSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *GetTransactionBlocksSuite) Test() {
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/get_transaction_merkle_path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type GetTransactionMerklePathSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *GetTransactionMerklePathSuite) Test() {
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/insert_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

type InsertBlockSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *InsertBlockSuite) Test() {
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/insert_block_transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

type InsertBlockTransactionsSuite struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

type Tx struct {
Expand Down
2 changes: 1 addition & 1 deletion blocktx/store/sql/orphan_height_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type OrphanHeightSutie struct {
DatabaseTestSuite
BlockTXDBTestSuite
}

func (s *OrphanHeightSutie) Test() {
Expand Down
2 changes: 1 addition & 1 deletion cmd/background_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func StartBackGroundWorker(logger *slog.Logger) (func(), error) {
return nil, err
}

params := jobs.ClearRecrodsParams{
params := jobs.ClearRecordsParams{
DBConnectionParams: dbconn.New(
dbHost,
dbPort,
Expand Down
2 changes: 1 addition & 1 deletion cmd/background_worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func main() {
}
}

params := jobs.ClearRecrodsParams{
params := jobs.ClearRecordsParams{
DBConnectionParams: dbconn.New(
viper.GetString("cleanBlocks.host"),
viper.GetInt("cleanBlocks.port"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE metamorph.transactions;
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
CREATE SCHEMA metamorph;
CREATE TABLE metamorph.transactions (
hash BYTEA PRIMARY KEY,
stored_at TIMESTAMPTZ,
announced_at TIMESTAMPTZ,
mined_at TIMESTAMPTZ,
status INTEGER,
block_height BIGINT,
block_hash BYTEA,
callback_url TEXT,
callback_token TEXT,
merkle_proof TEXT,
reject_reason TEXT,
raw_tx BYTEA,
locked_by TEXT,
inserted_at_num INTEGER DEFAULT TO_NUMBER(TO_CHAR((NOW()) AT TIME ZONE 'UTC', 'yyyymmddhh24'), '9999999999') NOT NULL
);

CREATE INDEX ix_metamorph_transactions_locked_by ON metamorph.transactions (locked_by);
CREATE INDEX ix_metamorph_transactions_inserted_at_num ON metamorph.transactions (inserted_at_num);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE metamorph.blocks;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE metamorph.blocks (
hash BYTEA PRIMARY KEY,
processed_at TIMESTAMPTZ,
inserted_at_num INTEGER DEFAULT TO_NUMBER(TO_CHAR((NOW()) AT TIME ZONE 'UTC', 'yyyymmddhh24'), '9999999999') NOT NULL
);

CREATE INDEX ix_metamorph_blocks_inserted_at_num ON metamorph.blocks (inserted_at_num);
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP FUNCTION reverse_bytes_iter(bytes bytea, length int, midpoint int, index int);
DROP FUNCTION reverse_bytes(bytes bytea);
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
CREATE OR REPLACE FUNCTION reverse_bytes_iter(bytes bytea, length int, midpoint int, index int)
RETURNS bytea AS
$$
SELECT CASE WHEN index >= midpoint THEN bytes ELSE
reverse_bytes_iter(
set_byte(
set_byte(bytes, index, get_byte(bytes, length-index)),
length-index, get_byte(bytes, index)
),
length, midpoint, index + 1
)
END;
$$ LANGUAGE SQL IMMUTABLE;

CREATE
OR REPLACE FUNCTION reverse_bytes(bytes bytea) RETURNS bytea AS 'SELECT reverse_bytes_iter(bytes, octet_length(bytes)-1, octet_length(bytes)/2, 0)' LANGUAGE SQL IMMUTABLE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE metamorph.blocks DROP COLUMN inserted_at;
ALTER TABLE metamorph.transactions DROP COLUMN inserted_at;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE metamorph.transactions ADD COLUMN inserted_at TIMESTAMPTZ;
ALTER TABLE metamorph.blocks ADD COLUMN inserted_at TIMESTAMPTZ;
Loading