Skip to content

Commit

Permalink
add tx sender
Browse files Browse the repository at this point in the history
  • Loading branch information
CoderZhi committed Dec 4, 2023
1 parent 0a1b4a4 commit 317ff40
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 25 deletions.
10 changes: 10 additions & 0 deletions scripts/alter_transfer_table.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
docker exec witness-db mysql -uroot -pkdfjjrU64fjK58H -e "ALTER TABLE witness.bsc_transfers ADD txSender varchar(42);"
docker exec witness-db mysql -uroot -pkdfjjrU64fjK58H -e "ALTER TABLE witness.ethereum_transfers ADD txSender varchar(42);"
docker exec witness-db mysql -uroot -pkdfjjrU64fjK58H -e "ALTER TABLE witness.heco_transfers ADD txSender varchar(42);"
docker exec witness-db mysql -uroot -pkdfjjrU64fjK58H -e "ALTER TABLE witness.matic_transfers ADD txSender varchar(42);"
docker exec witness-db mysql -uroot -pkdfjjrU64fjK58H -e "ALTER TABLE witness.polis_transfers ADD txSender varchar(42);"
docker exec witness-db mysql -uroot -pkdfjjrU64fjK58H -e "ALTER TABLE witness.iotex_transfers ADD txSender varchar(42);"
docker exec witness-db mysql -uroot -pkdfjjrU64fjK58H -e "ALTER TABLE witness.iotex_to_heco_transfers ADD txSender varchar(42);"
docker exec witness-db mysql -uroot -pkdfjjrU64fjK58H -e "ALTER TABLE witness.iotex_to_polis_transfers ADD txSender varchar(42);"
docker exec witness-db mysql -uroot -pkdfjjrU64fjK58H -e "ALTER TABLE witness.iotex_to_bsc_transfers ADD txSender varchar(42);"
docker exec witness-db mysql -uroot -pkdfjjrU64fjK58H -e "ALTER TABLE witness.iotex_to_matic_transfers ADD txSender varchar(42);"
36 changes: 23 additions & 13 deletions witness-service/grpc/types/witness.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions witness-service/proto/types/witness.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ message Transfer {
uint64 gas = 8;
string gasPrice = 9;
string fee = 10;
bytes txSender = 11;
}

