Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

return cids for gsfa #162

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@
_site
/.cargo
/target

.zed
112 changes: 112 additions & 0 deletions gsfa/gsfa-read-multiepoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/gagliardetto/solana-go"
"github.com/ipfs/go-cid"
"github.com/rpcpool/yellowstone-faithful/compactindexsized"
"github.com/rpcpool/yellowstone-faithful/gsfa/linkedlog"
"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
Expand Down Expand Up @@ -86,6 +87,7 @@ epochLoop:
}

type EpochToTransactionObjects map[uint64][]*ipldbindcode.Transaction
type EpochToTransactionCids map[uint64][]cid.Cid

// Count returns the number of signatures in the EpochToSignatures.
func (e EpochToTransactionObjects) Count() int {
Expand All @@ -96,6 +98,15 @@ func (e EpochToTransactionObjects) Count() int {
return count
}

// Count returns the number of signatures in the EpochToSignatures.
func (e EpochToTransactionCids) Count() int {
var count int
for _, sigs := range e {
count += len(sigs)
}
return count
}

func (multi *GsfaReaderMultiepoch) GetBeforeUntil(
ctx context.Context,
pk solana.PublicKey,
Expand Down Expand Up @@ -200,3 +211,104 @@ epochLoop:
}
return transactions, nil
}

func (multi *GsfaReaderMultiepoch) GetCids(
ctx context.Context,
pk solana.PublicKey,
limit int,
fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktime) (cid.Cid, error),
) (EpochToTransactionCids, error) {
if limit <= 0 {
return make(EpochToTransactionCids), nil
}
return multi.iterBeforeUntilCids(ctx, pk, limit, fetcher)
}

// GetBeforeUntil gets the signatures for the given public key,
// before the given slot.
func (multi *GsfaReaderMultiepoch) iterBeforeUntilCids(
ctx context.Context,
pk solana.PublicKey,
limit int,
fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktime) (cid.Cid, error),
) (EpochToTransactionCids, error) {
if limit <= 0 {
return make(EpochToTransactionCids), nil
}

transactions := make(EpochToTransactionCids)
// reachedBefore := false
// if before == nil {
// reachedBefore = true
// }

epochLoop:
for readerIndex, index := range multi.epochs {
if ctx.Err() != nil {
return nil, ctx.Err()
}
epochNum, ok := index.GetEpoch()
if !ok {
return nil, fmt.Errorf("epoch is not set for the #%d provided gsfa reader", readerIndex)
}

locsStartedAt := time.Now()
locs, err := index.offsets.Get(pk)
if err != nil {
if compactindexsized.IsNotFound(err) {
continue epochLoop
}
return nil, fmt.Errorf("error while getting initial offset: %w", err)
}
klog.V(5).Infof("locs.OffsetToFirst took %s", time.Since(locsStartedAt))
debugln("locs.OffsetToFirst:", locs)

next := locs // Start from the latest, and go back in time.

for {
if next == nil || next.IsZero() { // no previous.
continue epochLoop
}
if limit > 0 && transactions.Count() >= limit {
break epochLoop
}
startedReadAt := time.Now()
locations, newNext, err := index.ll.ReadWithSize(next.Offset, next.Size)
if err != nil {
return nil, fmt.Errorf("error while reading linked log with next=%v: %w", next, err)
}
klog.V(5).Infof("ReadWithSize took %s to get %d locs", time.Since(startedReadAt), len(locations))
if len(locations) == 0 {
continue epochLoop
}
debugln("sigIndexes:", locations, "newNext:", newNext)
next = &newNext
for _, txLoc := range locations {
tx, err := fetcher(epochNum, txLoc)
if err != nil {
return nil, fmt.Errorf("error while getting signature at index=%v: %w", txLoc, err)
}
// sig, err := tx.Signature()
// if err != nil {
// return nil, fmt.Errorf("error while getting signature: %w", err)
// }
// klog.V(5).Infoln(locIndex, "sig:", sig, "epoch:", epochNum)
// if !reachedBefore && sig == *before {
// reachedBefore = true
// continue
// }
// if !reachedBefore {
// continue
// }
if limit > 0 && transactions.Count() >= limit {
break epochLoop
}
transactions[epochNum] = append(transactions[epochNum], tx)
// if until != nil && sig == *until {
// break epochLoop
// }
}
}
}
return transactions, nil
}
146 changes: 146 additions & 0 deletions multiepoch-getSignaturesForAddressCids.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package main

import (
"context"
"fmt"

"github.com/ipfs/go-cid"
"github.com/multiformats/go-multicodec"
"github.com/rpcpool/yellowstone-faithful/gsfa"
"github.com/rpcpool/yellowstone-faithful/gsfa/linkedlog"
"github.com/rpcpool/yellowstone-faithful/indexes"
"github.com/sourcegraph/jsonrpc2"
)

