Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adapt inx-mqtt to iota 2.0 #100

Merged
merged 30 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
1ebfe8a
Start updating to iota-core inx and inx-app
jkrvivian Aug 31, 2023
f1a418b
Update block related apis
jkrvivian Sep 1, 2023
bbb0c62
Update UTXO apis
jkrvivian Sep 1, 2023
d4eb5f9
Add go.work to gitignore
jkrvivian Sep 1, 2023
b4e2a1d
Implement commitment events
jkrvivian Sep 4, 2023
2da0c34
Minor fixes
jkrvivian Sep 4, 2023
51a34b9
Fix UnwrapBlock in PublishBlock
jkrvivian Sep 7, 2023
897c0ab
Remove unused method
jkrvivian Sep 7, 2023
c6bd36c
Remove block finalized and add block accepted event
jkrvivian Sep 7, 2023
b2bbc3e
Upgrade to mqtt/v2
jkrvivian Sep 7, 2023
c6cd623
Update go.mod version
jkrvivian Sep 7, 2023
2926654
Implement Commitments topic
jkrvivian Sep 7, 2023
82a286f
Fix bugs
jkrvivian Sep 7, 2023
7b189d3
Update inx, inx-app version
jkrvivian Sep 7, 2023
99cd14f
Fix typo
jkrvivian Sep 8, 2023
7172c4e
Rename PublishCommitmentOnTopic to PublishRawCommitmentOnTopic
jkrvivian Sep 8, 2023
c1c427f
Remove comment
jkrvivian Sep 8, 2023
a83cff8
Remove useless check in PublishOutputMetadata
jkrvivian Sep 8, 2023
ac575a1
Update comments of commitment topics
jkrvivian Sep 8, 2023
603835e
Return on error
jkrvivian Sep 8, 2023
549d9ca
Log error
jkrvivian Sep 8, 2023
4202852
Fix linter
jkrvivian Sep 8, 2023
1bff02e
Update publishFinalizedCommitmentInfoTopic
jkrvivian Sep 8, 2023
2f714c3
Remove unused ledgerIndex from payloadForOutput
jkrvivian Sep 12, 2023
9a1778a
Remove unused LedgerOutput
jkrvivian Sep 12, 2023
ebf06a1
Check for outputmetadata subscriber in PublishOutputMetadata
jkrvivian Sep 18, 2023
bcce1d5
Include OutputMetadata in output response
jkrvivian Sep 19, 2023
6bd3838
Remove unused parameter
jkrvivian Sep 19, 2023
128155c
Move JSONEncode to payloadForOutput
jkrvivian Sep 20, 2023
2da220a
Change order of parameters
muXxer Sep 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 34 additions & 54 deletions components/mqtt/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,21 +157,34 @@
}
}