message Witness {
Expand Down
24 changes: 19 additions & 5 deletions witness-service/relayer/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (recorder *Recorder) initStore(
"`token` varchar(42) NOT NULL,"+
"`tidx` bigint(20) NOT NULL,"+
"`sender` varchar(42) NOT NULL,"+
"`txSender` varchar(42),"+
"`recipient` varchar(42) NOT NULL,"+
"`amount` varchar(78) NOT NULL,"+
"`fee` varchar(78),"+
Expand Down Expand Up @@ -234,18 +235,28 @@ func (recorder *Recorder) addWitness(
transferTableName, witnessTableName string,
) error {
if _, err := tx.Exec(
fmt.Sprintf("INSERT IGNORE INTO %s (cashier, token, tidx, sender, recipient, amount, fee, id) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", transferTableName),
fmt.Sprintf("INSERT IGNORE INTO %s (cashier, token, tidx, sender, txSender, recipient, amount, fee, id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", transferTableName),
transfer.cashier.Hex(),
transfer.token.Hex(),
transfer.index,
transfer.sender.Hex(),
transfer.txSender.Hex(),
transfer.recipient.Hex(),
transfer.amount.String(),
transfer.fee.String(),
transfer.id.Hex(),
); err != nil {
return errors.Wrap(err, "failed to insert into transfer table")
}
if transfer.txSender != zeroAddress {
if _, err := tx.Exec(
fmt.Sprintf("UPDATE `%s` SET `txSender`=? WHERE `id`=?", transferTableName),
transfer.txSender.Hex(),
transfer.id.Hex(),
); err != nil {
return errors.Wrap(err, "failed to update tx sender")
}
}
if witnessTableName != "" && len(witness.signature) != 0 {
if _, err := tx.Exec(
fmt.Sprintf("INSERT IGNORE INTO %s (`transferId`, `witness`, `signature`) VALUES (?, ?, ?)", witnessTableName),
Expand Down Expand Up @@ -307,15 +318,18 @@ func (recorder *Recorder) assembleTransfer(scan func(dest ...interface{}) error)
tx := &Transfer{}
var rawAmount string
var cashier, token, sender, recipient, id string
var relayer, hash, gasPrice, fee sql.NullString
var relayer, hash, gasPrice, fee, txSender sql.NullString
var gas, nonce sql.NullInt64
var timestamp sql.NullTime
if err := scan(&cashier, &token, &tx.index, &sender, &recipient, &rawAmount, &fee, &id, &hash, &timestamp, &nonce, &gas, &gasPrice, &tx.status, &tx.updateTime, &relayer); err != nil {
if err := scan(&cashier, &token, &tx.index, &sender, &txSender, &recipient, &rawAmount, &fee, &id, &hash, &timestamp, &nonce, &gas, &gasPrice, &tx.status, &tx.updateTime, &relayer); err != nil {
return nil, errors.Wrap(err, "failed to scan transfer")
}
tx.cashier = common.HexToAddress(cashier)
tx.token = common.HexToAddress(token)
tx.sender = common.HexToAddress(sender)
if txSender.Valid {
tx.txSender = common.HexToAddress(txSender.String)
}
tx.recipient = common.HexToAddress(recipient)
tx.id = common.HexToHash(id)
if relayer.Valid {
Expand Down Expand Up @@ -360,7 +374,7 @@ func (recorder *Recorder) Transfer(id common.Hash) (*Transfer, error) {
recorder.mutex.RLock()
defer recorder.mutex.RUnlock()
row := recorder.store.DB().QueryRow(
fmt.Sprintf("SELECT `cashier`, `token`, `tidx`, `sender`, `recipient`, `amount`, `fee`, `id`, `txHash`, `txTimestamp`, `nonce`, `gas`, `gasPrice`, `status`, `updateTime`, `relayer` FROM %s WHERE `id`=?", recorder.transferTableName),
fmt.Sprintf("SELECT `cashier`, `token`, `tidx`, `sender`, `txSender`, `recipient`, `amount`, `fee`, `id`, `txHash`, `txTimestamp`, `nonce`, `gas`, `gasPrice`, `status`, `updateTime`, `relayer` FROM %s WHERE `id`=?", recorder.transferTableName),
id.Hex(),
)
return recorder.assembleTransfer(row.Scan)
Expand Down Expand Up @@ -463,7 +477,7 @@ func (recorder *Recorder) Transfers(
if byUpdateTime {
orderBy = "updateTime"
}
query = fmt.Sprintf("SELECT `cashier`, `token`, `tidx`, `sender`, `recipient`, `amount`, `fee`, `id`, `txHash`, `txTimestamp`, `nonce`, `gas`, `gasPrice`, `status`, `updateTime`, `relayer` FROM %s", recorder.transferTableName)
query = fmt.Sprintf("SELECT `cashier`, `token`, `tidx`, `sender`, `txSender`, `recipient`, `amount`, `fee`, `id`, `txHash`, `txTimestamp`, `nonce`, `gas`, `gasPrice`, `status`, `updateTime`, `relayer` FROM %s", recorder.transferTableName)
params := []interface{}{}
if len(queryOpts) > 0 {
conditions := []string{}
Expand Down
1 change: 1 addition & 0 deletions witness-service/relayer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ func (s *Service) List(ctx context.Context, request *services.ListRequest) (*ser
Gas: transfer.gas,
GasPrice: gasPrice,
Timestamp: timestamppb.New(transfer.timestamp),
TxSender: transfer.txSender.Bytes(),
}
response.Statuses[i] = s.assembleCheckResponse(transfer, witnesses)
if len(witnesses) == 0 && transfer.status == WaitingForWitnesses {
Expand Down
1 change: 1 addition & 0 deletions witness-service/relayer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type (
token common.Address
index uint64
sender common.Address
txSender common.Address
recipient common.Address
amount *big.Int
fee *big.Int
Expand Down
23 changes: 17 additions & 6 deletions witness-service/witness/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (recorder *Recorder) Start(ctx context.Context) error {
"`id` varchar(132),"+
"`blockHeight` bigint(20) NOT NULL,"+
"`txHash` varchar(66) NOT NULL,"+
"`txSender` varchar(42),"+
"PRIMARY KEY (`cashier`,`token`,`tidx`),"+
"KEY `id_index` (`id`),"+
"KEY `cashier_index` (`cashier`),"+
Expand Down Expand Up @@ -110,7 +111,7 @@ func (recorder *Recorder) AddTransfer(tx *Transfer, status TransferStatus) error
if tx.amount.Sign() != 1 {
return errors.New("amount should be larger than 0")
}
query := fmt.Sprintf("INSERT IGNORE INTO %s (`cashier`, `token`, `tidx`, `sender`, `recipient`, `amount`, `fee`, `blockHeight`, `txHash`, `status`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", recorder.transferTableName)
query := fmt.Sprintf("INSERT IGNORE INTO %s (`cashier`, `token`, `tidx`, `sender`, `recipient`, `amount`, `fee`, `blockHeight`, `txHash`, `txSender`, `status`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", recorder.transferTableName)
result, err := recorder.store.DB().Exec(
query,
tx.cashier.Hex(),
Expand All @@ -122,6 +123,7 @@ func (recorder *Recorder) AddTransfer(tx *Transfer, status TransferStatus) error
tx.fee.String(),
tx.blockHeight,
tx.txHash.Hex(),
tx.txSender.Hex(),
status,
)
if err != nil {
Expand All @@ -144,7 +146,7 @@ func (recorder *Recorder) UpsertTransfer(tx *Transfer) error {
if tx.amount.Sign() != 1 {
return errors.New("amount should be larger than 0")
}
query := fmt.Sprintf("INSERT INTO %s (`cashier`, `token`, `tidx`, `sender`, `recipient`, `amount`, `fee`, `blockHeight`, `txHash`, `status`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE `status` = IF(status = ?, ?, status)", recorder.transferTableName)
query := fmt.Sprintf("INSERT INTO %s (`cashier`, `token`, `tidx`, `sender`, `recipient`, `amount`, `fee`, `blockHeight`, `txHash`, `txSender`, `status`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE `status` = IF(status = ?, ?, status)", recorder.transferTableName)
result, err := recorder.store.DB().Exec(
query,
tx.cashier.Hex(),
Expand All @@ -156,6 +158,7 @@ func (recorder *Recorder) UpsertTransfer(tx *Transfer) error {
tx.fee.String(),
tx.blockHeight,
tx.txHash.Hex(),
tx.txSender.Hex(),
TransferReady,
TransferNew,
TransferReady,
Expand Down Expand Up @@ -270,7 +273,7 @@ func (recorder *Recorder) TransfersToSubmit() ([]*Transfer, error) {
func (recorder *Recorder) transfers(status TransferStatus) ([]*Transfer, error) {
rows, err := recorder.store.DB().Query(
fmt.Sprintf(
"SELECT cashier, token, tidx, sender, recipient, amount, fee, status, id "+
"SELECT cashier, token, tidx, sender, recipient, amount, fee, status, id, txSender "+
"FROM %s "+
"WHERE status=? "+
"ORDER BY creationTime",
Expand All @@ -293,7 +296,8 @@ func (recorder *Recorder) transfers(status TransferStatus) ([]*Transfer, error)
var rawAmount string
var fee sql.NullString
var id sql.NullString
if err := rows.Scan(&cashier, &token, &tx.index, &sender, &recipient, &rawAmount, &fee, &tx.status, &id); err != nil {
var txSender sql.NullString
if err := rows.Scan(&cashier, &token, &tx.index, &sender, &recipient, &rawAmount, &fee, &tx.status, &id, &txSender); err != nil {
return nil, err
}
tx.cashier = common.HexToAddress(cashier)
Expand All @@ -303,6 +307,9 @@ func (recorder *Recorder) transfers(status TransferStatus) ([]*Transfer, error)
if id.Valid {
tx.id = common.HexToHash(id.String)
}
if txSender.Valid {
tx.txSender = common.HexToAddress(txSender.String)
}
tx.fee = big.NewInt(0)
var ok bool
if fee.Valid {
Expand All @@ -328,7 +335,7 @@ func (recorder *Recorder) transfers(status TransferStatus) ([]*Transfer, error)

func (recorder *Recorder) Transfer(_id common.Hash) (*Transfer, error) {
row := recorder.store.DB().QueryRow(
fmt.Sprintf("SELECT `cashier`, `token`, `tidx`, `sender`, `recipient`, `amount`, `status`, `id` FROM %s WHERE `id`=?", recorder.transferTableName),
fmt.Sprintf("SELECT `cashier`, `token`, `tidx`, `sender`, `recipient`, `amount`, `status`, `id`, `txSender` FROM %s WHERE `id`=?", recorder.transferTableName),
_id.Hex(),
)

Expand All @@ -339,7 +346,8 @@ func (recorder *Recorder) Transfer(_id common.Hash) (*Transfer, error) {
var recipient string
var rawAmount string
var id sql.NullString
if err := row.Scan(&cashier, &token, &tx.index, &sender, &recipient, &rawAmount, &tx.status, &id); err != nil {
var txSender sql.NullString
if err := row.Scan(&cashier, &token, &tx.index, &sender, &recipient, &rawAmount, &tx.status, &id, &txSender); err != nil {
return nil, err
}

Expand All @@ -355,6 +363,9 @@ func (recorder *Recorder) Transfer(_id common.Hash) (*Transfer, error) {
if !ok || tx.amount.Sign() != 1 {
return nil, errors.Errorf("invalid amount %s", rawAmount)
}
if txSender.Valid {
tx.txSender = common.HexToAddress(txSender.String)
}
if toToken, ok := recorder.tokenPairs[tx.token]; ok {
tx.coToken = toToken
} else {
Expand Down
3 changes: 2 additions & 1 deletion witness-service/witness/tokencashierbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (tc *tokenCashierBase) PullTransfers(count uint16) error {
for _, transfer := range transfers {
if transfer.blockHeight <= confirmHeight {
if err := tc.recorder.UpsertTransfer(transfer); err != nil {
return errors.Wrap(err, "failed to add transfer")
return errors.Wrap(err, "failed to upsert transfer")
}
} else {
if err := tc.recorder.AddTransfer(transfer, TransferNew); err != nil {
Expand Down Expand Up @@ -225,6 +225,7 @@ func (tc *tokenCashierBase) SubmitTransfers(sign func(*Transfer, common.Address)
Recipient: transfer.recipient.Bytes(),
Amount: transfer.amount.String(),
Fee: transfer.fee.String(),
TxSender: transfer.txSender.Bytes(),
},
Address: witness.Bytes(),
Signature: signature,
Expand Down
5 changes: 5 additions & 0 deletions witness-service/witness/tokencashieronethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func NewTokenCashierOnEthereum(
case 0:
log.Printf("\tAmount %d is the same as real amount %d\n", amount, realAmount)
}
txSender, err := ethereumClient.TransactionSender(context.Background(), nil, transferLog.BlockHash, transferLog.TxIndex)
if err != nil {
return nil, errors.Wrap(err, "failed to fetch transaction sender")
}
transfers = append(transfers, &Transfer{
cashier: transferLog.Address,
token: tokenAddress,
Expand All @@ -117,6 +121,7 @@ func NewTokenCashierOnEthereum(
fee: new(big.Int).SetBytes(transferLog.Data[96:128]),
blockHeight: transferLog.BlockNumber,
txHash: transferLog.TxHash,
txSender: txSender,
})
}
}
Expand Down
1 change: 1 addition & 0 deletions witness-service/witness/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type (
timestamp time.Time
gas uint64
gasPrice *big.Int
txSender common.Address
}

// Service manages to exchange iotex coin to ERC20 token on ethereum
Expand Down

0 comments on commit 317ff40

Please sign in to comment.