func countTransactionCids(v gsfa.EpochToTransactionCids) int {
var count int
for _, txs := range v {
count += len(txs)
}
return count
}

func (multi *MultiEpoch) handleGetSignaturesForAddressCids(ctx context.Context, conn *requestContext, req *jsonrpc2.Request) (*jsonrpc2.Error, error) {
// - parse and validate request
// - get list of epochs (from most recent to oldest)
// - iterate until we find the requested number of signatures
// - expand the signatures with tx data
signaturesOnly := multi.options.GsfaOnlySignatures

params, err := parseGetSignaturesForAddressParams(req.Params)
if err != nil {
return &jsonrpc2.Error{
Code: jsonrpc2.CodeInvalidParams,
Message: "Invalid params",
}, fmt.Errorf("failed to parse params: %v", err)
}
pk := params.Address
limit := params.Limit

gsfaIndexes, _ := multi.getGsfaReadersInEpochDescendingOrder()
if len(gsfaIndexes) == 0 {
return &jsonrpc2.Error{
Code: jsonrpc2.CodeInternalError,
Message: "getSignaturesForAddress method is not enabled",
}, fmt.Errorf("no gsfa indexes found")
}

gsfaMulti, err := gsfa.NewGsfaReaderMultiepoch(gsfaIndexes)
if err != nil {
return &jsonrpc2.Error{
Code: jsonrpc2.CodeInternalError,
Message: "Internal error",
}, fmt.Errorf("failed to create gsfa multiepoch reader: %w", err)
}

// Get the transactions:
foundTransactions, err := gsfaMulti.GetCids(
ctx,
pk,
limit,
func(epochNum uint64, oas linkedlog.OffsetAndSizeAndBlocktime) (cid.Cid, error) {
epoch, err := multi.GetEpoch(epochNum)
if err != nil {
return cid.Cid{}, fmt.Errorf("failed to get epoch %d: %w", epochNum, err)
}
raw, err := epoch.GetNodeByOffsetAndSize(ctx, nil, &indexes.OffsetAndSize{
Offset: oas.Offset,
Size: oas.Size,
})
if err != nil {
return cid.Cid{}, fmt.Errorf("failed to get signature: %w", err)
}
cb := cid.V1Builder{MhLength: -1, MhType: uint64(multicodec.Sha2_256), Codec: uint64(multicodec.DagCbor)}
c, err := cb.Sum(raw)
if err != nil {
return cid.Cid{}, fmt.Errorf("failed to get cid: %w", err)
}
return c, nil
},
)
if err != nil {
return &jsonrpc2.Error{
Code: jsonrpc2.CodeInternalError,
Message: "Internal error",
}, fmt.Errorf("failed to get signatures: %w", err)
}

if len(foundTransactions) == 0 {
err = conn.ReplyRaw(
ctx,
req.ID,
[]map[string]any{},
)
if err != nil {
return nil, fmt.Errorf("failed to reply: %w", err)
}
return nil, nil
}

// The response is an array of objects: [{sigCid: string}]
response := make([]map[string]any, countTransactionCids(foundTransactions))
numBefore := 0
for ei := range foundTransactions {
epoch := ei
if err != nil {
return &jsonrpc2.Error{
Code: jsonrpc2.CodeInternalError,
Message: "Internal error",
}, fmt.Errorf("failed to get epoch %d: %w", epoch, err)
}

sigs := foundTransactions[ei]
for i := range sigs {
ii := numBefore + i
c := sigs[i]
err := func() error {
response[ii] = map[string]any{
"sigCid": c.String(),
}
if signaturesOnly {
return nil
}

return nil
}()
if err != nil {
return &jsonrpc2.Error{
Code: jsonrpc2.CodeInternalError,
Message: "Internal error",
}, fmt.Errorf("failed to get tx data: %w", err)
}
}
numBefore += len(sigs)
}
// reply with the data
err = conn.ReplyRaw(
ctx,
req.ID,
response,
)
if err != nil {
return nil, fmt.Errorf("failed to reply: %w", err)
}

return nil, nil
}
2 changes: 2 additions & 0 deletions multiepoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,8 @@ func (ser *MultiEpoch) handleRequest(ctx context.Context, conn *requestContext,
return ser.handleGetFirstAvailableBlock(ctx, conn, req)
case "getSlot":
return ser.handleGetSlot(ctx, conn, req)
case "getSignaturesForAddressCids":
return ser.handleGetSignaturesForAddressCids(ctx, conn, req)
default:
return &jsonrpc2.Error{
Code: jsonrpc2.CodeMethodNotFound,
Expand Down
Loading