Skip to content

Commit

Permalink
storage: split consensus event related accounts into separate table
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrus committed Jan 31, 2025
1 parent eadec04 commit a4b2c12
Show file tree
Hide file tree
Showing 17 changed files with 249 additions and 78 deletions.
1 change: 1 addition & 0 deletions .changelog/809.feature.2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
storage: split consensus event related accounts into a separate table
71 changes: 59 additions & 12 deletions analyzer/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
type EventType = apiTypes.ConsensusEventType // alias for brevity

type parsedEvent struct {
eventIdx int
ty EventType
rawBody json.RawMessage
roothashRuntimeID *coreCommon.Namespace
Expand Down Expand Up @@ -253,12 +254,14 @@ func (m *processor) queueDbUpdates(batch *storage.QueryBatch, data allData) erro
m.queueBlockInserts,
m.queueEpochInserts,
m.queueTransactionInserts,
m.queueTxEventInserts,
} {
if err := f(batch, data.BlockData); err != nil {
return err
}
}
if err := m.queueTxEventInserts(batch, &data); err != nil {
return err
}

for _, f := range []func(*storage.QueryBatch, *registryData) error{
m.queueEntityEvents,
Expand Down Expand Up @@ -586,16 +589,45 @@ func (m *processor) queueTransactionInserts(batch *storage.QueryBatch, data *con
}

// Enqueue DB statements to store events that were generated as the result of a TX execution.
func (m *processor) queueTxEventInserts(batch *storage.QueryBatch, data *consensusBlockData) error {
for i, txr := range data.TransactionsWithResults {
func (m *processor) queueTxEventInserts(batch *storage.QueryBatch, data *allData) error {
for i, txr := range data.BlockData.TransactionsWithResults {
txAccounts := []staking.Address{
// Always insert sender as a related address, some transactions (e.g. failed ones) might not have
// any events associated.
// TODO: this could also track the receiver (when applicable), but currently we don't do
// much transaction parsing, where we could extract it for each transaction type.
staking.NewAddress(txr.Transaction.Signature.PublicKey),
}
for _, event := range txr.Result.Events {

// Find all events associated with transaction.
// We don't use txr.Result.Events, because those do not have the event index unique within the block.
txEvents := make([]nodeapi.Event, 0, len(txr.Result.Events))
for _, event := range data.GovernanceData.Events {
if event.TxHash == txr.Transaction.Hash() {
txEvents = append(txEvents, event)
}
}
for _, event := range data.RegistryData.Events {
if event.TxHash == txr.Transaction.Hash() {
txEvents = append(txEvents, event)
}
}
for _, event := range data.RootHashData.Events {
if event.TxHash == txr.Transaction.Hash() {
txEvents = append(txEvents, event)
}
}
for _, event := range data.StakingData.Events {
if event.TxHash == txr.Transaction.Hash() {
txEvents = append(txEvents, event)
}
}
// Sanity check that the number of event matches.
if len(txEvents) != len(txr.Result.Events) {
return fmt.Errorf("transaction %s has %d events, but only %d were found", txr.Transaction.Hash().Hex(), len(txr.Result.Events), len(txEvents))
}

for _, event := range txEvents {
eventData := m.extractEventData(event)
txAccounts = append(txAccounts, eventData.relatedAddresses...)
accounts := extractUniqueAddresses(eventData.relatedAddresses)
Expand All @@ -605,22 +637,29 @@ func (m *processor) queueTxEventInserts(batch *storage.QueryBatch, data *consens
}

batch.Queue(queries.ConsensusEventInsert,
data.BlockData.Height,
string(eventData.ty),
eventData.eventIdx,
string(body),
data.Height,
txr.Transaction.Hash().Hex(),
i,
accounts,
common.StringOrNil(eventData.roothashRuntimeID),
eventData.roothashRuntime,
eventData.roothashRuntimeRound,
)
batch.Queue(queries.ConsensusEventRelatedAccountsInsert,
data.BlockData.Height,
string(eventData.ty),
eventData.eventIdx,
i,
accounts,
)
}
uniqueTxAccounts := extractUniqueAddresses(txAccounts)
for _, addr := range uniqueTxAccounts {
batch.Queue(queries.ConsensusAccountRelatedTransactionInsert,
addr,
data.Height,
data.BlockData.Height,
i,
)

Expand All @@ -630,7 +669,7 @@ func (m *processor) queueTxEventInserts(batch *storage.QueryBatch, data *consens
batch.Queue(
queries.ConsensusAccountFirstActivityUpsert,
addr,
data.BlockHeader.Time.UTC(),
data.BlockData.BlockHeader.Time.UTC(),
)
}
}
Expand Down Expand Up @@ -1376,16 +1415,23 @@ func (m *processor) queueSingleEventInserts(batch *storage.QueryBatch, eventData
}

batch.Queue(queries.ConsensusEventInsert,
height,
string(eventData.ty),
eventData.eventIdx,
string(body),
height,
nil,
nil,
accounts,
common.StringOrNil(eventData.roothashRuntimeID),
eventData.roothashRuntime,
eventData.roothashRuntimeRound,
)
batch.Queue(queries.ConsensusEventRelatedAccountsInsert,
height,
string(eventData.ty),
eventData.eventIdx,
nil,
accounts,
)

return nil
}
Expand All @@ -1408,8 +1454,9 @@ func extractUniqueAddresses(accounts []staking.Address) []string {
// extractEventData extracts the type, the body (JSON-serialized), and the related accounts of an event.
func (m *processor) extractEventData(event nodeapi.Event) parsedEvent {
eventData := parsedEvent{
ty: event.Type,
rawBody: event.RawBody,
eventIdx: event.EventIdx,
ty: event.Type,
rawBody: event.RawBody,
}

// Fill in related accounts.
Expand Down
16 changes: 16 additions & 0 deletions analyzer/consensus/data_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ func fetchRegistryData(ctx context.Context, cc nodeapi.ConsensusApiLite, height
if err != nil {
return nil, err
}
// XXX: We do this here, so that we don't need to invalidate the caches.
for i := range events {
events[i].EventIdx = i
}

var runtimeStartedEvents []nodeapi.RuntimeStartedEvent
var runtimeSuspendedEvents []nodeapi.RuntimeSuspendedEvent
Expand Down Expand Up @@ -199,6 +203,10 @@ func fetchStakingData(ctx context.Context, cc nodeapi.ConsensusApiLite, height i
if err != nil {
return nil, err
}
// XXX: We do this here, so that we don't need to invalidate the caches.
for i := range events {
events[i].EventIdx = i
}

epoch, err := cc.GetEpoch(ctx, height)
if err != nil {
Expand Down Expand Up @@ -281,6 +289,10 @@ func fetchGovernanceData(ctx context.Context, cc nodeapi.ConsensusApiLite, heigh
if err != nil {
return nil, err
}
// XXX: We do this here, so that we don't need to invalidate the caches.
for i := range events {
events[i].EventIdx = i
}

var submissions []nodeapi.Proposal
var executions []nodeapi.ProposalExecutedEvent
Expand Down Expand Up @@ -324,6 +336,10 @@ func fetchRootHashData(ctx context.Context, cc nodeapi.ConsensusApiLite, network
if err != nil {
return nil, err
}
// XXX: We do this here, so that we don't need to invalidate the caches.
for i := range events {
events[i].EventIdx = i
}

lastRoundResults := make(map[coreCommon.Namespace]*roothash.RoundResults, len(network.ParaTimes.All))

Expand Down
6 changes: 5 additions & 1 deletion analyzer/queries/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,13 @@ var (
schedule = excluded.schedule`

ConsensusEventInsert = `
INSERT INTO chain.events (type, body, tx_block, tx_hash, tx_index, related_accounts, roothash_runtime_id, roothash_runtime, roothash_runtime_round)
INSERT INTO chain.events (tx_block, type, event_index, body, tx_hash, tx_index, roothash_runtime_id, roothash_runtime, roothash_runtime_round)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`

ConsensusEventRelatedAccountsInsert = `
INSERT INTO chain.events_related_accounts (tx_block, type, event_index, tx_index, account_address)
SELECT $1, $2, $3, $4, unnest($5::text[])`

ConsensusEscrowEventInsert = `
INSERT INTO history.escrow_events (tx_block, epoch, type, delegatee, delegator, shares, amount, debonding_amount)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`
Expand Down
28 changes: 21 additions & 7 deletions storage/client/queries/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,29 @@ const (
OFFSET $9::bigint`

Events = `
SELECT tx_block, tx_index, tx_hash, roothash_runtime_id, roothash_runtime, roothash_runtime_round, type, body, b.time
SELECT
chain.events.tx_block,
chain.events.tx_index,
chain.events.tx_hash,
chain.events.roothash_runtime_id,
chain.events.roothash_runtime,
chain.events.roothash_runtime_round,
chain.events.type,
chain.events.body,
b.time
FROM chain.events
LEFT JOIN chain.blocks b ON tx_block = b.height
WHERE ($1::bigint IS NULL OR tx_block = $1::bigint) AND
($2::integer IS NULL OR tx_index = $2::integer) AND
($3::text IS NULL OR tx_hash = $3::text) AND
($4::text IS NULL OR type = $4::text) AND
($5::text IS NULL OR ARRAY[$5::text] <@ related_accounts)
ORDER BY tx_block DESC, tx_index
LEFT JOIN chain.events_related_accounts rel ON
chain.events.tx_block = rel.tx_block AND
chain.events.event_index = rel.event_index AND
-- When related_address ($5) is NULL and hence we do no filtering on it, avoid the join altogether.
($5::text IS NOT NULL)
WHERE ($1::bigint IS NULL OR chain.events.tx_block = $1::bigint) AND
($2::integer IS NULL OR chain.events.tx_index = $2::integer) AND
($3::text IS NULL OR chain.events.tx_hash = $3::text) AND
($4::text IS NULL OR chain.events.type = $4::text) AND
($5::text IS NULL OR rel.account_address = $5::text)
ORDER BY chain.events.tx_block DESC, chain.events.tx_index
LIMIT $6::bigint
OFFSET $7::bigint`

Expand Down
5 changes: 3 additions & 2 deletions storage/migrations/00_consensus.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,13 @@ CREATE INDEX ix_transactions_method_block_tx_index ON chain.transactions (method
CREATE TABLE chain.events
(
tx_block UINT63 NOT NULL,
-- event_index UINT31 NOT NULL, -- Added in 14_events_related_accounts.up.sql
tx_index UINT31,

type TEXT NOT NULL, -- Enum with many values, see ConsensusEventType in api/spec/v1.yaml.
body JSONB,
tx_hash HEX64, -- could be fetched from `transactions` table; denormalized for efficiency
related_accounts TEXT[],
related_accounts TEXT[], -- Removed in 14_events_related_accounts.up.sql
-- There's some mismatch between oasis-core's style in Go and nexus's
-- style in SQL and JSON. oasis-core likes structures filled with nilable
-- pointers, where one pointer is non-nil. nexus likes a type string plus
Expand All @@ -112,7 +113,7 @@ CREATE TABLE chain.events

FOREIGN KEY (tx_block, tx_index) REFERENCES chain.transactions(block, tx_index) DEFERRABLE INITIALLY DEFERRED
);
CREATE INDEX ix_events_related_accounts ON chain.events USING gin(related_accounts);
CREATE INDEX ix_events_related_accounts ON chain.events USING gin(related_accounts); -- Removed in 14_events_related_accounts.up.sql
CREATE INDEX ix_events_tx_block ON chain.events (tx_block); -- for fetching events without filters
CREATE INDEX ix_events_tx_hash ON chain.events (tx_hash);
CREATE INDEX ix_events_type ON chain.events (type, tx_block); -- tx_block is for sorting the events of a given type by recency
Expand Down
12 changes: 6 additions & 6 deletions storage/migrations/01_runtimes.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ CREATE TABLE chain.runtime_events
timestamp TIMESTAMP WITH TIME ZONE NOT NULL,


-- The raw event, as returned by the oasis-sdk runtime client.
type TEXT NOT NULL,
-- The raw event, as returned by the oasis-sdk runtime client.
body JSONB NOT NULL,
related_accounts TEXT[], -- Removed in 13_runtime_events_related_accounts.up.sql.

Expand Down Expand Up @@ -196,14 +196,14 @@ CREATE INDEX ix_runtime_events_nft_transfers ON chain.runtime_events (runtime, (
-- (
-- runtime runtime NOT NULL,
-- round UINT63 NOT NULL,
-- tx_index UINT31 NOT NULL,
-- type TEXT NOT NULL,
-- type_index UINT31 NOT NULL,
-- event_index UINT31 NOT NULL,

-- tx_index UINT31,
-- type TEXT NOT NULL,
-- account_address oasis_addr NOT NULL,
-- FOREIGN KEY (runtime, round, tx_index, type, type_index) REFERENCES chain.runtime_events(runtime, round, tx_index, type, type_index) DEFERRABLE INITIALLY DEFERRED
-- FOREIGN KEY (runtime, round, event_index) REFERENCES chain.runtime_events(runtime, round, event_index) DEFERRABLE INITIALLY DEFERRED
-- );
-- CREATE INDEX ix_runtime_events_related_accounts_related_account_round ON chain.runtime_events_related_accounts(runtime, account_address, round);
-- CREATE INDEX ix_runtime_events_related_accounts_account_address_round ON chain.runtime_events_related_accounts(runtime, account_address, round, tx_index);

-- Roothash messages are small structures that a runtime can send to
-- communicate with the consensus layer. They are agreed upon for each runtime
Expand Down
75 changes: 75 additions & 0 deletions storage/migrations/14_events_related_accounts.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
BEGIN;

ALTER TABLE chain.events ADD COLUMN event_index UINT31;

-- Populate the event_index column with sequential values for each tx_block to ensure uniqueness.
DO $$
DECLARE
cur_block UINT63;
cur_type TEXT;
BEGIN
-- Iterate over each unique (tx_block, type) pair.
FOR cur_block, cur_type IN
SELECT tx_block, type
FROM chain.events
GROUP BY tx_block, type
LOOP
-- Assign event_index within the group.
WITH ranked_events AS (
SELECT ctid,
row_number() OVER (ORDER BY timestamp ASC) - 1 AS event_idx
FROM chain.events
WHERE tx_block = cur_block AND type = cur_type
)
UPDATE chain.events e
SET event_index = ranked_events.event_idx
FROM ranked_events
WHERE e.ctid = ranked_events.ctid;
END LOOP;
END $$;

ALTER TABLE chain.events
ALTER COLUMN event_index SET NOT NULL;

-- Primary key for runtime events.
ALTER TABLE chain.events
ADD CONSTRAINT pk_events PRIMARY KEY (tx_block, type, event_index);


-- Create events related accounts table.
CREATE TABLE chain.events_related_accounts
(
tx_block UINT63 NOT NULL,
type TEXT NOT NULL,
event_index UINT31 NOT NULL,

tx_index UINT31,
account_address oasis_addr NOT NULL,

FOREIGN KEY (tx_block, type, event_index) REFERENCES chain.events(tx_block, type, event_index) DEFERRABLE INITIALLY DEFERRED
);

-- Populate the events_related_accounts table.
INSERT INTO
chain.events_related_accounts (tx_block, type, event_index, tx_index, account_address)
SELECT
tx_block, type, event_index, tx_index, unnest(related_accounts) AS account_address
FROM
chain.events
WHERE
related_accounts IS NOT NULL
AND array_length(related_accounts, 1) > 0;


-- Used for fetching all events related to an account (sorted by round).
CREATE INDEX ix_events_related_accounts_account_address_block ON chain.events_related_accounts(account_address, tx_block DESC, tx_index); -- TODO: maybe also event index?

DROP INDEX chain.ix_events_related_accounts;
ALTER TABLE chain.events DROP COLUMN related_accounts;

-- Grant others read-only use.
-- (We granted already in 00_consensus.up.sql, but the grant does not apply to new tables.)
GRANT SELECT ON ALL TABLES IN SCHEMA chain TO PUBLIC;
GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA chain TO PUBLIC;

COMMIT;
Loading

0 comments on commit a4b2c12

Please sign in to comment.