func payloadForOutput(api iotago.API, iotaOutput iotago.Output) *outputPayload {
rawOutputJSON, err := api.JSONEncode(iotaOutput)
if err != nil {
return nil
}

rawRawOutputJSON := json.RawMessage(rawOutputJSON)
func payloadForOutput(ledgerIndex iotago.SlotIndex, output *inx.LedgerOutput, iotaOutputJSON []byte) *outputPayload {
outputID := output.GetOutputId().Unwrap()
rawRawOutputJSON := json.RawMessage(iotaOutputJSON)
muXxer marked this conversation as resolved.
Show resolved Hide resolved

return &outputPayload{
Metadata: &outputMetadataPayload{
BlockID: output.GetBlockId().Unwrap().ToHex(),
TransactionID: outputID.TransactionID().ToHex(),
Spent: false,
OutputIndex: outputID.Index(),
IncludedSlot: output.GetSlotBooked(),
IncludedCommitmentID: output.GetCommitmentIdIncluded().Unwrap().ToHex(),
LedgerIndex: uint64(ledgerIndex),
},
RawOutput: &rawRawOutputJSON,
}
}

func payloadForSpent(api iotago.API, iotaOutput iotago.Output) *outputPayload {
return payloadForOutput(api, iotaOutput)
func payloadForSpent(api iotago.API, ledgerIndex iotago.SlotIndex, spent *inx.LedgerSpent, iotaOutputJSON []byte) *outputPayload {

Check failure on line 178 in components/mqtt/publish.go

View workflow job for this annotation

GitHub Actions / golangci

[golangci] components/mqtt/publish.go#L178

unused-parameter: parameter 'api' seems to be unused, consider removing or renaming it as _ (revive)
Raw output
components/mqtt/publish.go:178:22: unused-parameter: parameter 'api' seems to be unused, consider removing or renaming it as _ (revive)
func payloadForSpent(api iotago.API, ledgerIndex iotago.SlotIndex, spent *inx.LedgerSpent, iotaOutputJSON []byte) *outputPayload {
                     ^
payload := payloadForOutput(ledgerIndex, spent.GetOutput(), iotaOutputJSON)
if payload != nil {
payload.Metadata.Spent = true
payload.Metadata.SpentSlot = spent.GetSlotSpent()
payload.Metadata.CommitmentIDSpent = spent.GetCommitmentIdSpent().Unwrap().ToHex()
payload.Metadata.TransactionIDSpent = spent.UnwrapTransactionIDSpent().ToHex()
}

return payload
}

func (s *Server) PublishOnUnlockConditionTopics(baseTopic string, output iotago.Output, payloadFunc func() interface{}) {
Expand Down Expand Up @@ -269,7 +282,7 @@
}
}

func (s *Server) PublishOutput(ctx context.Context, output *inx.LedgerOutput, publishOnAllTopics bool) {
func (s *Server) PublishOutput(ctx context.Context, ledgerIndex iotago.SlotIndex, output *inx.LedgerOutput, publishOnAllTopics bool) {
api := s.NodeBridge.APIProvider().CurrentAPI()
iotaOutput, err := output.UnwrapOutput(api)
if err != nil {
Expand All @@ -279,7 +292,11 @@
var payload *outputPayload
payloadFunc := func() interface{} {
if payload == nil {
payload = payloadForOutput(api, iotaOutput)
rawOutputJSON, err := api.JSONEncode(iotaOutput)
muXxer marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil
}
payload = payloadForOutput(ledgerIndex, output, rawOutputJSON)
}

return payload
Expand All @@ -306,34 +323,7 @@
}
}

func (s *Server) PublishOutputMetadata(outputID iotago.OutputID, metadata *inx.OutputMetadata) {
outputMetadataTopic := strings.ReplaceAll(topicOutputsMetadata, parameterOutputID, outputID.ToHex())
hasOutputMetadataTopicSubscriber := s.MQTTBroker.HasSubscribers(outputMetadataTopic)

if !hasOutputMetadataTopicSubscriber {
return
}

response := &outputMetadataPayload{
BlockID: metadata.GetBlockId().Unwrap().ToHex(),
TransactionID: metadata.GetTransactionId().Unwrap().ToHex(),
OutputIndex: uint16(metadata.GetOutputIndex()),
Spent: metadata.GetIsSpent(),
CommitmentIDSpent: metadata.GetCommitmentIdSpent().Unwrap().ToHex(),
TransactionIDSpent: metadata.GetTransactionIdSpent().Unwrap().ToHex(),
IncludedCommitmentID: metadata.GetCommitmentIdIncluded().Unwrap().ToHex(),
}

// Serialize here instead of using publishOnTopic to avoid double JSON marshaling
jsonPayload, err := json.Marshal(response)
if err != nil {
return
}

s.sendMessageOnTopic(outputMetadataTopic, jsonPayload)
}

func (s *Server) PublishSpent(spent *inx.LedgerSpent) {
func (s *Server) PublishSpent(ledgerIndex iotago.SlotIndex, spent *inx.LedgerSpent) {
api := s.NodeBridge.APIProvider().CurrentAPI()
iotaOutput, err := spent.GetOutput().UnwrapOutput(api)
if err != nil {
Expand All @@ -343,7 +333,11 @@
var payload *outputPayload
payloadFunc := func() interface{} {
if payload == nil {
payload = payloadForSpent(api, iotaOutput)
rawOutputJSON, err := api.JSONEncode(iotaOutput)
muXxer marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil
}
payload = payloadForSpent(api, ledgerIndex, spent, rawOutputJSON)
}

return payload
Expand Down Expand Up @@ -398,17 +392,3 @@

return emptyOutputID
}

func outputIDFromOutputMetadataTopic(topic string) iotago.OutputID {
if strings.HasPrefix(topic, "output-metadata/") {
outputIDHex := strings.Replace(topic, "output-metadata/", "", 1)
outputID, err := iotago.OutputIDFromHex(outputIDHex)
if err != nil {
return emptyOutputID
}

return outputID
}

return emptyOutputID
}
30 changes: 10 additions & 20 deletions components/mqtt/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,6 @@ func (s *Server) onSubscribeTopic(ctx context.Context, clientID string, topic st
if outputID := outputIDFromOutputsTopic(topic); outputID != emptyOutputID {
go s.fetchAndPublishOutput(ctx, outputID)
}

case strings.HasPrefix(topic, "output-metadata/"):
if outputID := outputIDFromOutputMetadataTopic(topic); outputID != emptyOutputID {
go s.fetchAndPublishOutputMetadata(ctx, outputID)
}
}
}
}
Expand Down Expand Up @@ -369,12 +364,12 @@ func (s *Server) listenToConfirmedBlocks(ctx context.Context) error {
}

func (s *Server) listenToLedgerUpdates(ctx context.Context) error {

stream, err := s.NodeBridge.Client().ListenToLedgerUpdates(ctx, &inx.SlotRangeRequest{})
if err != nil {
return err
}

var latestIndex iotago.SlotIndex
for {
payload, err := stream.Recv()
if err != nil {
Expand All @@ -389,13 +384,19 @@ func (s *Server) listenToLedgerUpdates(ctx context.Context) error {
}

switch op := payload.GetOp().(type) {
//nolint:nosnakecase // grpc uses underscores
case *inx.LedgerUpdate_BatchMarker:
if op.BatchMarker.GetMarkerType() == inx.LedgerUpdate_Marker_BEGIN {
latestIndex = iotago.SlotIndex(op.BatchMarker.Slot)
}

//nolint:nosnakecase // grpc uses underscores
case *inx.LedgerUpdate_Consumed:
s.PublishSpent(op.Consumed)
s.PublishSpent(latestIndex, op.Consumed)

//nolint:nosnakecase // grpc uses underscores
case *inx.LedgerUpdate_Created:
s.PublishOutput(ctx, op.Created, true)
s.PublishOutput(ctx, latestIndex, op.Created, true)
}
}

Expand Down Expand Up @@ -462,18 +463,7 @@ func (s *Server) fetchAndPublishOutput(ctx context.Context, outputID iotago.Outp

return
}
s.PublishOutput(ctx, resp.GetOutput(), false)
}

func (s *Server) fetchAndPublishOutputMetadata(ctx context.Context, outputID iotago.OutputID) {
s.LogDebugf("fetchAndPublishOutputMetadata: %s", outputID.ToHex())
resp, err := s.NodeBridge.Client().ReadOutputMetadata(ctx, inx.NewOutputId(outputID))
if err != nil {
s.LogErrorf("failed to retrieve output metadata %s :%v", outputID.ToHex(), err)

return
}
s.PublishOutputMetadata(outputID, resp)
s.PublishOutput(ctx, resp.GetLatestCommitmentId().Unwrap().Index(), resp.GetOutput(), false)
}

func (s *Server) fetchAndPublishTransactionInclusion(ctx context.Context, transactionID iotago.TransactionID) {
Expand Down
2 changes: 0 additions & 2 deletions components/mqtt/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ const (
topicFoundryOutputs = "outputs/foundry/" + parameterFoundryID // outputPayload
topicOutputsByUnlockConditionAndAddress = "outputs/unlock/" + parameterCondition + "/" + parameterAddress // outputPayload
topicSpentOutputsByUnlockConditionAndAddress = "outputs/unlock/" + parameterCondition + "/" + parameterAddress + "/spent" // outputPayload

topicOutputsMetadata = "output-metadata/" + parameterOutputID // outputMetadataPayload
)

type unlockCondition string
Expand Down
10 changes: 8 additions & 2 deletions components/mqtt/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,24 @@ type outputMetadataPayload struct {
OutputIndex uint16 `json:"outputIndex"`
// Whether this output is spent.
Spent bool `json:"isSpent"`
// The slot index at which the output was spent.
SpentSlot uint64 `json:"spentSlot,omitempty"`
// The commitment ID at which this output was spent.
CommitmentIDSpent string `json:"commitmentIdSpent,omitempty"`
// The transaction this output was spent with.
TransactionIDSpent string `json:"transactionIdSpent,omitempty"`
// The slot index at which the output was booked.
IncludedSlot uint64 `json:"includedSlot,omitempty"`
// The commitment ID at which this output was booked into the ledger.
IncludedCommitmentID string `json:"includedCommitmentId"`
// The latest commitment ID of the node.
LatestCommitmentID string `json:"latestCommitmentId"`
// The current ledger index of the node.
LedgerIndex uint64 `json:"ledgerIndex"`
}

// outputPayload defines the payload of the output topics.
type outputPayload struct {
// The metadata of the output.
Metadata *outputMetadataPayload `json:"metadata"`
// The output in its serialized form.
RawOutput *json.RawMessage `json:"output"`
}
42 changes: 21 additions & 21 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ go 1.21
replace github.com/mochi-co/mqtt => github.com/alexsporn/mqtt v0.0.0-20220909140721-d60c438960a4

require (
github.com/iotaledger/hive.go/app v0.0.0-20230829152614-7afc7a4d89b3
github.com/iotaledger/hive.go/lo v0.0.0-20230829152614-7afc7a4d89b3
github.com/iotaledger/hive.go/logger v0.0.0-20230829152614-7afc7a4d89b3
github.com/iotaledger/hive.go/web v0.0.0-20230629181801-64c530ff9d15
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20230908143946-e15613b4af95
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20230908142450-d259cfb4153d
github.com/iotaledger/hive.go/app v0.0.0-20230912172434-dc477e1f5140
github.com/iotaledger/hive.go/lo v0.0.0-20230912172434-dc477e1f5140
github.com/iotaledger/hive.go/logger v0.0.0-20230912172434-dc477e1f5140
github.com/iotaledger/hive.go/web v0.0.0-20230912172434-dc477e1f5140
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20230919065227-618931c246c5
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20230918132810-48814818bff9
github.com/iotaledger/iota.go/v4 v4.0.0-20230829160021-46cad51e89d1
github.com/labstack/echo/v4 v4.11.1
github.com/mochi-co/mqtt v1.3.2
Expand All @@ -27,7 +27,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/eclipse/paho.mqtt.golang v1.4.3 // indirect
github.com/ethereum/go-ethereum v1.12.2 // indirect
github.com/ethereum/go-ethereum v1.13.1 // indirect
github.com/fatih/structs v1.1.0 // indirect
github.com/felixge/fgprof v0.9.3 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
Expand All @@ -42,14 +42,14 @@ require (
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/holiman/uint256 v1.2.3 // indirect
github.com/iancoleman/orderedmap v0.3.0 // indirect
github.com/iotaledger/hive.go/constraints v0.0.0-20230829152614-7afc7a4d89b3 // indirect
github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20230829152614-7afc7a4d89b3 // indirect
github.com/iotaledger/hive.go/crypto v0.0.0-20230829152614-7afc7a4d89b3 // indirect
github.com/iotaledger/hive.go/ds v0.0.0-20230829152614-7afc7a4d89b3 // indirect
github.com/iotaledger/hive.go/ierrors v0.0.0-20230829152614-7afc7a4d89b3 // indirect
github.com/iotaledger/hive.go/runtime v0.0.0-20230829152614-7afc7a4d89b3 // indirect
github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20230829152614-7afc7a4d89b3 // indirect
github.com/iotaledger/hive.go/stringify v0.0.0-20230829152614-7afc7a4d89b3 // indirect
github.com/iotaledger/hive.go/constraints v0.0.0-20230912172434-dc477e1f5140 // indirect
github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20230912172434-dc477e1f5140 // indirect
github.com/iotaledger/hive.go/crypto v0.0.0-20230912172434-dc477e1f5140 // indirect
github.com/iotaledger/hive.go/ds v0.0.0-20230912172434-dc477e1f5140 // indirect
github.com/iotaledger/hive.go/ierrors v0.0.0-20230912172434-dc477e1f5140 // indirect
github.com/iotaledger/hive.go/runtime v0.0.0-20230912172434-dc477e1f5140 // indirect
github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20230912172434-dc477e1f5140 // indirect
github.com/iotaledger/hive.go/stringify v0.0.0-20230912172434-dc477e1f5140 // indirect
github.com/knadh/koanf v1.5.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/labstack/gommon v0.4.0 // indirect
Expand All @@ -62,7 +62,7 @@ require (
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/pasztorpisti/qs v0.0.0-20171216220353-8d6c33ee906c // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/petermattis/goid v0.0.0-20230808133559-b036b712a89b // indirect
github.com/petermattis/goid v0.0.0-20230904192822-1876fd5063bc // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
Expand All @@ -75,12 +75,12 @@ require (
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.25.0 // indirect
golang.org/x/crypto v0.12.0 // indirect
golang.org/x/net v0.14.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/crypto v0.13.0 // indirect
golang.org/x/net v0.15.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/protobuf v1.31.0 // indirect
Expand Down
Loading
Loading