Skip to content

Commit

Permalink
Merge pull request #92 from KyberNetwork/promotees-list-v2
Browse files Browse the repository at this point in the history
intergrate service maintain promotees list to tradelog v2
  • Loading branch information
iostream1308 authored Nov 7, 2024
2 parents f4f62bc + c9689ea commit 20a0040
Show file tree
Hide file tree
Showing 23 changed files with 2,572 additions and 24 deletions.
Binary file added .DS_Store
Binary file not shown.
2 changes: 1 addition & 1 deletion internal/server/tradelogs/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

var (
maxTimeRange uint64 = uint64(7 * 24 * time.Hour.Milliseconds())
maxTimeRange uint64 = uint64(24 * time.Hour.Milliseconds())
wsupgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
Expand Down
10 changes: 9 additions & 1 deletion v2/cmd/backfill/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ import (
"github.com/KyberNetwork/tradelogs/v2/pkg/kafka"
"github.com/KyberNetwork/tradelogs/v2/pkg/parser"
"github.com/KyberNetwork/tradelogs/v2/pkg/parser/zxotc"
"github.com/KyberNetwork/tradelogs/v2/pkg/promotionparser"
promotion1inchv2 "github.com/KyberNetwork/tradelogs/v2/pkg/promotionparser/oneinchv2"
"github.com/KyberNetwork/tradelogs/v2/pkg/rpcnode"
"github.com/KyberNetwork/tradelogs/v2/pkg/storage/backfill"
promoteeTypes "github.com/KyberNetwork/tradelogs/v2/pkg/storage/promotees"
"github.com/KyberNetwork/tradelogs/v2/pkg/storage/state"
"github.com/KyberNetwork/tradelogs/v2/pkg/storage/tradelogs"
storageTypes "github.com/KyberNetwork/tradelogs/v2/pkg/storage/tradelogs/types"
Expand Down Expand Up @@ -65,6 +68,9 @@ func run(c *cli.Context) error {
}
manager := tradelogs.NewManager(l, storages)

//promotee storage
promoteeStorage := promoteeTypes.New(l, db)

// backfill storage
backfillStorage := backfill.New(l, db)

Expand Down Expand Up @@ -100,6 +106,8 @@ func run(c *cli.Context) error {
//zxrfqv3.MustNewParserWithDeployer(traceCalls, ethClient, common.HexToAddress(parser.Deployer0xV3)),
}

promotionParsers := []promotionparser.Parser{promotion1inchv2.MustNewParser()}

// kafka broadcast topic
broadcastTopic := c.String(libapp.KafkaBroadcastTopic.Name)
err = kafka.ValidateTopicName(broadcastTopic)
Expand All @@ -114,7 +122,7 @@ func run(c *cli.Context) error {
}

// trade log handler
tradeLogHandler := handler.NewTradeLogHandler(l, rpcNode, manager, parsers, broadcastTopic, kafkaPublisher)
tradeLogHandler := handler.NewTradeLogHandler(l, rpcNode, manager, promoteeStorage, parsers, promotionParsers, broadcastTopic, kafkaPublisher)

// parse log worker
w := worker.NewBackFiller(tradeLogHandler, backfillStorage, stateStorage, l, rpcNode, parsers)
Expand Down
153 changes: 153 additions & 0 deletions v2/cmd/migrations/00005_add_promotees.up.sql

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion v2/cmd/parse_log/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import (
"github.com/KyberNetwork/tradelogs/v2/pkg/kafka"
"github.com/KyberNetwork/tradelogs/v2/pkg/parser"
"github.com/KyberNetwork/tradelogs/v2/pkg/parser/zxotc"
"github.com/KyberNetwork/tradelogs/v2/pkg/promotionparser"
promotion1inchv2 "github.com/KyberNetwork/tradelogs/v2/pkg/promotionparser/oneinchv2"
"github.com/KyberNetwork/tradelogs/v2/pkg/rpcnode"
promoteeTypes "github.com/KyberNetwork/tradelogs/v2/pkg/storage/promotees"
"github.com/KyberNetwork/tradelogs/v2/pkg/storage/state"
"github.com/KyberNetwork/tradelogs/v2/pkg/storage/tradelogs"
storageTypes "github.com/KyberNetwork/tradelogs/v2/pkg/storage/tradelogs/types"
Expand Down Expand Up @@ -63,6 +66,10 @@ func run(c *cli.Context) error {
manager := tradelogs.NewManager(l, []storageTypes.Storage{
zxotcStorage.New(l, db),
})

//promotee storage
promoteeStorage := promoteeTypes.New(l, db)

// state storage
s := state.New(l, db)

Expand Down Expand Up @@ -95,6 +102,8 @@ func run(c *cli.Context) error {
//zxrfqv3.MustNewParserWithDeployer(traceCalls, ethClient, common.HexToAddress(parser.Deployer0xV3)),
}

promotionParsers := []promotionparser.Parser{promotion1inchv2.MustNewParser()}

// kafka broadcast topic
broadcastTopic := c.String(libapp.KafkaBroadcastTopic.Name)
err = kafka.ValidateTopicName(broadcastTopic)
Expand All @@ -109,7 +118,7 @@ func run(c *cli.Context) error {
}

// trade log handler
tradeLogHandler := handler.NewTradeLogHandler(l, rpcNode, manager, parsers, broadcastTopic, kafkaPublisher)
tradeLogHandler := handler.NewTradeLogHandler(l, rpcNode, manager, promoteeStorage, parsers, promotionParsers, broadcastTopic, kafkaPublisher)

// parse log worker
w := worker.NewParseLog(tradeLogHandler, s, l)
Expand Down
75 changes: 75 additions & 0 deletions v2/cmd/promotees/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package main

import (
"fmt"
"log"
"os"

promoteeServer "github.com/KyberNetwork/tradelogs/v2/internal/server/promotees"
libapp "github.com/KyberNetwork/tradelogs/v2/pkg/app"
promoteeTypes "github.com/KyberNetwork/tradelogs/v2/pkg/storage/promotees"
"github.com/KyberNetwork/tradinglib/pkg/dbutil"
"github.com/jmoiron/sqlx"
"github.com/urfave/cli"
"go.uber.org/zap"
)

func main() {
app := libapp.NewApp()
app.Name = "promotees service"
app.Action = run

app.Flags = append(app.Flags, libapp.PostgresSQLFlags("tradelogs_v2")...)
app.Flags = append(app.Flags, libapp.HTTPServerFlags()...)

if err := app.Run(os.Args); err != nil {
log.Panic(err)
}
}

func run(c *cli.Context) error {
logger, _, flush, err := libapp.NewLogger(c)
if err != nil {
return fmt.Errorf("new logger: %w", err)
}

defer flush()

zap.ReplaceGlobals(logger)
l := logger.Sugar()
l.Infow("Starting backfill service")

db, err := initDB(c)
l.Infow("init db successfully")
if err != nil {
return fmt.Errorf("cannot init DB: %w", err)
}

//promotion storage
promoteeStorage := promoteeTypes.New(l, db)

s := promoteeServer.New(promoteeStorage, c.String(libapp.HTTPPromoteeServerFlag.Name))

return s.Run()
}

func initDB(c *cli.Context) (*sqlx.DB, error) {
db, err := libapp.NewDB(map[string]interface{}{
"host": c.String(libapp.PostgresHost.Name),
"port": c.Int(libapp.PostgresPort.Name),
"user": c.String(libapp.PostgresUser.Name),
"password": c.String(libapp.PostgresPassword.Name),
"dbname": c.String(libapp.PostgresDatabase.Name),
"sslmode": "disable",
})
if err != nil {
return nil, err
}

_, err = dbutil.RunMigrationUp(db.DB, c.String(libapp.PostgresMigrationPath.Name),
c.String(libapp.PostgresDatabase.Name))
if err != nil {
return nil, err
}
return db, nil
}
116 changes: 116 additions & 0 deletions v2/internal/server/promotees/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package server

import (
"fmt"
"net/http"
"strings"

promoteeTypes "github.com/KyberNetwork/tradelogs/v2/pkg/storage/promotees"
"github.com/gin-contrib/pprof"
"github.com/gin-gonic/gin"
)

type Server struct {
r *gin.Engine
bindAddr string
s *promoteeTypes.Storage
}

func New(s *promoteeTypes.Storage, bindAddr string) *Server {
engine := gin.New()
engine.Use(gin.Recovery())

server := &Server{
r: engine,
bindAddr: bindAddr,
s: s,
}

gin.SetMode(gin.DebugMode)
server.register()

return server
}

func (s *Server) Run() error {
if err := s.r.Run(s.bindAddr); err != nil {
return fmt.Errorf("run server: %w", err)
}

return nil
}

func (s *Server) register() {
pprof.Register(s.r, "/debug")
s.r.GET("/promotees", s.getPromotees)
s.r.POST("/insert_name", s.insertName)
}

func responseErr(c *gin.Context, status int, err error) {
c.JSON(http.StatusBadRequest, gin.H{
"success": false,
"error": err.Error(),
"status": status,
})
}

func (s *Server) getPromotees(c *gin.Context) {
var (
query promoteeTypes.PromoteesQuery
err = c.ShouldBind(&query)
)
if err != nil {
responseErr(c, http.StatusBadRequest, err)
return
}
if query.Promotee != "" {
query.Promotee = strings.ToLower(query.Promotee)
}
if query.Promoter != "" {
query.Promoter = strings.ToLower(query.Promoter)
}
data, err := s.s.Get(query)
if err != nil {
responseErr(c, http.StatusBadRequest, err)
return
}
c.JSON(http.StatusOK, gin.H{
"success": true,
"data": data,
})
}

func (s *Server) insertName(c *gin.Context) {
var queries []promoteeTypes.PromoteesQuery

if err := c.ShouldBindJSON(&queries); err != nil {
responseErr(c, http.StatusBadRequest, err)
return
}

promotees := make([]promoteeTypes.Promotee, len(queries))
for i, query := range queries {
if query.Name == "" {
responseErr(c, http.StatusBadRequest, fmt.Errorf("missing field 'Name' in query index %d", i))
return
}
if query.Promoter == "" {
responseErr(c, http.StatusBadRequest, fmt.Errorf("missing field 'Promoter' in query index %d", i))
return
}
promotees[i] = promoteeTypes.Promotee{
Promoter: strings.ToLower(query.Promoter),
Name: query.Name,
}
}

if err := s.s.InsertPromoterName(promotees); err != nil {
responseErr(c, http.StatusInternalServerError, err)
return
}

c.JSON(http.StatusOK, gin.H{
"success": true,
"data": promotees,
})
}
2 changes: 1 addition & 1 deletion v2/internal/server/tradelogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

var (
maxTimeRange = uint64(7 * 24 * time.Hour.Milliseconds())
maxTimeRange = uint64(24 * time.Hour.Milliseconds())
)

type TradeLogs struct {
Expand Down
7 changes: 7 additions & 0 deletions v2/pkg/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,19 @@ var (
EnvVar: "TRADELOGS_SERVER_ADDRESS",
Value: "localhost:8080",
}
HTTPPromoteeServerFlag = cli.StringFlag{
Name: "promotee-server-address",
Usage: "Run the rest for promotees server",
EnvVar: "PROMOTEE_SERVER_ADDRESS",
Value: "localhost:8083",
}
)

func HTTPServerFlags() []cli.Flag {
return []cli.Flag{
HTTPBackfillServerFlag,
HTTPBroadcastServerFlag,
HTTPTradeLogsServerFlag,
HTTPPromoteeServerFlag,
}
}
6 changes: 6 additions & 0 deletions v2/pkg/constant/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,10 @@ const (
Deployer0xV3 = "0x00000000000004533Fe15556B1E086BB1A72cEae"

TableZeroX = "tradelogs_zerox"

Promotion1InchV2 = "oneinchv2"
Promotion1InchV1 = "oneinchv1"

AddrPr1InchV2 = "0xf55684bc536487394b423e70567413fab8e45e26"
AddrPr1InchV1 = "0xcb8308fcb7bc2f84ed1bea2c016991d34de5cc77"
)
Loading

0 comments on commit 20a0040

Please sign in to comment.