Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Example xdrill usage #308

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 20 additions & 156 deletions internal/transform/contract_events.go
Original file line number Diff line number Diff line change
@@ -1,169 +1,33 @@
package transform

import (
"encoding/base64"
"encoding/json"
"fmt"

"github.com/stellar/stellar-etl/internal/toid"
"github.com/stellar/stellar-etl/internal/utils"
"github.com/stellar/stellar-etl/internal/xdr2json"

"github.com/stellar/go/ingest"
"github.com/stellar/go/strkey"
diagnosticevent "github.com/stellar/go/ingest/diagnostic_event"

Check failure on line 5 in internal/transform/contract_events.go

View workflow job for this annotation

GitHub Actions / build

no required module provides package github.com/stellar/go/ingest/diagnostic_event; to add it:
"github.com/stellar/go/ingest/ledger"

Check failure on line 6 in internal/transform/contract_events.go

View workflow job for this annotation

GitHub Actions / build

no required module provides package github.com/stellar/go/ingest/ledger; to add it:
"github.com/stellar/go/xdr"
)

// TransformContractEvent converts a transaction's contract events and diagnostic events into a form suitable for BigQuery.
// It is known that contract events are a subset of the diagnostic events XDR definition. We are opting to call all of these events
// contract events for better clarity to data analytics users.
func TransformContractEvent(transaction ingest.LedgerTransaction, lhe xdr.LedgerHeaderHistoryEntry) ([]ContractEventOutput, error) {
ledgerHeader := lhe.Header
outputTransactionHash := utils.HashToHexString(transaction.Result.TransactionHash)
outputLedgerSequence := uint32(ledgerHeader.LedgerSeq)

transactionIndex := uint32(transaction.Index)

outputTransactionID := toid.New(int32(outputLedgerSequence), int32(transactionIndex), 0).ToInt64()

outputCloseTime, err := utils.TimePointToUTCTimeStamp(ledgerHeader.ScpValue.CloseTime)
if err != nil {
return []ContractEventOutput{}, fmt.Errorf("for ledger %d; transaction %d (transaction id=%d): %v", outputLedgerSequence, transactionIndex, outputTransactionID, err)
}

// GetDiagnosticEvents will return all contract events and diagnostic events emitted
contractEvents, err := transaction.GetDiagnosticEvents()
if err != nil {
return []ContractEventOutput{}, err
}

var transformedContractEvents []ContractEventOutput

for _, contractEvent := range contractEvents {
var outputContractId string
var outputTopicsJson []interface{}
var outputTopicsDecodedJson []interface{}

outputInSuccessfulContractCall := contractEvent.InSuccessfulContractCall
event := contractEvent.Event
outputType := event.Type
outputTypeString := event.Type.String()

eventTopics := getEventTopics(event.Body)
outputTopics, outputTopicsDecoded, err := serializeScValArray(eventTopics)
if err != nil {
return []ContractEventOutput{}, err
}
outputTopicsJson = outputTopics
outputTopicsDecodedJson = outputTopicsDecoded

eventData := getEventData(event.Body)
outputData, outputDataDecoded, err := serializeScVal(eventData)
if err != nil {
return []ContractEventOutput{}, err
}

// Convert the xdrContactId to string
// TODO: https://stellarorg.atlassian.net/browse/HUBBLE-386 this should be a stellar/go/xdr function
if event.ContractId != nil {
contractId := *event.ContractId
contractIdByte, _ := contractId.MarshalBinary()
outputContractId, _ = strkey.Encode(strkey.VersionByteContract, contractIdByte)
}

outputContractEventXDR, err := xdr.MarshalBase64(contractEvent)
if err != nil {
return []ContractEventOutput{}, err
}

outputTransactionID := toid.New(int32(outputLedgerSequence), int32(transactionIndex), 0).ToInt64()
outputSuccessful := transaction.Result.Successful()

transformedDiagnosticEvent := ContractEventOutput{
TransactionHash: outputTransactionHash,
TransactionID: outputTransactionID,
Successful: outputSuccessful,
LedgerSequence: outputLedgerSequence,
ClosedAt: outputCloseTime,
InSuccessfulContractCall: outputInSuccessfulContractCall,
ContractId: outputContractId,
Type: int32(outputType),
TypeString: outputTypeString,
Topics: outputTopicsJson,
TopicsDecoded: outputTopicsDecodedJson,
Data: outputData,
DataDecoded: outputDataDecoded,
ContractEventXDR: outputContractEventXDR,
}

transformedContractEvents = append(transformedContractEvents, transformedDiagnosticEvent)
}

return transformedContractEvents, nil
}

// TODO this should be a stellar/go/xdr function
func getEventTopics(eventBody xdr.ContractEventBody) []xdr.ScVal {
switch eventBody.V {
case 0:
contractEventV0 := eventBody.MustV0()
return contractEventV0.Topics
default:
panic("unsupported event body version: " + string(eventBody.V))
}
}

// TODO this should be a stellar/go/xdr function
func getEventData(eventBody xdr.ContractEventBody) xdr.ScVal {
switch eventBody.V {
case 0:
contractEventV0 := eventBody.MustV0()
return contractEventV0.Data
default:
panic("unsupported event body version: " + string(eventBody.V))
}
}

// TODO this should also be used in the operations processor
func serializeScVal(scVal xdr.ScVal) (interface{}, interface{}, error) {
var serializedData, serializedDataDecoded interface{}
serializedData = "n/a"
serializedDataDecoded = "n/a"

if _, ok := scVal.ArmForSwitch(int32(scVal.Type)); ok {
var err error
var raw []byte
var jsonMessage json.RawMessage
raw, err = scVal.MarshalBinary()
if err != nil {
return nil, nil, err
}

serializedData = base64.StdEncoding.EncodeToString(raw)
jsonMessage, err = xdr2json.ConvertBytes(xdr.ScVal{}, raw)
if err != nil {
return nil, nil, err
}

serializedDataDecoded = jsonMessage
}

return serializedData, serializedDataDecoded, nil
}

// TODO this should also be used in the operations processor
func serializeScValArray(scVals []xdr.ScVal) ([]interface{}, []interface{}, error) {
data := make([]interface{}, 0, len(scVals))
dataDecoded := make([]interface{}, 0, len(scVals))

for _, scVal := range scVals {
serializedData, serializedDataDecoded, err := serializeScVal(scVal)
if err != nil {
return nil, nil, err
}
data = append(data, serializedData)
dataDecoded = append(dataDecoded, serializedDataDecoded)
func TransformContractEvent(event xdr.DiagnosticEvent, transaction ingest.LedgerTransaction) (ContractEventOutput, error) {
outputTransactionHash, _ := xdr.MarshalBase64(transaction.Hash)
outputContractID, _, _ := diagnosticevent.ContractID(event)
outputTopics, _ := diagnosticevent.Topics(event)
outputData, _ := diagnosticevent.Topics(event)

transformedDiagnosticEvent := ContractEventOutput{
TransactionHash: outputTransactionHash,
TransactionID: transaction.ID(),
Successful: transaction.Successful(),
LedgerSequence: ledger.Sequence(transaction.Ledger),
ClosedAt: ledger.ClosedAt(transaction.Ledger),
InSuccessfulContractCall: diagnosticevent.Successful(event),
ContractId: outputContractID,
Type: diagnosticevent.Type(event),
TopicsDecoded: outputTopics,
DataDecoded: outputData,
}

return data, dataDecoded, nil
return transformedDiagnosticEvent, nil
}
Loading
Loading