Skip to content

Commit

Permalink
logs are forwarded to a processor in slot and trx order
Browse files Browse the repository at this point in the history
  • Loading branch information
EasterTheBunny committed Dec 2, 2024
1 parent 88cca37 commit 72089b2
Show file tree
Hide file tree
Showing 4 changed files with 346 additions and 16 deletions.
19 changes: 12 additions & 7 deletions pkg/solana/logpoller/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func (j retryableJob) Run(ctx context.Context) error {
}

type eventDetail struct {
blockNumber uint64
slotNumber uint64
blockHeight uint64
blockHash solana.Hash
trxIdx int
trxSig solana.Signature
Expand All @@ -59,7 +60,7 @@ func (j *processEventJob) Run(_ context.Context) error {
type getTransactionsFromBlockJob struct {
slotNumber uint64
client RPCClient
parser ProgramEventProcessor
parser *orderedParser
chJobs chan Job
}

Expand Down Expand Up @@ -103,17 +104,20 @@ func (j *getTransactionsFromBlockJob) Run(ctx context.Context) error {
}

detail := eventDetail{
blockHash: block.Blockhash,
slotNumber: j.slotNumber,
blockHash: block.Blockhash,
}

if block.BlockHeight != nil {
detail.blockNumber = *block.BlockHeight
detail.blockHeight = *block.BlockHeight
}

if len(block.Transactions) != len(blockSigsOnly.Signatures) {
return fmt.Errorf("block %d has %d transactions but %d signatures", j.slotNumber, len(block.Transactions), len(blockSigsOnly.Signatures))
}

j.parser.ExpectTxs(j.slotNumber, len(block.Transactions))

for idx, trx := range block.Transactions {
detail.trxIdx = idx
if len(blockSigsOnly.Signatures)-1 <= idx {
Expand All @@ -130,14 +134,15 @@ func messagesToEvents(messages []string, parser ProgramEventProcessor, detail ev
var logIdx uint
for _, outputs := range parseProgramLogs(messages) {
for _, event := range outputs.Events {
logIdx++

event.BlockNumber = detail.blockNumber
event.SlotNumber = detail.slotNumber
event.BlockHeight = detail.blockHeight
event.BlockHash = detail.blockHash
event.TransactionHash = detail.trxSig
event.TransactionIndex = detail.trxIdx
event.TransactionLogIndex = logIdx

logIdx++

chJobs <- &processEventJob{
parser: parser,
event: event,
Expand Down
182 changes: 177 additions & 5 deletions pkg/solana/logpoller/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package logpoller
import (
"context"
"errors"
"fmt"
"reflect"
"sort"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -40,7 +44,7 @@ type EncodedLogCollector struct {

// dependencies and configuration
client RPCClient
parser ProgramEventProcessor
parser *orderedParser
lggr logger.Logger
rpcTimeLimit time.Duration

Expand All @@ -62,7 +66,7 @@ func NewEncodedLogCollector(
) *EncodedLogCollector {
c := &EncodedLogCollector{
client: client,
parser: parser,
parser: newOrderedParser(parser),
chSlot: make(chan uint64),
chBlock: make(chan uint64, 1),
chJobs: make(chan Job, 1),
Expand Down Expand Up @@ -201,10 +205,15 @@ func (c *EncodedLogCollector) runSlotProcessing(ctx context.Context) {
continue
}

from := c.highestSlot.Load() + 1
if c.highestSlot.Load() == 0 {
from = slot
}

c.highestSlot.Store(slot)

// load blocks in slot range
c.loadRange(ctx, c.highestSlotLoaded.Load()+1, slot)
c.loadRange(ctx, from, slot)
}
}
}
Expand All @@ -214,9 +223,9 @@ func (c *EncodedLogCollector) runBlockProcessing(ctx context.Context) {
select {
case <-ctx.Done():
return
case block := <-c.chBlock:
case slot := <-c.chBlock:
if err := c.workers.Do(ctx, &getTransactionsFromBlockJob{
slotNumber: block,
slotNumber: slot,
client: c.client,
parser: c.parser,
chJobs: c.chJobs,
Expand Down Expand Up @@ -270,6 +279,7 @@ func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, en
}

for _, block := range result {
c.parser.ExpectBlock(block)
select {
case <-ctx.Done():
return nil
Expand All @@ -279,3 +289,165 @@ func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, en

return nil
}

type orderedParser struct {
parser ProgramEventProcessor
mu sync.Mutex
blocks []uint64
ready []uint64
expect map[uint64]int
actual map[uint64][]ProgramEvent
}

func newOrderedParser(parser ProgramEventProcessor) *orderedParser {
return &orderedParser{
parser: parser,
blocks: make([]uint64, 0),
ready: make([]uint64, 0),
expect: make(map[uint64]int),
actual: make(map[uint64][]ProgramEvent),
}
}

func (p *orderedParser) ExpectBlock(block uint64) {
p.mu.Lock()
defer p.mu.Unlock()

p.blocks = append(p.blocks, block)

// ensure sort ascending
sort.Slice(p.blocks, func(i, j int) bool {
return p.blocks[i] < p.blocks[j]
})
}

func (p *orderedParser) ExpectTxs(block uint64, quantity int) {
p.mu.Lock()
defer p.mu.Unlock()

p.expect[block] = quantity
p.actual[block] = make([]ProgramEvent, 0, quantity)
}

func (p *orderedParser) Process(event ProgramEvent) error {
p.mu.Lock()
defer p.mu.Unlock()

meetsExpectations, err := p.addAndCompareExpectations(event)
if err != nil {
return err
}

// incoming event does not meet expectations for transaction
// event is added to actual and no error is returned
if !meetsExpectations {
return nil
}

p.clearEmptyBlocks()
p.setReady(event.SlotNumber)

return p.sendReadySlots()
}

func (p *orderedParser) addAndCompareExpectations(evt ProgramEvent) (bool, error) {
expectations, ok := p.expect[evt.SlotNumber]
if !ok {
return false, fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber)
}

evts, ok := p.actual[evt.SlotNumber]
if !ok {
return false, fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber)
}

p.actual[evt.SlotNumber] = append(evts, evt)

return expectations == len(evts)+1, nil
}

func (p *orderedParser) clearEmptyBlocks() {
rmvIdx := make([]int, 0)

for idx, block := range p.blocks {
exp, ok := p.expect[block]
if !ok {
// transaction expectations have not been applied for this block yet
continue
}

if exp == 0 {
rmvIdx = append(rmvIdx, idx)

delete(p.expect, block)
delete(p.actual, block)
}
}

for count, idx := range rmvIdx {
p.blocks = remove(p.blocks, idx-count)
}
}

func (p *orderedParser) setReady(slot uint64) {
p.ready = append(p.ready, slot)
}

func (p *orderedParser) sendReadySlots() error {
rmvIdx := make([]int, 0)

// start at the lowest block and find ready blocks
for idx, block := range p.blocks {
// to ensure ordered delivery, break from the loop if a ready block isn't found
// this function should be preceded by clearEmptyBlocks
rIdx, ok := getIdx(p.ready, block)
if !ok {
return nil
}

evts, ok := p.actual[block]
if !ok {
return errors.New("invalid state")
}

var errs error
for _, evt := range evts {
errs = errors.Join(errs, p.parser.Process(evt))
}

// need possible retry
if errs != nil {
return errs
}

p.ready = remove(p.ready, rIdx)
rmvIdx = append(rmvIdx, idx)

delete(p.expect, block)
delete(p.actual, block)
}

for count, idx := range rmvIdx {
p.blocks = remove(p.blocks, idx-count)
}

return nil
}

func getIdx[T any](slice []T, match T) (int, bool) {
for idx, value := range slice {
if reflect.DeepEqual(value, match) {
return idx, true
}
}

return -1, false
}

func remove[T any](slice []T, s int) []T {
return append(slice[:s], slice[s+1:]...)
}

var (
errExpectationsNotSet = errors.New("expectations not set")
)
Loading

0 comments on commit 72089b2

Please sign in to comment.