diff --git a/.goreleaser.yaml b/.goreleaser.yaml index d2fcf5c8..f4b61d41 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -58,7 +58,7 @@ builds: - -mod=readonly - -trimpath ldflags: - - -s -w -X github.com/icon-project/centralized-relay/cmd.Version={{ .Tag }} + - -s -w -X github.com/icon-project/centralized-relay/relayer.Version={{ .Tag }} - -linkmode=external - -extldflags '-Wl,-z,muldefs -lm' tags: @@ -79,7 +79,7 @@ builds: - -mod=readonly - -trimpath ldflags: - - -s -w -X github.com/icon-project/centralized-relay/cmd.Version={{ .Tag }} + - -s -w -X github.com/icon-project/centralized-relay/relayer.Version={{ .Tag }} - -linkmode=external - -extldflags "-static" - -extldflags '-Wl,-z,muldefs -lm' @@ -96,7 +96,7 @@ archives: - linux-arm64 name_template: "{{ .ProjectName }}_{{ .Version }}_{{ .Os }}_{{ .Arch }}" format: tar.gz - wrap_in_directory: false + wrap_in_directory: true checksum: name_template: SHA256SUMS-{{.Version}}.txt @@ -104,4 +104,4 @@ checksum: release: prerelease: auto - draft: false + draft: true diff --git a/Makefile b/Makefile index 60bc0e80..60ed959b 100644 --- a/Makefile +++ b/Makefile @@ -46,9 +46,6 @@ e2e-test: test-all: @go test -v ./... -test-all: - @go test -v ./... - PACKAGE_NAME := github.com/icon-project/centralized-relay GOLANG_CROSS_VERSION ?= v1.22.4 LIBWASM_VERSION ?= v2.1.0 @@ -78,4 +75,4 @@ release: -v `pwd`:/go/src/$(PACKAGE_NAME) \ -w /go/src/$(PACKAGE_NAME) \ goreleaser/goreleaser-cross:${GOLANG_CROSS_VERSION} \ - release --clean \ No newline at end of file + release --clean diff --git a/cmd/db.go b/cmd/db.go index 5b3f126d..36c6994c 100644 --- a/cmd/db.go +++ b/cmd/db.go @@ -10,17 +10,19 @@ import ( "github.com/icon-project/centralized-relay/relayer" "github.com/icon-project/centralized-relay/relayer/lvldb" "github.com/icon-project/centralized-relay/relayer/socket" - "github.com/icon-project/centralized-relay/relayer/store" "github.com/spf13/cobra" ) type dbState struct { - chain string - height uint64 - sn uint64 - page uint - limit uint - server *socket.Server + chain string + height uint64 + sn uint64 + txHash string + page uint + limit uint + server *socket.Server + fromHeight uint64 + toHeight uint64 } func newDBState() *dbState { @@ -89,19 +91,17 @@ func (d *dbState) messagesList(app *appState) *cobra.Command { return err } defer client.Close() - pg := store.NewPagination().WithPage(d.page, d.limit) - messages, err := client.GetMessageList(d.chain, pg) + messages, err := client.GetMessageList(d.chain, d.limit) if err != nil { return err } printLabels("Sn", "Src", "Dst", "Height", "Event", "Retry") // Print messages - for _, msg := range messages.Messages { + for _, msg := range messages.Message { fmt.Printf("%-10d %-10s %-10s %-10d %-10s %-10d \n", msg.Sn, msg.Src, msg.Dst, msg.MessageHeight, msg.EventType, msg.Retry) } - return nil }, } @@ -122,18 +122,20 @@ func (d *dbState) messagesRelay(app *appState) *cobra.Command { if err != nil { return err } - result, err := client.RelayMessage(d.chain, d.height, new(big.Int).SetUint64(d.sn)) + messages, err := client.RelayMessage(d.chain, d.height, d.txHash) if err != nil { return err } - printLabels("Sn", "Src", "Dst", "Height", "Event", "Retry") - printValues(result.Sn, result.Src, result.Dst, result.MessageHeight, result.EventType, result.Retry) + printLabels("Sn", "Src", "Dst", "Height", "Event") + for _, msg := range messages { + printValues(msg.Sn, msg.Src, msg.Dst, msg.MessageHeight, msg.EventType) + } return nil }, } - d.messageMsgIDFlag(rly, true) d.messageChainFlag(rly, true) d.messageHeightFlag(rly) + d.messageTxHashFlag(rly) return rly } @@ -178,6 +180,10 @@ func (d *dbState) messageHeightFlag(cmd *cobra.Command) { cmd.Flags().Uint64Var(&d.height, "height", 0, "block height") } +func (d *dbState) messageTxHashFlag(cmd *cobra.Command) { + cmd.Flags().StringVarP(&d.txHash, "tx_hash", "t", "", "tx hash") +} + func (d *dbState) messageChainFlag(cmd *cobra.Command, markRequired bool) { cmd.Flags().StringVarP(&d.chain, "chain", "c", "", "message chain to select") if markRequired { @@ -221,7 +227,7 @@ func (d *dbState) blockInfo(app *appState) *cobra.Command { } printLabels("NID", "Height") for _, block := range blocks { - printValues(block.Chain, block.Height) + printValues(block.Chain, block.CheckPointHeight) } return nil }, @@ -284,19 +290,10 @@ func printLabels(labels ...any) { } func printValues(values ...any) { - padStr := `%-10s` - padInt := `%-10d` + padStr := `%-10v` var valueCell string - for _, val := range values { - if _, ok := val.(string); ok { - valueCell += padStr + " " - } else if _, ok := val.(int); ok { - valueCell += padInt + " " - } else if _, ok := val.(uint); ok { - valueCell += padInt + " " - } else if _, ok := val.(uint64); ok { - valueCell += padInt + " " - } + for range values { + valueCell += padStr + " " } valueCell += "\n" fmt.Printf(valueCell, values...) diff --git a/cmd/debug.go b/cmd/debug.go new file mode 100644 index 00000000..c905dcd4 --- /dev/null +++ b/cmd/debug.go @@ -0,0 +1,138 @@ +package cmd + +import ( + "encoding/hex" + "fmt" + "strings" + + "github.com/spf13/cobra" +) + +type DebugState struct { + *dbState + app *appState + chain string + fromHeight uint64 + toHeight uint64 +} + +func newDebugState(a *appState) *DebugState { + db := newDBState() + return &DebugState{ + app: a, + dbState: db, + } +} + +func debugCmd(a *appState) *cobra.Command { + state := newDebugState(a) + debug := &cobra.Command{ + Use: "debug", + Short: "Commands for troubleshooting the relayer", + Aliases: []string{"dbg"}, + Example: strings.TrimSpace(fmt.Sprintf(`$ %s dbg [command]`, appName)), + } + + heightCmd := &cobra.Command{ + Use: "height", + Short: "Get latest height of the chain", + } + heightCmd.AddCommand(state.getLatestHeight(a)) + + blockCmd := &cobra.Command{ + Use: "block", + Short: "Get latest processed block of the chain", + } + blockCmd.AddCommand(state.getLatestProcessedBlock(a)) + + queryCmd := &cobra.Command{ + Use: "query", + Short: "Query block range", + RunE: func(cmd *cobra.Command, args []string) error { + client, err := state.getSocket(a) + if err != nil { + return err + } + defer client.Close() + if state.server != nil { + defer state.server.Close() + } + res, err := client.QueryBlockRange(state.chain, state.fromHeight, state.toHeight) + if err != nil { + return err + } + printLabels("Chain", "Sn", "Event Type", "height", "data") + for _, msg := range res.Msgs { + printValues(state.chain, msg.Sn.Text(10), msg.EventType, msg.MessageHeight, hex.EncodeToString(msg.Data)) + } + return nil + }, + } + queryCmd.Flags().StringVar(&state.chain, "chain", "", "Chain ID") + queryCmd.Flags().Uint64Var(&state.fromHeight, "from_height", 0, "From Height") + queryCmd.Flags().Uint64Var(&state.toHeight, "to_height", 0, "To height") + queryCmd.MarkFlagsRequiredTogether("chain", "from_height", "to_height") + debug.AddCommand(heightCmd, blockCmd, queryCmd) + + return debug +} + +func (c *DebugState) getLatestHeight(app *appState) *cobra.Command { + getLatestHeight := &cobra.Command{ + Use: "get", + Short: "Get the latest chain height", + Aliases: []string{"g"}, + Example: strings.TrimSpace(fmt.Sprintf(`$ %s dbg height get --chain [chain-id]`, appName)), + RunE: func(cmd *cobra.Command, args []string) error { + client, err := c.getSocket(app) + if err != nil { + return err + } + defer client.Close() + if c.server != nil { + defer c.server.Close() + } + res, err := client.GetLatestHeight(c.chain) + if err != nil { + return err + } + printLabels("Chain", "Latest Chain Height") + printValues(c.chain, res.Height) + return nil + }, + } + getLatestHeight.Flags().StringVar(&c.chain, "chain", "", "Chain ID") + getLatestHeight.MarkFlagRequired("chain") + return getLatestHeight +} + +func (c *DebugState) getLatestProcessedBlock(app *appState) *cobra.Command { + getLatestHeight := &cobra.Command{ + Use: "get", + Short: "Get the last processed block height", + Aliases: []string{"g"}, + Example: strings.TrimSpace(fmt.Sprintf(`$ %s dbg block get --chain [chain-id]`, appName)), + RunE: func(cmd *cobra.Command, args []string) error { + client, err := c.getSocket(app) + if err != nil { + return err + } + defer client.Close() + if c.server != nil { + defer c.server.Close() + } + res, err := client.GetBlock(c.chain) + if err != nil { + fmt.Println(err) + return err + } + printLabels("Chain", "Last Processed Block") + for _, block := range res { + printValues(block.Chain, block.CheckPointHeight) + } + return nil + }, + } + getLatestHeight.Flags().StringVar(&c.chain, "chain", "", "Chain ID") + return getLatestHeight +} diff --git a/cmd/root.go b/cmd/root.go index 6c59a737..b7c773bf 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -11,6 +11,7 @@ import ( "runtime/debug" "time" + "github.com/icon-project/centralized-relay/relayer" zaplogfmt "github.com/jsternberg/zap-logfmt" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -35,7 +36,6 @@ var ( }() defaultDBName = "data" defaultConfig = "config.yaml" - Version = "dev" ) // Execute adds all child commands to the root command and sets flags appropriately. @@ -99,7 +99,7 @@ func NewRootCmd(log *zap.Logger) *cobra.Command { Use: appName, Short: "This application makes data relay between chains!", Long: `Use this to relay xcall packet between chains using bridge contract.`, - Version: Version, + Version: relayer.Version, Aliases: []string{"crly"}, } @@ -159,6 +159,7 @@ func NewRootCmd(log *zap.Logger) *cobra.Command { dbCmd(a), keystoreCmd(a), contractCMD(a), + debugCmd(a), ) return rootCmd } diff --git a/cmd/start.go b/cmd/start.go index 8d2d3be5..5e220040 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -24,7 +24,7 @@ func startCmd(a *appState) *cobra.Command { $ %s start # start all the registered chains `, appName)), RunE: func(cmd *cobra.Command, args []string) error { - a.log.Info("Starting relayer", zap.String("version", Version)) + a.log.Info("Starting relayer", zap.String("version", relayer.Version)) chains := a.config.Chains.GetAll() flushInterval, err := cmd.Flags().GetDuration(flagFlushInterval) diff --git a/relayer/chains/evm/listener.go b/relayer/chains/evm/listener.go index a1946166..a96407b6 100644 --- a/relayer/chains/evm/listener.go +++ b/relayer/chains/evm/listener.go @@ -12,7 +12,6 @@ import ( "github.com/ethereum/go-ethereum" ethTypes "github.com/ethereum/go-ethereum/core/types" - "github.com/icon-project/centralized-relay/relayer/chains/evm/types" relayertypes "github.com/icon-project/centralized-relay/relayer/types" "github.com/pkg/errors" ) @@ -169,22 +168,19 @@ func (p *Provider) isConnectionError(err error) bool { strings.Contains(err.Error(), "websocket") } -func (p *Provider) FindMessages(ctx context.Context, lbn *types.BlockNotification) ([]*relayertypes.Message, error) { - if lbn == nil && lbn.Logs == nil { - return nil, nil - } +func (p *Provider) FindMessages(ctx context.Context, logs []ethTypes.Log) ([]*relayertypes.Message, error) { var messages []*relayertypes.Message - for _, log := range lbn.Logs { + for _, log := range logs { message, err := p.getRelayMessageFromLog(log) if err != nil { return nil, err } p.log.Info("Detected eventlog", - zap.String("dst", message.Dst), zap.Uint64("sn", message.Sn.Uint64()), zap.Any("req_id", message.ReqID), zap.String("event_type", message.EventType), zap.String("tx_hash", log.TxHash.String()), + zap.String("target_network", message.Dst), zap.Uint64("height", log.BlockNumber), ) messages = append(messages, message) diff --git a/relayer/chains/evm/provider.go b/relayer/chains/evm/provider.go index 3e13f289..0a5d0b86 100644 --- a/relayer/chains/evm/provider.go +++ b/relayer/chains/evm/provider.go @@ -100,6 +100,10 @@ func (p *Config) NewProvider(ctx context.Context, log *zap.Logger, homepath stri }, nil } +func (p *Provider) GetLastProcessedBlockHeight(ctx context.Context) (uint64, error) { + return p.GetLastSavedBlockHeight(), nil +} + func (p *Provider) NID() string { return p.cfg.NID } @@ -479,3 +483,31 @@ func (p *Provider) SetLastSavedHeightFunc(f func() uint64) { func (p *Provider) GetLastSavedBlockHeight() uint64 { return p.LastSavedHeightFunc() } + +func (p *Provider) QueryBlockMessages(ctx context.Context, fromHeight, toHeight uint64) ([]*providerTypes.Message, error) { + var messages []*providerTypes.Message + filter := ethereum.FilterQuery{ + FromBlock: new(big.Int).SetUint64(fromHeight), + ToBlock: new(big.Int).SetUint64(toHeight), + Addresses: p.blockReq.Addresses, + Topics: p.blockReq.Topics, + } + p.log.Info("queryting", zap.Uint64("start", fromHeight), zap.Uint64("end", toHeight)) + logs, _ := p.getLogsRetry(ctx, filter) + for _, log := range logs { + message, err := p.getRelayMessageFromLog(log) + if err != nil { + p.log.Error("failed to get relay message from log", zap.Error(err)) + continue + } + p.log.Info("Found eventlog", + zap.String("target_network", message.Dst), + zap.Uint64("sn", message.Sn.Uint64()), + zap.String("event_type", message.EventType), + zap.String("tx_hash", log.TxHash.String()), + zap.Uint64("block_number", log.BlockNumber), + ) + messages = append(messages, message) + } + return messages, nil +} diff --git a/relayer/chains/evm/query.go b/relayer/chains/evm/query.go index 2450567b..b2fea4e4 100644 --- a/relayer/chains/evm/query.go +++ b/relayer/chains/evm/query.go @@ -5,12 +5,13 @@ import ( "fmt" "math/big" + ethereum "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" ethTypes "github.com/ethereum/go-ethereum/core/types" - provider "github.com/icon-project/centralized-relay/relayer/chains/evm/types" "github.com/icon-project/centralized-relay/relayer/events" "github.com/icon-project/centralized-relay/relayer/types" + "go.uber.org/zap" ) func (p *Provider) QueryLatestHeight(ctx context.Context) (uint64, error) { @@ -49,18 +50,33 @@ func (p *Provider) QueryBalance(ctx context.Context, addr string) (*types.Coin, } // TODO: may not be need anytime soon so its ok to implement later on -func (p *Provider) GenerateMessages(ctx context.Context, key *types.MessageKeyWithMessageHeight) ([]*types.Message, error) { - header, err := p.client.GetHeaderByHeight(ctx, new(big.Int).SetUint64(key.Height)) +func (p *Provider) GenerateMessages(ctx context.Context, fromHeight, toHeight uint64) ([]*types.Message, error) { + p.log.Info("generating message", zap.Uint64("fromHeight", fromHeight), zap.Uint64("toHeight", toHeight)) + p.blockReq.FromBlock = new(big.Int).SetUint64(fromHeight) + p.blockReq.ToBlock = new(big.Int).SetUint64(toHeight) + logs, err := p.client.FilterLogs(ctx, p.blockReq) if err != nil { return nil, err } - p.blockReq.FromBlock = new(big.Int).SetUint64(key.Height) - p.blockReq.ToBlock = new(big.Int).SetUint64(key.Height) - logs, err := p.client.FilterLogs(ctx, p.blockReq) + return p.FindMessages(ctx, logs) +} + +func (p *Provider) FetchTxMessages(ctx context.Context, txHash string) ([]*types.Message, error) { + txReceipt, err := p.client.TransactionReceipt(ctx, common.HexToHash(txHash)) + if err != nil { + return nil, err + } + filter := ethereum.FilterQuery{ + FromBlock: txReceipt.BlockNumber, + ToBlock: txReceipt.BlockNumber, + Addresses: p.blockReq.Addresses, + Topics: p.blockReq.Topics, + } + logs, err := p.client.FilterLogs(ctx, filter) if err != nil { return nil, err } - return p.FindMessages(ctx, &provider.BlockNotification{Height: new(big.Int).SetUint64(key.Height), Header: header, Logs: logs, Hash: header.Hash()}) + return p.FindMessages(ctx, logs) } func (p *Provider) QueryTransactionReceipt(ctx context.Context, txHash string) (*types.Receipt, error) { diff --git a/relayer/chains/icon/event_parse.go b/relayer/chains/icon/event_parse.go index 81e52fc6..4e11c31d 100644 --- a/relayer/chains/icon/event_parse.go +++ b/relayer/chains/icon/event_parse.go @@ -93,30 +93,29 @@ func (p *Provider) parseMessageEvent(notifications *types.EventNotification) ([] var messages []*providerTypes.Message for _, event := range notifications.Logs { - switch event.Indexed[0] { - case EmitMessage: - msg, err := p.parseEmitMessageEvent(height.Uint64(), event) - if err != nil { - return nil, err - } - messages = append(messages, msg) - case CallMessage: - msg, err := p.parseCallMessageEvent(height.Uint64(), event) - if err != nil { - return nil, err - } - messages = append(messages, msg) - case RollbackMessage: - msg, err := p.parseRollbackMessageEvent(height.Uint64(), event) - if err != nil { - return nil, err - } + msg, err := p.parseMessageFromEventLog(height.Uint64(), event) + if err != nil { + p.log.Warn("received invalid event", zap.Error(err)) + } else if msg != nil { messages = append(messages, msg) } } return messages, nil } +func (p *Provider) parseMessageFromEventLog(height uint64, event *types.EventNotificationLog) (*providerTypes.Message, error) { + switch event.Indexed[0] { + case EmitMessage: + return p.parseEmitMessageEvent(height, event) + case CallMessage: + return p.parseCallMessageEvent(height, event) + case RollbackMessage: + return p.parseRollbackMessageEvent(height, event) + default: + return nil, fmt.Errorf("invalid event: %s", event.Indexed[0]) + } +} + // parseEmitMessage parses EmitMessage event func (p *Provider) parseEmitMessageEvent(height uint64, e *types.EventNotificationLog) (*providerTypes.Message, error) { if indexdedLen, dataLen := len(e.Indexed), len(e.Data); indexdedLen != 3 && dataLen != 1 { diff --git a/relayer/chains/icon/provider.go b/relayer/chains/icon/provider.go index c35c8d72..579f3cf5 100644 --- a/relayer/chains/icon/provider.go +++ b/relayer/chains/icon/provider.go @@ -4,13 +4,16 @@ import ( "context" "fmt" "math/big" + "time" + "github.com/gorilla/websocket" "github.com/icon-project/centralized-relay/relayer/chains/icon/types" "github.com/icon-project/centralized-relay/relayer/events" "github.com/icon-project/centralized-relay/relayer/kms" "github.com/icon-project/centralized-relay/relayer/provider" providerTypes "github.com/icon-project/centralized-relay/relayer/types" "github.com/icon-project/goloop/module" + "github.com/pkg/errors" "go.uber.org/zap" ) @@ -95,6 +98,57 @@ type Provider struct { LastSavedHeightFunc func() uint64 } +func (p *Provider) GetLastProcessedBlockHeight(ctx context.Context) (uint64, error) { + return p.GetLastSavedBlockHeight(), nil +} + +func (p *Provider) QueryBlockMessages(ctx context.Context, fromHeight, toHeight uint64) ([]*providerTypes.Message, error) { + var messages []*providerTypes.Message + eventReq := &types.EventRequest{ + Height: types.NewHexInt(int64(fromHeight)), + EventFilter: p.GetMonitorEventFilters(), + Logs: types.NewHexInt(1), + ProgressInterval: types.NewHexInt(25), + } + ctxMonitorBlock, cancelMonitorBlock := context.WithTimeout(ctx, 5*time.Second) + defer cancelMonitorBlock() + err := p.client.MonitorEvent(ctxMonitorBlock, eventReq, nil, func(v *types.EventNotification, outgoing chan *providerTypes.BlockInfo) error { + if !errors.Is(ctx.Err(), context.Canceled) { + msgs, err := p.parseMessageEvent(v) + if err != nil { + p.log.Error("failed to parse message event", zap.Error(err)) + return err + } + shouldBreak := false + for _, msg := range msgs { + if msg.MessageHeight > toHeight { + shouldBreak = true + } else { + p.log.Info("Found eventlog", + zap.Uint64("height", msg.MessageHeight), + zap.String("target_network", msg.Dst), + zap.Uint64("sn", msg.Sn.Uint64()), + zap.String("tx_hash", v.Hash.String()), + zap.String("event_type", msg.EventType), + ) + messages = append(messages, msg) + } + } + if shouldBreak { + cancelMonitorBlock() + } + } + return nil + }, func(conn *websocket.Conn, err error) {}) + if err != nil { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return messages, nil + } + return nil, err + } + return messages, nil +} + func (p *Provider) NID() string { return p.cfg.NID } diff --git a/relayer/chains/icon/query.go b/relayer/chains/icon/query.go index 03123b47..7fdb202e 100644 --- a/relayer/chains/icon/query.go +++ b/relayer/chains/icon/query.go @@ -2,9 +2,8 @@ package icon import ( "context" - "errors" "fmt" - "strings" + "slices" "github.com/icon-project/centralized-relay/relayer/chains/icon/types" providerTypes "github.com/icon-project/centralized-relay/relayer/types" @@ -66,113 +65,44 @@ func (ip *Provider) QueryBalance(ctx context.Context, addr string) (*providerTyp return providerTypes.NewCoin("ICX", balance.Uint64()), nil } -func (p *Provider) GenerateMessages(ctx context.Context, key *providerTypes.MessageKeyWithMessageHeight) ([]*providerTypes.Message, error) { - p.log.Info("generating message", zap.Any("messagekey", key)) - if key == nil { - return nil, errors.New("GenerateMessage: message key cannot be nil") - } +func (p *Provider) GenerateMessages(ctx context.Context, fromHeight, toHeight uint64) ([]*providerTypes.Message, error) { + p.log.Info("generating message", zap.Uint64("fromHeight", fromHeight), zap.Uint64("toHeight", toHeight)) + return p.QueryBlockMessages(ctx, fromHeight, toHeight) +} - block, err := p.client.GetBlockByHeight(&types.BlockHeightParam{ - Height: types.NewHexInt(int64(key.Height)), +func (p *Provider) FetchTxMessages(ctx context.Context, txHash string) ([]*providerTypes.Message, error) { + txResult, err := p.client.GetTransactionResult(&types.TransactionHashParam{ + Hash: types.HexBytes(txHash), }) if err != nil { - return nil, fmt.Errorf("GenerateMessage:GetBlockByHeight %v", err) + return nil, err } - var messages []*providerTypes.Message - - for _, res := range block.NormalTransactions { - txResult, err := p.client.GetTransactionResult(&types.TransactionHashParam{Hash: res.TxHash}) - if err != nil { - return nil, fmt.Errorf("GenerateMessage:GetTransactionResult %v", err) - } - - for _, el := range txResult.EventLogs { - var ( - dst string - eventType = p.GetEventName(el.Indexed[0]) - ) - height, err := txResult.BlockHeight.BigInt() + connectionContract := types.Address(p.cfg.Contracts[providerTypes.ConnectionContract]) + xcallContract := types.Address(p.cfg.Contracts[providerTypes.XcallContract]) + allowedAddresses := []types.Address{connectionContract, xcallContract} + + messages := []*providerTypes.Message{} + for _, log := range txResult.EventLogs { + if slices.Contains(allowedAddresses, log.Addr) { + event := types.EventNotificationLog{ + Address: log.Addr, + Indexed: log.Indexed, + Data: log.Data, + } + height, err := txResult.BlockHeight.Int64() if err != nil { - return nil, fmt.Errorf("GenerateMessage: bigIntConversion %v", err) + return nil, err } - switch el.Indexed[0] { - case EmitMessage: - if el.Addr != types.Address(p.cfg.Contracts[providerTypes.ConnectionContract]) || len(el.Indexed) != 3 || len(el.Data) != 1 { - continue - } - dst = el.Indexed[1] - sn, err := types.HexInt(el.Indexed[2]).BigInt() - if err != nil { - p.log.Error("GenerateMessage: error decoding int value ") - continue - } - data := types.HexBytes(el.Data[0]) - dataValue, err := data.Value() - if err != nil { - p.log.Error("GenerateMessage: error decoding data ", zap.Error(err)) - continue - } - msg := &providerTypes.Message{ - MessageHeight: height.Uint64(), - EventType: eventType, - Dst: dst, - Src: key.Src, - Data: dataValue, - Sn: sn, - } - messages = append(messages, msg) - case CallMessage: - if el.Addr != types.Address(p.cfg.Contracts[providerTypes.XcallContract]) || len(el.Indexed) != 4 || len(el.Data) != 2 { - continue - } - dst = p.NID() - src := strings.SplitN(string(el.Indexed[1][:]), "/", 2) - sn, err := types.HexInt(el.Indexed[3]).BigInt() - if err != nil { - return nil, fmt.Errorf("failed to parse sn: %s", el.Indexed[2]) - } - requestID, err := types.HexInt(el.Data[0]).BigInt() - if err != nil { - return nil, fmt.Errorf("failed to parse reqID: %s", el.Data[0]) - } - data, err := types.HexBytes(el.Data[1]).Value() - if err != nil { - p.log.Error("GenerateMessage: error decoding data ", zap.Error(err)) - continue - } - msg := &providerTypes.Message{ - MessageHeight: height.Uint64(), - EventType: p.GetEventName(el.Indexed[0]), - Dst: dst, - Src: src[0], - Data: data, - Sn: sn, - ReqID: requestID, - } - messages = append(messages, msg) - case RollbackMessage: - if el.Addr != types.Address(p.cfg.Contracts[providerTypes.XcallContract]) || len(el.Indexed) != 2 { - continue - } - sn, err := types.HexInt(el.Indexed[1]).BigInt() - if err != nil { - return nil, fmt.Errorf("failed to parse sn: %s", el.Indexed[1]) - } - msg := &providerTypes.Message{ - MessageHeight: height.Uint64(), - EventType: p.GetEventName(el.Indexed[0]), - Dst: p.NID(), - Src: p.NID(), - Sn: sn, - } + msg, err := p.parseMessageFromEventLog(uint64(height), &event) + if err != nil { + p.log.Warn("received invalid event", zap.Error(err)) + } else if msg != nil { messages = append(messages, msg) } } } - if len(messages) == 0 { - return nil, errors.New("GenerateMessage: no messages found") - } + return messages, nil } diff --git a/relayer/chains/mockchain/provider.go b/relayer/chains/mockchain/provider.go index 49424bb2..6a3a5d9f 100644 --- a/relayer/chains/mockchain/provider.go +++ b/relayer/chains/mockchain/provider.go @@ -51,6 +51,10 @@ func (pp *MockProviderConfig) GetWallet() string { func (pp *MockProviderConfig) SetWallet(string) { } +func (pp *MockProviderConfig) ContractsAddress() types.ContractConfigMap { + return nil +} + type MockProvider struct { log *zap.Logger PCfg *MockProviderConfig @@ -161,7 +165,15 @@ func (p *MockProvider) QueryTransactionReceipt(ctx context.Context, txHash strin return nil, nil } -func (ip *MockProvider) GenerateMessages(ctx context.Context, messageKey *types.MessageKeyWithMessageHeight) ([]*types.Message, error) { +func (ip *MockProvider) GenerateMessages(ctx context.Context, fromHeight, toHeight uint64) ([]*types.Message, error) { + return nil, nil +} + +func (ip *MockProvider) FetchTxMessages(ctx context.Context, txHash string) ([]*types.Message, error) { + return nil, nil +} + +func (ip *MockProvider) GenerateTxMessages(ctx context.Context, txHash string) ([]*types.Message, error) { return nil, nil } @@ -206,3 +218,11 @@ func (p *MockProvider) SetFee(context.Context, string, *big.Int, *big.Int) error func (p *MockProvider) SetLastSavedHeightFunc(func() uint64) { } + +func (p *MockProvider) GetLastProcessedBlockHeight(ctx context.Context) (uint64, error) { + return 0, nil +} + +func (p *MockProvider) QueryBlockMessages(ctx context.Context, fromHeight, toHeight uint64) ([]*types.Message, error) { + return nil, nil +} diff --git a/relayer/chains/solana/config.go b/relayer/chains/solana/config.go index 091e2859..7bbfb261 100644 --- a/relayer/chains/solana/config.go +++ b/relayer/chains/solana/config.go @@ -9,6 +9,7 @@ import ( solrpc "github.com/gagliardetto/solana-go/rpc" "github.com/icon-project/centralized-relay/relayer/chains/solana/types" "github.com/icon-project/centralized-relay/relayer/provider" + relayertypes "github.com/icon-project/centralized-relay/relayer/types" "go.uber.org/zap" ) @@ -92,3 +93,15 @@ func (pc *Config) Validate() error { func (pc *Config) Enabled() bool { return !pc.Disabled } + +func (pc *Config) ContractsAddress() relayertypes.ContractConfigMap { + addresses := relayertypes.ContractConfigMap{ + relayertypes.ConnectionContract: pc.ConnectionProgram, + relayertypes.XcallContract: pc.XcallProgram, + } + for _, dapp := range pc.Dapps { + addresses[dapp.Name] = dapp.ProgramID + } + + return addresses +} diff --git a/relayer/chains/solana/provider.go b/relayer/chains/solana/provider.go index 81ee988a..2379c1fe 100644 --- a/relayer/chains/solana/provider.go +++ b/relayer/chains/solana/provider.go @@ -81,38 +81,58 @@ func (p *Provider) FinalityBlock(ctx context.Context) uint64 { return 0 } -func (p *Provider) GenerateMessages(ctx context.Context, messageKey *relayertypes.MessageKeyWithMessageHeight) ([]*relayertypes.Message, error) { - blockRes, err := p.client.GetBlock(ctx, messageKey.Height) +func (p *Provider) FetchTxMessages(ctx context.Context, txHash string) ([]*relayertypes.Message, error) { + signature := solana.MustSignatureFromBase58(txHash) + txVersion := uint64(0) + txn, err := p.client.GetTransaction(ctx, signature, &solrpc.GetTransactionOpts{MaxSupportedTransactionVersion: &txVersion}) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get txn with sign %s: %w", signature, err) } - messages := []*relayertypes.Message{} + event := types.SolEvent{ + Slot: txn.Slot, + Signature: signature, + Logs: txn.Meta.LogMessages, + } - for _, txn := range blockRes.Transactions { - event := types.SolEvent{ - Slot: txn.Slot, - Signature: txn.MustGetTransaction().Signatures[0], - Logs: txn.Meta.LogMessages, - } + return p.parseMessagesFromEvent(event) +} - messages, err := p.parseMessagesFromEvent(event) +func (p *Provider) GenerateMessages(ctx context.Context, fromHeight, toHeight uint64) ([]*relayertypes.Message, error) { + var messages []*relayertypes.Message + + for h := fromHeight; h <= toHeight; h++ { + blockRes, err := p.client.GetBlock(ctx, h) if err != nil { - return nil, fmt.Errorf("failed to parse messages from event [%+v]: %w", event, err) + return nil, err } - for _, msg := range messages { - p.log.Info("Detected event log: ", - zap.Uint64("height", msg.MessageHeight), - zap.String("event-type", msg.EventType), - zap.Any("sn", msg.Sn), - zap.Any("req-id", msg.ReqID), - zap.String("src", msg.Src), - zap.String("dst", msg.Dst), - zap.Any("data", hex.EncodeToString(msg.Data)), - ) - messages = append(messages, msg) + + for _, txn := range blockRes.Transactions { + event := types.SolEvent{ + Slot: txn.Slot, + Signature: txn.MustGetTransaction().Signatures[0], + Logs: txn.Meta.LogMessages, + } + + msgs, err := p.parseMessagesFromEvent(event) + if err != nil { + return nil, fmt.Errorf("failed to parse messages from event [%+v]: %w", event, err) + } + for _, msg := range msgs { + p.log.Info("Detected event log: ", + zap.Uint64("height", msg.MessageHeight), + zap.String("event-type", msg.EventType), + zap.Any("sn", msg.Sn), + zap.Any("req-id", msg.ReqID), + zap.String("src", msg.Src), + zap.String("dst", msg.Dst), + zap.Any("data", hex.EncodeToString(msg.Data)), + ) + messages = append(messages, msgs...) + } } } + return messages, nil } diff --git a/relayer/chains/steller/client.go b/relayer/chains/steller/client.go index ef7c2c71..e61829b2 100644 --- a/relayer/chains/steller/client.go +++ b/relayer/chains/steller/client.go @@ -26,6 +26,7 @@ type IClient interface { FetchEvents(ctx context.Context, eventFilter types.EventFilter) ([]types.Event, error) + ParseTxnEvents(txn *horizon.Transaction, fl types.EventFilter) ([]types.Event, error) LedgerDetail(sequence uint32) (ledger horizon.Ledger, err error) Transactions(request horizonclient.TransactionRequest) (txs horizon.TransactionsPage, err error) } @@ -77,33 +78,44 @@ func (cl *Client) FetchEvents(ctx context.Context, eventFilter types.EventFilter return nil, err } - var events []types.Event + var allEvents []types.Event for _, txn := range txnPage.Embedded.Records { - var txnMeta xdr.TransactionMeta - if err := xdr.SafeUnmarshalBase64(txn.ResultMetaXdr, &txnMeta); err != nil { - return nil, err + events, err := cl.ParseTxnEvents(&txn, eventFilter) + if err != nil { + return allEvents, err + } + allEvents = append(allEvents, events...) + } + + return allEvents, nil +} + +func (cl *Client) ParseTxnEvents(txn *horizon.Transaction, fl types.EventFilter) ([]types.Event, error) { + var events []types.Event + var txnMeta xdr.TransactionMeta + if err := xdr.SafeUnmarshalBase64(txn.ResultMetaXdr, &txnMeta); err != nil { + return nil, err + } + if txnMeta.V3 == nil || txnMeta.V3.SorobanMeta == nil { + return events, nil + } + for _, ev := range txnMeta.V3.SorobanMeta.Events { + hexBytes, err := hex.DecodeString(ev.ContractId.HexString()) + if err != nil { + break } - if txnMeta.V3 == nil || txnMeta.V3.SorobanMeta == nil { - continue + contractID, err := strkey.Encode(strkey.VersionByteContract, hexBytes) + if err != nil { + return nil, err } - for _, ev := range txnMeta.V3.SorobanMeta.Events { - hexBytes, err := hex.DecodeString(ev.ContractId.HexString()) - if err != nil { - break - } - contractID, err := strkey.Encode(strkey.VersionByteContract, hexBytes) - if err != nil { - return nil, err - } - if slices.Contains(eventFilter.ContractIds, contractID) { - for _, topic := range ev.Body.V0.Topics { - if slices.Contains(eventFilter.Topics, topic.String()) { - events = append(events, types.Event{ - ContractEvent: &ev, - LedgerSeq: uint64(txn.Ledger), - }) - break - } + if slices.Contains(fl.ContractIds, contractID) { + for _, topic := range ev.Body.V0.Topics { + if slices.Contains(fl.Topics, topic.String()) { + events = append(events, types.Event{ + ContractEvent: &ev, + LedgerSeq: uint64(txn.Ledger), + }) + break } } } diff --git a/relayer/chains/steller/config.go b/relayer/chains/steller/config.go index 9ea92f9b..537807bd 100644 --- a/relayer/chains/steller/config.go +++ b/relayer/chains/steller/config.go @@ -9,6 +9,7 @@ import ( "github.com/icon-project/centralized-relay/relayer/chains/steller/sorobanclient" "github.com/icon-project/centralized-relay/relayer/provider" + "github.com/icon-project/centralized-relay/relayer/types" relayertypes "github.com/icon-project/centralized-relay/relayer/types" "github.com/stellar/go/clients/horizonclient" "go.uber.org/zap" @@ -77,3 +78,7 @@ func (pc *Config) Validate() error { func (pc *Config) Enabled() bool { return !pc.Disabled } + +func (pc *Config) ContractsAddress() types.ContractConfigMap { + return pc.Contracts +} diff --git a/relayer/chains/steller/provider.go b/relayer/chains/steller/provider.go index 01589a87..26d14f94 100644 --- a/relayer/chains/steller/provider.go +++ b/relayer/chains/steller/provider.go @@ -2,7 +2,7 @@ package steller import ( "context" - "errors" + "fmt" "math/big" "strconv" "sync" @@ -66,17 +66,32 @@ func (p *Provider) FinalityBlock(ctx context.Context) uint64 { return 0 } -func (p *Provider) GenerateMessages(ctx context.Context, messageKey *relayertypes.MessageKeyWithMessageHeight) ([]*relayertypes.Message, error) { - p.log.Info("generating message", zap.Any("messagekey", messageKey)) - if messageKey == nil { - return nil, errors.New("GenerateMessage: message key cannot be nil") +func (p *Provider) GenerateMessages(ctx context.Context, fromHeight, toHeight uint64) ([]*relayertypes.Message, error) { + var messages []*relayertypes.Message + + for h := fromHeight; h <= toHeight; h++ { + msgs, err := p.fetchLedgerMessages(context.Background(), h) + if err != nil { + return nil, fmt.Errorf("failed to generate message for height %d: %w", h, err) + } + messages = append(messages, msgs...) + } + + return messages, nil +} + +func (p *Provider) FetchTxMessages(ctx context.Context, txHash string) ([]*relayertypes.Message, error) { + tx, err := p.client.GetTransaction(txHash) + if err != nil { + return nil, err } - messages, err := p.fetchLedgerMessages(context.Background(), messageKey.Height) - if len(messages) == 0 { - return nil, errors.New("GenerateMessage: no messages found") + + events, err := p.client.ParseTxnEvents(&tx, p.getEventFilter(0)) + if err != nil { + return nil, err } - return messages, err + return p.parseMessagesFromEvents(events) } func (p *Provider) SetAdmin(ctx context.Context, admin string) error { diff --git a/relayer/chains/sui/config.go b/relayer/chains/sui/config.go index 6bf28c06..6159dcc7 100644 --- a/relayer/chains/sui/config.go +++ b/relayer/chains/sui/config.go @@ -2,11 +2,13 @@ package sui import ( "context" + "encoding/json" "sync" "time" suisdkClient "github.com/coming-chat/go-sui/v2/client" "github.com/icon-project/centralized-relay/relayer/provider" + "github.com/icon-project/centralized-relay/relayer/types" "go.uber.org/zap" ) @@ -91,3 +93,15 @@ func (pc *Config) Validate() error { func (c *Config) Enabled() bool { return !c.Disabled } + +func (c *Config) ContractsAddress() types.ContractConfigMap { + dapps, _ := json.Marshal(c.Dapps) + + return types.ContractConfigMap{ + "xcall-package-id": c.XcallPkgID, + "xcall-storage-id": c.XcallStorageID, + "connection-id": c.ConnectionID, + "connection-cap-id": c.ConnectionCapID, + "dapps": string(dapps), + } +} diff --git a/relayer/chains/sui/provider.go b/relayer/chains/sui/provider.go index 72d7c574..70950490 100644 --- a/relayer/chains/sui/provider.go +++ b/relayer/chains/sui/provider.go @@ -97,18 +97,50 @@ func (p *Provider) FinalityBlock(ctx context.Context) uint64 { return 0 } -func (p *Provider) GenerateMessages(ctx context.Context, messageKey *relayertypes.MessageKeyWithMessageHeight) ([]*relayertypes.Message, error) { - checkpoint, err := p.client.GetCheckpoint(ctx, messageKey.Height) - if err != nil { - p.log.Error("failed to fetch checkpoint", zap.Error(err)) - return nil, err +func (p *Provider) GenerateMessages(ctx context.Context, fromHeight, toHeight uint64) ([]*relayertypes.Message, error) { + var messages []*relayertypes.Message + for h := fromHeight; h <= toHeight; h++ { + checkpoint, err := p.client.GetCheckpoint(ctx, h) + if err != nil { + p.log.Error("failed to fetch checkpoint", zap.Error(err)) + return nil, err + } + + eventResponse, err := p.client.GetEventsFromTxBlocks( + ctx, + checkpoint.Transactions, + func(stbr *suitypes.SuiTransactionBlockResponse) bool { + for _, t := range stbr.ObjectChanges { + if t.Data.Mutated != nil && t.Data.Mutated.ObjectId.String() == p.cfg.XcallStorageID { + return true + } + } + return false + }, + ) + if err != nil { + p.log.Error("failed to query events", zap.Error(err)) + return nil, err + } + + blockInfoList, err := p.parseMessagesFromEvents(eventResponse) + if err != nil { + p.log.Error("failed to parse messages from events", zap.Error(err)) + return nil, err + } + + for _, bi := range blockInfoList { + messages = append(messages, bi.Messages...) + } } - var messages []*relayertypes.Message + return messages, nil +} +func (p *Provider) FetchTxMessages(ctx context.Context, txHash string) ([]*relayertypes.Message, error) { eventResponse, err := p.client.GetEventsFromTxBlocks( ctx, - checkpoint.Transactions, + []string{txHash}, func(stbr *suitypes.SuiTransactionBlockResponse) bool { for _, t := range stbr.ObjectChanges { if t.Data.Mutated != nil && t.Data.Mutated.ObjectId.String() == p.cfg.XcallStorageID { @@ -119,16 +151,15 @@ func (p *Provider) GenerateMessages(ctx context.Context, messageKey *relayertype }, ) if err != nil { - p.log.Error("failed to query events", zap.Error(err)) return nil, err } blockInfoList, err := p.parseMessagesFromEvents(eventResponse) if err != nil { - p.log.Error("failed to parse messages from events", zap.Error(err)) return nil, err } + var messages []*relayertypes.Message for _, bi := range blockInfoList { messages = append(messages, bi.Messages...) } diff --git a/relayer/chains/wasm/provider.go b/relayer/chains/wasm/provider.go index d5597f65..2c38bad9 100644 --- a/relayer/chains/wasm/provider.go +++ b/relayer/chains/wasm/provider.go @@ -5,11 +5,13 @@ import ( "fmt" "math/big" "runtime" + "slices" "strings" "sync" "time" wasmTypes "github.com/CosmWasm/wasmd/x/wasm/types" + abci "github.com/cometbft/cometbft/abci/types" coreTypes "github.com/cometbft/cometbft/rpc/core/types" sdkTypes "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/errors" @@ -428,8 +430,9 @@ func (p *Provider) ShouldSendMessage(ctx context.Context, message *relayTypes.Me return true, nil } -func (p *Provider) GenerateMessages(ctx context.Context, messageKey *relayTypes.MessageKeyWithMessageHeight) ([]*relayTypes.Message, error) { - blocks, err := p.fetchBlockMessages(ctx, &types.HeightRange{messageKey.Height, messageKey.Height}) +func (p *Provider) GenerateMessages(ctx context.Context, fromHeight, toHeight uint64) ([]*relayTypes.Message, error) { + p.logger.Info("generating message", zap.Uint64("fromHeight", fromHeight), zap.Uint64("toHeight", toHeight)) + blocks, err := p.fetchBlockMessages(ctx, &types.HeightRange{Start: fromHeight, End: toHeight}) if err != nil { return nil, err } @@ -440,6 +443,37 @@ func (p *Provider) GenerateMessages(ctx context.Context, messageKey *relayTypes. return messages, nil } +func (p *Provider) FetchTxMessages(ctx context.Context, txHash string) ([]*relayTypes.Message, error) { + txResult, err := p.client.GetTransactionReceipt(ctx, txHash) + if err != nil { + return nil, err + } + + allowedEvents := []string{ + EventTypeWasmMessage, EventTypeWasmCallMessage, EventTypeWasmRollbackMessage, + } + contractAddresses := []string{ + p.cfg.Contracts[relayTypes.XcallContract], + p.cfg.Contracts[relayTypes.ConnectionContract], + } + + filteredEvents := []abci.Event{} + for _, ev := range txResult.TxResponse.Events { + if !slices.Contains(allowedEvents, ev.Type) { + continue + } + for _, attr := range ev.Attributes { + if attr.Key == EventAttrKeyContractAddress { + if slices.Contains(contractAddresses, attr.Value) { + filteredEvents = append(filteredEvents, ev) + } + } + } + } + + return p.ParseMessageFromEvents(filteredEvents) +} + func (p *Provider) FinalityBlock(ctx context.Context) uint64 { return p.cfg.FinalityBlock } @@ -852,3 +886,24 @@ func (p *Provider) SetLastSavedHeightFunc(f func() uint64) { func (p *Provider) GetLastSavedHeight() uint64 { return p.LastSavedHeightFunc() } + +func (p *Provider) GetLastProcessedBlockHeight(ctx context.Context) (uint64, error) { + return p.GetLastSavedHeight(), nil +} + +func (p *Provider) QueryBlockMessages(ctx context.Context, fromHeight, toHeight uint64) ([]*relayTypes.Message, error) { + heightRange := &types.HeightRange{ + Start: fromHeight, + End: toHeight, + } + blockInfo, err := p.fetchBlockMessages(ctx, heightRange) + if err != nil { + p.logger.Error("failed to fetch block messages", zap.Error(err)) + return nil, err + } + var messages []*relayTypes.Message + for _, block := range blockInfo { + messages = append(messages, block.Messages...) + } + return messages, nil +} diff --git a/relayer/provider/provider.go b/relayer/provider/provider.go index 17987037..cba04f21 100644 --- a/relayer/provider/provider.go +++ b/relayer/provider/provider.go @@ -16,6 +16,7 @@ type Config interface { GetWallet() string Validate() error Enabled() bool + ContractsAddress() types.ContractConfigMap } type ChainQuery interface { @@ -27,8 +28,8 @@ type ChainProvider interface { ChainQuery NID() string Name() string - Init(context.Context, string, kms.KMS) error Type() string + Init(context.Context, string, kms.KMS) error Config() Config Listener(ctx context.Context, lastProcessedTx types.LastProcessedTx, blockInfo chan *types.BlockInfo) error Route(ctx context.Context, message *types.Message, callback types.TxResponseFunc) error @@ -39,7 +40,8 @@ type ChainProvider interface { SetAdmin(context.Context, string) error FinalityBlock(ctx context.Context) uint64 - GenerateMessages(ctx context.Context, messageKey *types.MessageKeyWithMessageHeight) ([]*types.Message, error) + GenerateMessages(ctx context.Context, fromHeight, toHeight uint64) ([]*types.Message, error) + FetchTxMessages(ctx context.Context, txHash string) ([]*types.Message, error) QueryBalance(ctx context.Context, addr string) (*types.Coin, error) NewKeystore(string) (string, error) @@ -87,3 +89,7 @@ func (pc *CommonConfig) Validate() error { } return nil } + +func (pc *CommonConfig) ContractsAddress() types.ContractConfigMap { + return pc.Contracts +} diff --git a/relayer/relay.go b/relayer/relay.go index 3960e21b..b1314f58 100644 --- a/relayer/relay.go +++ b/relayer/relay.go @@ -12,6 +12,7 @@ import ( ) var ( + Version = "dev" DefaultFlushInterval = 5 * time.Minute listenerChannelBufferSize = 1000 * 5 @@ -274,7 +275,6 @@ func (r *Relayer) processMessages(ctx context.Context) { // & merge message to src cache func (r *Relayer) processBlockInfo(ctx context.Context, src *ChainRuntime, blockInfo *types.BlockInfo) { src.LastBlockHeight = blockInfo.Height - for _, msg := range blockInfo.Messages { msg := types.NewRouteMessage(msg) src.MessageCache.Add(msg) @@ -479,7 +479,7 @@ func (r *Relayer) CheckFinality(ctx context.Context) { } // generateMessage - messages, err := srcChainRuntime.Provider.GenerateMessages(ctx, txObject.MessageKeyWithMessageHeight) + messages, err := srcChainRuntime.Provider.GenerateMessages(ctx, txObject.TxHeight, txObject.TxHeight) if err != nil { r.log.Error("finality processor: generateMessage", zap.Any("message key", txObject.MessageKey), diff --git a/relayer/socket/client.go b/relayer/socket/client.go index ef8ed4fb..68815056 100644 --- a/relayer/socket/client.go +++ b/relayer/socket/client.go @@ -5,22 +5,30 @@ import ( "math/big" "net" + "github.com/icon-project/centralized-relay/relayer/types" jsoniter "github.com/json-iterator/go" - - "github.com/icon-project/centralized-relay/relayer/store" ) const ( - EventGetBlock Event = "GetBlock" - EventGetMessageList Event = "GetMessageList" - EventRelayMessage Event = "RelayMessage" - EventMessageRemove Event = "MessageRemove" - EventPruneDB Event = "PruneDB" - EventRevertMessage Event = "RevertMessage" - EventError Event = "Error" - EventGetFee Event = "GetFee" - EventSetFee Event = "SetFee" - EventClaimFee Event = "ClaimFee" + EventGetBlock Event = "GetBlock" + EventGetMessageList Event = "GetMessageList" + EventRelayMessage Event = "RelayMessage" + EventRelayRangeMessage Event = "RelayRangeMessage" + EventMessageRemove Event = "MessageRemove" + EventPruneDB Event = "PruneDB" + EventRevertMessage Event = "RevertMessage" + EventError Event = "Error" + EventGetFee Event = "GetFee" + EventSetFee Event = "SetFee" + EventClaimFee Event = "ClaimFee" + EventGetLatestHeight Event = "GetLatestHeight" + EventGetBlockRange Event = "GetBlockRange" + EventGetConfig Event = "GetConfig" + EventListChainInfo Event = "ListChainInfo" + EventGetBalance Event = "GetChainBalance" + EventRelayerInfo Event = "RelayerInfo" + EventMessageReceived Event = "MessageReceived" + EventGetBlockEvents Event = "GetBlockEvents" ) var ( @@ -45,102 +53,34 @@ func NewClient() (*Client, error) { } // send sends message to socket -func (c *Client) send(event Event, req interface{}) error { +func (c *Client) send(req interface{}) error { data, err := jsoniter.Marshal(req) if err != nil { return err } - msg := &Message{Event: event, Data: data} - payload, err := jsoniter.Marshal(msg) - if err != nil { - return err - } - if _, err := c.conn.Write(payload); err != nil { + if _, err := c.conn.Write(data); err != nil { return err } return nil } // read and parse message from socket -func (c *Client) read() (interface{}, error) { +func (c *Client) read() (*Response, error) { buf := make([]byte, 1024*100) nr, err := c.conn.Read(buf) if err != nil { return nil, err } - msg := new(Message) - if err := jsoniter.Unmarshal(buf[:nr], msg); err != nil { + res := new(Response) + if err := jsoniter.Unmarshal(buf[:nr], res); err != nil { return nil, err } - return c.parseEvent(msg) -} -// parse event from message -func (c *Client) parseEvent(msg *Message) (interface{}, error) { - switch msg.Event { - case EventGetBlock: - var res []*ResGetBlock - if err := jsoniter.Unmarshal(msg.Data, &res); err != nil { - return nil, err - } - return res, nil - case EventGetMessageList: - res := new(ResMessageList) - if err := jsoniter.Unmarshal(msg.Data, res); err != nil { - return nil, err - } - return res, nil - case EventRelayMessage: - res := new(ResRelayMessage) - if err := jsoniter.Unmarshal(msg.Data, res); err != nil { - return nil, err - } - return res, nil - case EventMessageRemove: - res := new(ResMessageRemove) - if err := jsoniter.Unmarshal(msg.Data, res); err != nil { - return nil, err - } - return res, nil - case EventPruneDB: - res := new(ResPruneDB) - if err := jsoniter.Unmarshal(msg.Data, res); err != nil { - return nil, err - } - return res, nil - case EventError: - res := new(ChainProviderError) - if err := jsoniter.Unmarshal(msg.Data, res); err != nil { - return nil, err - } - return res, fmt.Errorf(res.Message) - case EventRevertMessage: - res := new(ResRevertMessage) - if err := jsoniter.Unmarshal(msg.Data, res); err != nil { - return nil, err - } - return res, nil - case EventGetFee: - res := new(ResGetFee) - if err := jsoniter.Unmarshal(msg.Data, res); err != nil { - return nil, err - } - return res, nil - case EventSetFee: - res := new(ResSetFee) - if err := jsoniter.Unmarshal(msg.Data, res); err != nil { - return nil, err - } - return res, nil - case EventClaimFee: - res := new(ResClaimFee) - if err := jsoniter.Unmarshal(msg.Data, res); err != nil { - return nil, err - } - return res, nil - default: - return nil, ErrUnknownEvent + if !res.Success { + return nil, fmt.Errorf(res.Message) } + + return res, nil } func (c *Client) Close() error { @@ -149,153 +89,224 @@ func (c *Client) Close() error { // GetBlock sends GetBlock event to socket func (c *Client) GetBlock(chain string) ([]*ResGetBlock, error) { - req := &ReqGetBlock{Chain: chain, All: chain == ""} - if err := c.send(EventGetBlock, req); err != nil { + req := &ReqGetBlock{Chain: chain} + if err := c.send(&Request{Event: EventGetBlock, Data: req}); err != nil { return nil, err } - data, err := c.read() + res, err := c.read() if err != nil { return nil, err } - res, ok := data.([]*ResGetBlock) - if !ok { - return nil, ErrInvalidResponse(err) + + resData := []*ResGetBlock{} + if err := parseResData(res.Data, &resData); err != nil { + return nil, err } - return res, nil + + return resData, nil } // GetMessageList sends GetMessageList event to socket -func (c *Client) GetMessageList(chain string, pagination *store.Pagination) (*ResMessageList, error) { - req := &ReqMessageList{Chain: chain, Pagination: pagination} - if err := c.send(EventGetMessageList, req); err != nil { +func (c *Client) GetMessageList(chain string, limit uint) (*ResMessageList, error) { + req := &ReqMessageList{Chain: chain, Limit: limit} + if err := c.send(&Request{Event: EventGetMessageList, Data: req}); err != nil { return nil, err } - data, err := c.read() + res, err := c.read() if err != nil { return nil, err } - res, ok := data.(*ResMessageList) - if !ok { - return nil, ErrInvalidResponse(err) + + resData := new(ResMessageList) + if err := parseResData(res.Data, &resData); err != nil { + return nil, err } - return res, nil + + return resData, nil } // RelayMessage sends RelayMessage event to socket -func (c *Client) RelayMessage(chain string, height uint64, sn *big.Int) (*ResRelayMessage, error) { - req := &ReqRelayMessage{Chain: chain, Sn: sn, Height: height} - if err := c.send(EventRelayMessage, req); err != nil { +func (c *Client) RelayMessage(chain string, height uint64, txHash string) ([]*types.Message, error) { + req := &ReqRelayMessage{ + Chain: chain, + Height: height, + TxHash: txHash, + } + if err := c.send(&Request{Event: EventRelayMessage, Data: req}); err != nil { return nil, err } - data, err := c.read() + res, err := c.read() if err != nil { return nil, err } - res, ok := data.(*ResRelayMessage) - if !ok { - return nil, ErrInvalidResponse(err) + + resData := []*types.Message{} + if err := parseResData(res.Data, &resData); err != nil { + return nil, err } - return res, nil + + return resData, nil } // MessageRemove sends MessageRemove event to socket func (c *Client) MessageRemove(chain string, sn *big.Int) (*ResMessageRemove, error) { req := &ReqMessageRemove{Chain: chain, Sn: sn} - if err := c.send(EventMessageRemove, req); err != nil { + if err := c.send(&Request{Event: EventMessageRemove, Data: req}); err != nil { return nil, err } - data, err := c.read() + res, err := c.read() if err != nil { return nil, err } - res, ok := data.(*ResMessageRemove) - if !ok { - return nil, ErrInvalidResponse(err) + + resData := new(ResMessageRemove) + if err := parseResData(res.Data, &resData); err != nil { + return nil, err } - return res, nil + + return resData, nil } // PruneDB sends PruneDB event to socket func (c *Client) PruneDB() (*ResPruneDB, error) { req := &ReqPruneDB{} - if err := c.send(EventPruneDB, req); err != nil { + if err := c.send(&Request{Event: EventPruneDB, Data: req}); err != nil { return nil, err } - data, err := c.read() + res, err := c.read() if err != nil { return nil, err } - res, ok := data.(*ResPruneDB) - if !ok { - return nil, ErrInvalidResponse(err) + + resData := new(ResPruneDB) + if err := parseResData(res.Data, &resData); err != nil { + return nil, err } - return res, nil + + return resData, nil } // RevertMessage sends RevertMessage event to socket func (c *Client) RevertMessage(chain string, sn uint64) (*ResRevertMessage, error) { req := &ReqRevertMessage{Chain: chain, Sn: sn} - if err := c.send(EventRevertMessage, req); err != nil { + if err := c.send(&Request{Event: EventRevertMessage, Data: req}); err != nil { return nil, err } - data, err := c.read() + res, err := c.read() if err != nil { return nil, err } - res, ok := data.(*ResRevertMessage) - if !ok { - return nil, ErrInvalidResponse(err) + + resData := new(ResRevertMessage) + if err := parseResData(res.Data, &resData); err != nil { + return nil, err } - return res, nil + + return resData, nil } // GetFee sends GetFee event to socket func (c *Client) GetFee(chain string, network string, isReponse bool) (*ResGetFee, error) { req := &ReqGetFee{Chain: chain, Network: network, Response: isReponse} - if err := c.send(EventGetFee, req); err != nil { + if err := c.send(&Request{Event: EventGetFee, Data: req}); err != nil { return nil, err } - data, err := c.read() + res, err := c.read() if err != nil { return nil, err } - res, ok := data.(*ResGetFee) - if !ok { - return nil, ErrInvalidResponse(err) + + resData := new(ResGetFee) + if err := parseResData(res.Data, &resData); err != nil { + return nil, err } - return res, nil + + return resData, nil } // SetFee sends SetFee event to socket func (c *Client) SetFee(chain, network string, msgFee, resFee *big.Int) (*ResSetFee, error) { req := &ReqSetFee{Chain: chain, Network: network, MsgFee: msgFee, ResFee: resFee} - if err := c.send(EventSetFee, req); err != nil { + if err := c.send(&Request{Event: EventSetFee, Data: req}); err != nil { return nil, err } - data, err := c.read() + res, err := c.read() if err != nil { return nil, err } - res, ok := data.(*ResSetFee) - if !ok { - return nil, ErrInvalidResponse(err) + + resData := new(ResSetFee) + if err := parseResData(res.Data, &resData); err != nil { + return nil, err } - return res, nil + + return resData, nil } // ClaimFee sends ClaimFee event to socket func (c *Client) ClaimFee(chain string) (*ResClaimFee, error) { req := &ReqClaimFee{Chain: chain} - if err := c.send(EventClaimFee, req); err != nil { + if err := c.send(&Request{Event: EventClaimFee, Data: req}); err != nil { return nil, err } - data, err := c.read() + res, err := c.read() if err != nil { return nil, err } - res, ok := data.(*ResClaimFee) - if !ok { - return nil, ErrInvalidResponse(err) + + resData := new(ResClaimFee) + if err := parseResData(res.Data, &resData); err != nil { + return nil, err } - return res, nil + + return resData, nil +} + +func (c *Client) GetLatestHeight(chain string) (*ResChainHeight, error) { + req := &ReqChainHeight{Chain: chain} + if err := c.send(&Request{Event: EventGetLatestHeight, Data: req}); err != nil { + return nil, err + } + res, err := c.read() + if err != nil { + return nil, err + } + + resData := new(ResChainHeight) + if err := parseResData(res.Data, &resData); err != nil { + return nil, err + } + + return resData, nil +} + +func (c *Client) QueryBlockRange(chain string, fromHeight, toHeight uint64) (*ResRangeBlockQuery, error) { + req := &ReqRangeBlockQuery{Chain: chain, FromHeight: fromHeight, ToHeight: toHeight} + if err := c.send(&Request{Event: EventGetBlockRange, Data: req}); err != nil { + return nil, err + } + res, err := c.read() + if err != nil { + return nil, err + } + + resData := new(ResRangeBlockQuery) + if err := parseResData(res.Data, &resData); err != nil { + return nil, err + } + + return resData, nil +} + +func parseResData(data any, dest interface{}) error { + jsonData, err := jsoniter.Marshal(data) + if err != nil { + return err + } + + if err := jsoniter.Unmarshal(jsonData, dest); err != nil { + return err + } + + return nil } diff --git a/relayer/socket/server.go b/relayer/socket/server.go index 8cac2dcf..25d35dd7 100644 --- a/relayer/socket/server.go +++ b/relayer/socket/server.go @@ -7,15 +7,17 @@ import ( "net" "os" "path" + "time" jsoniter "github.com/json-iterator/go" "github.com/icon-project/centralized-relay/relayer" + "github.com/icon-project/centralized-relay/relayer/store" "github.com/icon-project/centralized-relay/relayer/types" ) var ( - SocketPath = path.Join(os.TempDir(), "relayer.sock") + SocketPath = getEnvOrFallback("SOCKET_PATH", path.Join(os.TempDir(), "relayer.sock")) network = "unix" ) @@ -24,7 +26,7 @@ func NewSocket(rly *relayer.Relayer) (*Server, error) { if err != nil { return nil, err } - return &Server{listener: l, rly: rly}, nil + return &Server{listener: l, startedAt: time.Now().Unix(), rly: rly}, nil } // Listen to socket @@ -58,23 +60,20 @@ func (s *Server) server(c net.Conn) { // Parse message from socket func (s *Server) parse(data []byte) ([]byte, error) { - msg := new(Message) + msg := new(Request) if err := jsoniter.Unmarshal(data, msg); err != nil { - return nil, err - } - payload, err := s.parseEvent(msg) - if err != nil { - return nil, err + return makeError(err), nil } + payload := s.parseEvent(msg) return jsoniter.Marshal(payload) } // makeError for the client to write to socket func makeError(err error) []byte { - message := &Message{EventError, []byte(fmt.Sprintf(`{"message": "%s"}`, err.Error()))} + message := &Response{Event: EventError, Message: err.Error()} data, err := jsoniter.Marshal(message) if err != nil { - return []byte(fmt.Sprintf(`{"error": "%s"}`, err.Error())) + return []byte(fmt.Sprintf(`{"event":"%s","message":"%s","success":false}`, EventError, err.Error())) } return data } @@ -89,197 +88,289 @@ func (s *Server) send(conn net.Conn, data []byte) error { } // parseEvent for the client to write to socket -func (s *Server) parseEvent(msg *Message) (*Message, error) { +func (s *Server) parseEvent(msg *Request) *Response { + data, err := jsoniter.Marshal(msg.Data) + if err != nil { + return &Response{ID: msg.ID, Event: EventError, Message: err.Error()} + } + response := &Response{ID: msg.ID, Event: msg.Event} + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() switch msg.Event { case EventGetBlock: req := new(ReqGetBlock) - if err := jsoniter.Unmarshal(msg.Data, req); err != nil { - return nil, err + if err := jsoniter.Unmarshal(data, req); err != nil { + return response.SetError(err) } + store := s.rly.GetBlockStore() var blocks []*ResGetBlock - - if req.All { + if req.Chain == "" { for _, chain := range s.rly.GetAllChainsRuntime() { - blocks = append(blocks, &ResGetBlock{chain.Provider.NID(), chain.LastSavedHeight}) - } - data, err := jsoniter.Marshal(blocks) - if err != nil { - return nil, err + latestHeight, err := chain.Provider.QueryLatestHeight(ctx) + if err != nil { + return response.SetError(err) + } + blocks = append(blocks, &ResGetBlock{ + Chain: chain.Provider.NID(), + CheckPointHeight: chain.LastSavedHeight, + LatestHeight: latestHeight, + }) } - return &Message{EventGetBlock, data}, nil + return response.SetData(blocks) } - - store := s.rly.GetBlockStore() - height, err := store.GetLastStoredBlock(req.Chain) + checkPointHeight, err := store.GetLastStoredBlock(req.Chain) if err != nil { - return nil, err + return response.SetError(err) } - - blocks = append(blocks, &ResGetBlock{req.Chain, height}) - data, err := jsoniter.Marshal(blocks) + chain, err := s.rly.FindChainRuntime(req.Chain) + if err != nil { + return response.SetError(err) + } + latestHeight, err := chain.Provider.QueryLatestHeight(ctx) if err != nil { - return nil, err + return response.SetError(err) } - return &Message{EventGetBlock, data}, nil + blocks = append(blocks, &ResGetBlock{Chain: req.Chain, CheckPointHeight: checkPointHeight, LatestHeight: latestHeight}) + return response.SetData(blocks) case EventGetMessageList: req := new(ReqMessageList) - if err := jsoniter.Unmarshal(msg.Data, req); err != nil { - return nil, err + if err := jsoniter.Unmarshal(data, req); err != nil { + return response.SetError(err) } - store := s.rly.GetMessageStore() - messages, err := store.GetMessages(req.Chain, req.Pagination) + msgStore := s.rly.GetMessageStore() + messages, err := msgStore.GetMessages(req.Chain, &store.Pagination{Limit: req.Limit}) if err != nil { - return nil, err + return response.SetError(err) } - total, err := store.TotalCountByChain(req.Chain) + total, err := msgStore.TotalCountByChain(req.Chain) if err != nil { - return nil, err + return response.SetError(err) } - data, err := jsoniter.Marshal(&ResMessageList{messages, int(total)}) - if err != nil { - return nil, err - } - return &Message{EventGetMessageList, data}, nil + return response.SetData(&ResMessageList{messages, int(total)}) case EventMessageRemove: req := new(ReqMessageRemove) - if err := jsoniter.Unmarshal(msg.Data, req); err != nil { - return nil, err + if err := jsoniter.Unmarshal(data, req); err != nil { + return response.SetError(err) } store := s.rly.GetMessageStore() key := &types.MessageKey{Src: req.Chain, Sn: req.Sn} message, err := store.GetMessage(key) if err != nil { - return nil, err + return response.SetError(err) } if err := store.DeleteMessage(key); err != nil { - return nil, err + return response.SetError(err) } - data, err := jsoniter.Marshal(&ResMessageRemove{req.Sn, req.Chain, message.Dst, message.MessageHeight, message.EventType}) - if err != nil { - return nil, err - } - return &Message{EventMessageRemove, data}, nil + return response.SetData(&ResMessageRemove{req.Sn, req.Chain, message.Dst, message.MessageHeight, message.EventType}) case EventRelayMessage: req := new(ReqRelayMessage) - if err := jsoniter.Unmarshal(msg.Data, req); err != nil { - return nil, err + if err := jsoniter.Unmarshal(data, req); err != nil { + return response.SetError(err) } - src, err := s.rly.FindChainRuntime(req.Chain) if err != nil { - return nil, err + return response.SetError(err) } - if req.Height != 0 { - msgs, err := src.Provider.GenerateMessages(context.Background(), types.NewMessagekeyWithMessageHeight(&types.MessageKey{Src: req.Chain, Sn: req.Sn}, req.Height)) + messages := []*types.Message{} + if req.TxHash != "" { + msgs, err := src.Provider.FetchTxMessages(ctx, req.TxHash) if err != nil { - return nil, err - } - for _, msg := range msgs { - src.MessageCache.Add(types.NewRouteMessage(msg)) + return response.SetError(err) } - if len(msgs) > 0 { - data, err := jsoniter.Marshal(&ResRelayMessage{types.NewRouteMessage(msgs[0])}) - if err != nil { - return nil, err - } - return &Message{EventRelayMessage, data}, nil + messages = append(messages, msgs...) + } else if req.Height != 0 { + msgs, err := src.Provider.GenerateMessages(ctx, req.Height, req.Height) + if err != nil { + return response.SetError(err) } - return &Message{EventRelayMessage, []byte{}}, nil - } - - store := s.rly.GetMessageStore() - key := &types.MessageKey{Src: req.Chain, Sn: req.Sn} - message, err := store.GetMessage(key) - if err != nil { - return nil, err + messages = append(messages, msgs...) } - src.MessageCache.Add(message) - data, err := jsoniter.Marshal(&ResRelayMessage{message}) - if err != nil { - return nil, err + for _, msg := range messages { + src.MessageCache.Add(types.NewRouteMessage(msg)) } - return &Message{EventRelayMessage, data}, nil + return response.SetData(messages) case EventPruneDB: - if err := s.rly.PruneDB(); err != nil { - return nil, err + req := new(ReqPruneDB) + if err := jsoniter.Unmarshal(data, req); err != nil { + return response.SetError(err) } - data, err := jsoniter.Marshal(&ResPruneDB{"Success"}) - if err != nil { - return nil, err + if err := s.rly.PruneDB(); err != nil { + return response.SetError(err) } - return &Message{EventPruneDB, data}, nil + return response.SetData(&ResPruneDB{"Success"}) case EventRevertMessage: req := new(ReqRevertMessage) - if err := jsoniter.Unmarshal(msg.Data, req); err != nil { - return nil, err + if err := jsoniter.Unmarshal(data, req); err != nil { + return response.SetError(err) } chain, err := s.rly.FindChainRuntime(req.Chain) if err != nil { - return nil, err + return response.SetError(err) } - if err := chain.Provider.RevertMessage(context.Background(), big.NewInt(0).SetUint64(req.Sn)); err != nil { - return nil, err + if err := chain.Provider.RevertMessage(ctx, new(big.Int).SetUint64(req.Sn)); err != nil { + return response.SetError(err) } - data, err := jsoniter.Marshal(&ResRevertMessage{req.Sn}) - if err != nil { - return nil, err - } - return &Message{EventRevertMessage, data}, nil + return response.SetData(&ResRevertMessage{req.Sn}) case EventGetFee: req := new(ReqGetFee) - if err := jsoniter.Unmarshal(msg.Data, req); err != nil { - return nil, err + if err := jsoniter.Unmarshal(data, req); err != nil { + return response.SetError(err) } chain, err := s.rly.FindChainRuntime(req.Chain) if err != nil { - return nil, err - } - fee, err := chain.Provider.GetFee(context.Background(), req.Network, req.Response) - if err != nil { - return nil, err + return response.SetError(err) } - data, err := jsoniter.Marshal(&ResGetFee{Chain: req.Chain, Fee: fee}) + fee, err := chain.Provider.GetFee(ctx, req.Network, req.Response) if err != nil { - return nil, err + return response.SetError(err) } - return &Message{EventGetFee, data}, nil + return response.SetData(&ResGetFee{Chain: req.Chain, Fee: fee}) case EventSetFee: req := new(ReqSetFee) - if err := jsoniter.Unmarshal(msg.Data, req); err != nil { - return nil, err + if err := jsoniter.Unmarshal(data, req); err != nil { + return response.SetError(err) } chain, err := s.rly.FindChainRuntime(req.Chain) if err != nil { - return nil, err + return response.SetError(err) } - if err := chain.Provider.SetFee(context.Background(), req.Network, req.MsgFee, req.ResFee); err != nil { - return nil, err + if err := chain.Provider.SetFee(ctx, req.Network, req.MsgFee, req.ResFee); err != nil { + return response.SetError(err) } - data, err := jsoniter.Marshal(&ResSetFee{"Success"}) - if err != nil { - return nil, err - } - return &Message{EventSetFee, data}, nil + return response.SetData(&ResSetFee{"Success"}) case EventClaimFee: req := new(ReqClaimFee) - if err := jsoniter.Unmarshal(msg.Data, req); err != nil { - return nil, err + if err := jsoniter.Unmarshal(data, req); err != nil { + return response.SetError(err) + } + chain, err := s.rly.FindChainRuntime(req.Chain) + if err != nil { + return response.SetError(err) + } + if err := chain.Provider.ClaimFee(ctx); err != nil { + return response.SetError(err) + } + return response.SetData(&ResClaimFee{"Success"}) + case EventGetConfig: + req := new(ReqChainHeight) + if err := jsoniter.Unmarshal(data, req); err != nil { + return response.SetError(err) } chain, err := s.rly.FindChainRuntime(req.Chain) if err != nil { - return nil, err + return response.SetError(err) + } + return response.SetData(chain.Provider.Config()) + case EventListChainInfo: + req := new(ReqListChain) + if err := jsoniter.Unmarshal(data, req); err != nil { + return response.SetError(err) + } + var ( + chainNames []*ResChainInfo + chains []*relayer.ChainRuntime + ) + if len(req.Chains) > 0 { + for _, chainName := range req.Chains { + chain, err := s.rly.FindChainRuntime(chainName) + if err != nil { + return response.SetError(err) + } + chains = append(chains, chain) + } + } else { + chains = s.rly.GetAllChainsRuntime() + } + for _, chain := range chains { + latestHeight, _ := chain.Provider.QueryLatestHeight(ctx) + chainNames = append(chainNames, &ResChainInfo{ + Name: chain.Provider.Name(), + NID: chain.Provider.NID(), + Address: chain.Provider.Config().GetWallet(), + Type: chain.Provider.Type(), + LatestHeight: latestHeight, + LastCheckPoint: chain.LastSavedHeight, + Contracts: chain.Provider.Config().ContractsAddress(), + }) + } + return response.SetData(chainNames) + case EventGetBalance: + var reqs []ReqGetBalance + if err := jsoniter.Unmarshal(data, &reqs); err != nil { + return response.SetError(err) + } + res := make([]*ResGetBalance, 0, len(reqs)) + for _, req := range reqs { + chain, err := s.rly.FindChainRuntime(req.Chain) + if err != nil { + return &Response{ID: msg.ID, Event: EventGetBalance, Data: res, Message: err.Error()} + } + balance, err := chain.Provider.QueryBalance(ctx, req.Address) + if err != nil { + return response.SetError(err) + } + res = append(res, &ResGetBalance{Chain: req.Chain, Address: req.Address, Balance: balance}) } - if err := chain.Provider.ClaimFee(context.Background()); err != nil { - return nil, err + return response.SetData(res) + case EventMessageReceived: + req := new(ReqMessageReceived) + if err := jsoniter.Unmarshal(data, req); err != nil { + return response.SetError(err) } - data, err := jsoniter.Marshal(&ResClaimFee{"Success"}) + chain, err := s.rly.FindChainRuntime(req.Chain) if err != nil { - return nil, err + return response.SetError(err) } - return &Message{EventClaimFee, data}, nil + key := &types.MessageKey{Src: req.Chain, Sn: new(big.Int).SetUint64(req.Sn)} + received, err := chain.Provider.MessageReceived(context.Background(), key) + if err != nil { + return response.SetError(err) + } + return response.SetData(&ResMessageReceived{Chain: req.Chain, Sn: req.Sn, Received: received}) + case EventRelayerInfo: + req := new(ReqRelayInfo) + if err := jsoniter.Unmarshal(data, req); err != nil { + return response.SetError(err) + } + return response.SetData(&ResRelayInfo{Version: relayer.Version, Uptime: s.startedAt}) + case EventGetBlockEvents: + req := new(ReqGetBlockEvents) + if err := jsoniter.Unmarshal(data, req); err != nil { + return response.SetError(err) + } + var events []*ResGetBlockEvents + for _, chain := range s.rly.GetAllChainsRuntime() { + msgs, _ := chain.Provider.FetchTxMessages(ctx, req.TxHash) + for _, msg := range msgs { + msgKey := types.NewMessageKey(msg.Sn, msg.Src, msg.Dst, msg.EventType) + received, err := chain.Provider.MessageReceived(ctx, msgKey) + if err != nil { + return response.SetError(err) + } + data := &ResGetBlockEvents{ + Event: msg.EventType, + Height: msg.MessageHeight, + Executed: received, + TxHash: req.TxHash, + ChainInfo: struct { + NID string `json:"nid"` + Name string `json:"name"` + Type string `json:"type"` + Contracts types.ContractConfigMap `json:"contracts"` + }{ + NID: chain.Provider.NID(), + Name: chain.Provider.Name(), + Type: chain.Provider.Type(), + Contracts: chain.Provider.Config().ContractsAddress(), + }, + } + events = append(events, data) + } + } + return response.SetData(events) default: - return nil, fmt.Errorf("invalid request") + return response.SetError(fmt.Errorf("unknown event %s", msg.Event)) } } @@ -290,3 +381,10 @@ func (s *Server) Close() error { func (s *Server) IsClosed() bool { return s.listener == nil } + +func getEnvOrFallback(key string, fallback string) string { + if val, exists := os.LookupEnv(key); exists { + return val + } + return fallback +} diff --git a/relayer/socket/types.go b/relayer/socket/types.go index f08fcf11..308b674f 100644 --- a/relayer/socket/types.go +++ b/relayer/socket/types.go @@ -5,119 +5,226 @@ import ( "net" "github.com/icon-project/centralized-relay/relayer" - "github.com/icon-project/centralized-relay/relayer/store" "github.com/icon-project/centralized-relay/relayer/types" ) type Event string -type Message struct { - Event Event - Data []byte +type Request struct { + ID string `json:"id"` + Event Event `json:"event"` + Data any `json:"data"` +} + +type Response struct { + ID string `json:"id"` + Event Event `json:"event"` + Success bool `json:"success"` + Data any `json:"data,omitempty"` + Message string `json:"message,omitempty"` +} + +func (r *Response) SetError(err error) *Response { + r.Message = err.Error() + return r +} + +func (r *Response) SetData(data any) *Response { + r.Success = true + r.Data = data + return r } type Server struct { - listener net.Listener - rly *relayer.Relayer + listener net.Listener + startedAt int64 + rly *relayer.Relayer +} + +type Pagination struct { + Page uint + Limit uint } type ReqMessageList struct { - Chain string - Pagination *store.Pagination + Chain string `json:"chain"` + Limit uint `json:"pagination"` } type ReqGetBlock struct { - Chain string - All bool + Chain string `json:"chain"` } type ReqRelayMessage struct { - Chain string - Sn *big.Int - Height uint64 + Chain string `json:"chain"` + Height uint64 `json:"height"` + TxHash string `json:"txHash"` } type ReqMessageRemove struct { - Chain string - Sn *big.Int + Chain string `json:"chain"` + Sn *big.Int `json:"sn"` } type ResMessageRemove struct { - Sn *big.Int - Chain string - Dst string - Height uint64 - Event string + Sn *big.Int `json:"sn"` + Chain string `json:"chain"` + Dst string `json:"dst"` + Height uint64 `json:"height"` + Event string `json:"event"` } type ResMessageList struct { - Messages []*types.RouteMessage - Total int + Message []*types.RouteMessage `json:"message"` + Total int `json:"total"` } type ResGetBlock struct { - Chain string - Height uint64 -} - -type ResRelayMessage struct { - *types.RouteMessage + Chain string `json:"chain"` + CheckPointHeight uint64 `json:"checkPointHeight"` + LatestHeight uint64 `json:"latestHeight"` } type ReqPruneDB struct { - Chain string + ID string `json:"id"` + Chain string `json:"chain"` } type ResPruneDB struct { - Status string + Status string `json:"status"` } type ErrResponse struct { - Error string + Error string `json:"error"` } type ReqRevertMessage struct { - Chain string - Sn uint64 + Chain string `json:"chain"` + Sn uint64 `json:"sn"` } type ResRevertMessage struct { - Sn uint64 + Sn uint64 `json:"sn"` } type ReqGetFee struct { - Chain string - Network string - Response bool + Chain string `json:"chain"` + Network string `json:"network"` + Response bool `json:"response"` } type ResGetFee struct { - Chain string - Fee uint64 - Response bool + Chain string `json:"chain"` + Fee uint64 `json:"fee"` + Response bool `json:"response"` } // ReqSetFee sends SetFee event to socket type ReqSetFee struct { - Chain string - Network string - MsgFee *big.Int - ResFee *big.Int + Chain string `json:"chain"` + Network string `json:"network"` + MsgFee *big.Int `json:"msg_fee"` + ResFee *big.Int `json:"res_fee"` } // ResSetFee sends SetFee event to socket type ResSetFee struct { - Status string + Status string `json:"status"` } // ReqClaimFee sends ClaimFee event to socket type ReqClaimFee struct { - Chain string + Chain string `json:"chain"` } // ResClaimFee sends ClaimFee event to socket type ResClaimFee struct { - Status string + Status string `json:"status"` +} + +type ReqChainHeight struct { + Chain string `json:"chain"` +} + +type ResChainHeight struct { + Chain string `json:"chain"` + Height uint64 `json:"height"` +} + +type ReqProcessedBlock struct { + Chain string `json:"chain"` +} + +type ReqRangeBlockQuery struct { + Chain string `json:"chain"` + FromHeight uint64 `json:"from_height"` + ToHeight uint64 `json:"to_height"` +} + +type ResRangeBlockQuery struct { + Chain string `json:"chain"` + Msgs []*types.Message `json:"messages"` +} + +type ReqListChain struct { + Chains []string `json:"chains,omitempty"` +} + +type ResChainInfo struct { + Name string `json:"name"` + NID string `json:"nid"` + Address string `json:"address"` + Type string `json:"type"` + Contracts map[string]string `json:"contracts"` + LatestHeight uint64 `json:"latestHeight"` + LastCheckPoint uint64 `json:"lastCheckPoint"` +} + +type ReqGetBalance struct { + Chain string `json:"chain"` + Address string `json:"address"` +} + +type ResGetBalance struct { + Chain string `json:"chain"` + Address string `json:"address"` + Balance *types.Coin `json:"balance"` +} + +type ReqRelayInfo struct{} + +type ResRelayInfo struct { + Version string `json:"version"` + Uptime int64 `json:"uptime"` +} + +type ReqMessageReceived struct { + Chain string `json:"chain"` + Sn uint64 `json:"sn"` +} + +type ResMessageReceived struct { + Chain string `json:"chain"` + Sn uint64 `json:"sn"` + Received bool `json:"received"` +} + +type ReqGetBlockEvents struct { + Height uint64 `json:"height,omitempty"` + TxHash string `json:"txHash,omitempty"` +} + +type ResGetBlockEvents struct { + Event string `json:"event"` + Height uint64 `json:"height"` + Executed bool `json:"executed"` + TxHash string `json:"txHash"` + ChainInfo struct { + NID string `json:"nid"` + Name string `json:"name"` + Type string `json:"type"` + Contracts types.ContractConfigMap `json:"contracts"` + } `json:"chainInfo"` } type ChainProviderError struct { diff --git a/relayer/types/types.go b/relayer/types/types.go index 9eafb9ed..25ae0925 100644 --- a/relayer/types/types.go +++ b/relayer/types/types.go @@ -72,9 +72,9 @@ func (m *Message) MessageKey() *MessageKey { type RouteMessage struct { *Message - Retry uint8 - Processing bool - LastTry time.Time + Retry uint8 `json:"retry"` + Processing bool `json:"processing"` + LastTry time.Time `json:"lastTry"` } func NewRouteMessage(m *Message) *RouteMessage { @@ -208,8 +208,8 @@ func (m *MessageCache) HasCacheKey(cacheKey string) bool { } type Coin struct { - Denom string - Amount uint64 + Denom string `json:"denom"` + Amount uint64 `json:"amount"` } func NewCoin(denom string, amount uint64) *Coin { @@ -243,6 +243,10 @@ type Receipt struct { Status bool } +type EventLog struct { + Height uint64 + Events []string +} type LastProcessedTx struct { Height uint64 Info []byte