Skip to content

Commit

Permalink
Merge pull request #75 from icon-project/feat/store_messages_in_db_be…
Browse files Browse the repository at this point in the history
…fore_relaying

Store messages in db before pushing it to the message cache
  • Loading branch information
debendraoli authored Jan 9, 2024
2 parents f6ebd85 + 760a87d commit 26223fc
Showing 1 changed file with 23 additions and 3 deletions.
26 changes: 23 additions & 3 deletions relayer/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,32 @@ func (r *Relayer) processMessages(ctx context.Context) {
// & merge message to src cache
func (r *Relayer) processBlockInfo(ctx context.Context, srcChainRuntime *ChainRuntime, blockInfo types.BlockInfo) {
srcChainRuntime.LastBlockHeight = blockInfo.Height
err := r.SaveBlockHeight(ctx, srcChainRuntime, blockInfo.Height, len(blockInfo.Messages))
if err != nil {

if len(blockInfo.Messages) > 0 {
for msg := range r.getMessageStreamAfterSavingToDB(blockInfo.Messages) {
srcChainRuntime.MessageCache.Add(types.NewRouteMessage(msg))
}
}

if err := r.SaveBlockHeight(ctx, srcChainRuntime, blockInfo.Height, len(blockInfo.Messages)); err != nil {
r.log.Error("unable to save height", zap.Error(err))
}
}

func (r *Relayer) getMessageStreamAfterSavingToDB(messages []*types.Message) <-chan *types.Message {
msgStream := make(chan *types.Message)

go func(msgList []*types.Message) {
defer close(msgStream)
for _, msg := range msgList {
if err := r.messageStore.StoreMessage(types.NewRouteMessage(msg)); err != nil {
r.log.Error(fmt.Sprintf("failed to store a message in db: %v", err))
}
msgStream <- msg
}
}(messages)

go srcChainRuntime.mergeMessages(ctx, blockInfo.Messages)
return msgStream
}

func (r *Relayer) SaveBlockHeight(ctx context.Context, chainRuntime *ChainRuntime, height uint64, messageCount int) error {
Expand Down

0 comments on commit 26223fc

Please sign in to comment.