Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use w3bstream request param format #54

Merged
merged 3 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 169 additions & 60 deletions api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/ecdsa"
"crypto/sha256"
"encoding/base64"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
Expand All @@ -20,13 +21,14 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/gin-gonic/gin"
"github.com/iotexproject/w3bstream/project"
wsapi "github.com/iotexproject/w3bstream/service/apinode/api"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/shopspring/decimal"
"github.com/tidwall/gjson"
goproto "google.golang.org/protobuf/proto"

"github.com/iotexproject/pebble-server/contract/ioid"
"github.com/iotexproject/pebble-server/contract/ioidregistry"
"github.com/iotexproject/pebble-server/db"
"github.com/iotexproject/pebble-server/metrics"
"github.com/iotexproject/pebble-server/proto"
Expand Down Expand Up @@ -69,11 +71,15 @@ type receiveReq struct {
// Therefore, we’ll use only two codes: 200 for success and 400 for failure.
// Specific error details will be provided in the returned error message.
type httpServer struct {
engine *gin.Engine
db *db.DB
prv *ecdsa.PrivateKey
ioidInstance *ioid.Ioid
ioidRegistryInstance *ioidregistry.Ioidregistry
engine *gin.Engine
db *db.DB
prv *ecdsa.PrivateKey
}

var pebbleProject = project.Config{
SignedKeys: []project.SignedKey{{Name: "timestamp", Type: "uint64"}},
SignatureAlgorithm: "ecdsa",
HashAlgorithm: "sha256",
}

func (s *httpServer) pubkey(c *gin.Context) {
Expand Down Expand Up @@ -113,13 +119,9 @@ func (s *httpServer) query(c *gin.Context) {
return
}
if d == nil {
nd, err := s.ensureDevice(deviceAddr, req.DeviceID)
if err != nil {
slog.Error("failed to ensure device", "error", err, "device_id", req.DeviceID)
c.JSON(http.StatusBadRequest, newErrResp(err))
return
}
d = nd
slog.Error("the device has not been registered", "device_id", req.DeviceID)
c.JSON(http.StatusBadRequest, newErrResp(errors.New("the device has not been registered")))
return
}

metrics.TrackDeviceCount(req.DeviceID)
Expand Down Expand Up @@ -207,11 +209,9 @@ func (s *httpServer) receive(c *gin.Context) {
return
}
if d == nil {
if _, err := s.ensureDevice(deviceAddr, req.DeviceID); err != nil {
slog.Error("failed to ensure device", "error", err, "device_id", req.DeviceID)
c.JSON(http.StatusBadRequest, newErrResp(err))
return
}
slog.Error("the device has not been registered", "device_id", req.DeviceID)
c.JSON(http.StatusBadRequest, newErrResp(errors.New("the device has not been registered")))
return
}

metrics.TrackDeviceCount(req.DeviceID)
Expand Down Expand Up @@ -241,6 +241,145 @@ func (s *httpServer) receive(c *gin.Context) {
c.Status(http.StatusOK)
}

func (s *httpServer) receiveV2(c *gin.Context) {
req := &wsapi.CreateTaskReq{}
if err := c.ShouldBindJSON(req); err != nil {
slog.Error("failed to bind request", "error", err)
c.JSON(http.StatusBadRequest, newErrResp(errors.Wrap(err, "invalid request payload")))
return
}
pid, ok := new(big.Int).SetString(req.ProjectID, 10)
if !ok {
slog.Error("failed to decode project id string", "project_id", req.ProjectID)
c.JSON(http.StatusBadRequest, newErrResp(errors.New("failed to decode project id string")))
return
}
sig, err := hexutil.Decode(req.Signature)
if err != nil {
slog.Error("failed to decode signature", "error", err)
c.JSON(http.StatusBadRequest, newErrResp(errors.Wrap(err, "failed to decode signature")))
return
}
if ok := gjson.ValidBytes(req.Payload); !ok {
slog.Error("failed to validate payload in json format")
c.JSON(http.StatusBadRequest, newErrResp(errors.New("failed to validate payload in json format")))
return
}

recovered, _, _, err := recover(*req, &pebbleProject, sig)
if err != nil {
slog.Error("failed to recover public key", "error", err)
c.JSON(http.StatusBadRequest, newErrResp(errors.Wrap(err, "invalid signature; could not recover public key")))
return
}
var device *db.Device
var approved bool
for _, r := range recovered {
slog.Info("recovered address", "project_id", pid.String(), "address", r.addr.String())
d, err := s.db.Device("did:io:" + r.addr.Hex())
if err != nil {
slog.Error("failed to query device", "error", err)
c.JSON(http.StatusBadRequest, newErrResp(errors.Wrap(err, "failed to query device")))
return
}
if d != nil {
approved = true
device = d
break
}
}
if !approved {
slog.Error("device does not have permission", "project_id", pid.String())
c.JSON(http.StatusBadRequest, newErrResp(errors.New("device does not have permission")))
return
}

metrics.TrackDeviceCount(device.ID)
metrics.TrackRequestCount("post")
now := time.Now()
defer func() {
metrics.TrackRequestDuration("post", time.Since(now))
}()

payloadData := gjson.GetBytes(req.Payload, "data")
payload, err := hexutil.Decode(payloadData.String())
if err != nil {
slog.Error("failed to decode hex data", "error", err)
c.JSON(http.StatusBadRequest, newErrResp(errors.Wrap(err, "failed to decode hex data")))
return
}
pkg, data, err := s.unmarshalPayload(payload)
if err != nil {
slog.Error("failed to unmarshal payload", "error", err)
c.JSON(http.StatusBadRequest, newErrResp(errors.Wrap(err, "failed to unmarshal payload")))
return
}
if err := s.handle(device.ID, pkg, data); err != nil {
slog.Error("failed to handle payload data", "error", err)
c.JSON(http.StatusBadRequest, newErrResp(errors.Wrap(err, "failed to handle payload data")))
return
}
c.Status(http.StatusOK)
}

func recover(req wsapi.CreateTaskReq, cfg *project.Config, sig []byte) (res []*struct {
addr common.Address
sig []byte
}, sigAlg, hashAlg string, err error) {
slog.Info("request json info", "signature", req.Signature)

req.Signature = ""
reqJson, err := json.Marshal(req)
if err != nil {
return nil, "", "", errors.Wrap(err, "failed to marshal request into json format")
}

var hash [32]byte
switch cfg.HashAlgorithm {
default:
hashAlg = "sha256"
h1 := sha256.Sum256(reqJson)
slog.Info("request json info", "data", string(reqJson), "hash", hexutil.Encode(h1[:]))
d := make([]byte, 0, len(h1))
d = append(d, h1[:]...)
slog.Info("request json info", "hash_d", hexutil.Encode(d))

for _, k := range cfg.SignedKeys {
value := gjson.GetBytes(req.Payload, k.Name)
switch k.Type {
case "uint64":
slog.Info("request json info", "json value uint64", value.Uint())
buf := new(bytes.Buffer)
if err := binary.Write(buf, binary.LittleEndian, value.Uint()); err != nil {
return nil, "", "", errors.New("failed to convert uint64 to bytes array")
}
d = append(d, buf.Bytes()...)
}
}
slog.Info("request json info", "hash_d_final", hexutil.Encode(d))
hash = sha256.Sum256(d)
slog.Info("request json info", "hash_final", hexutil.Encode(hash[:]))
}

switch cfg.SignatureAlgorithm {
default:
sigAlg = "ecdsa"
rID := []uint8{0, 1}
for _, id := range rID {
ns := append(sig, byte(id))
pk, err := crypto.SigToPub(hash[:], ns)
if err != nil {
return nil, "", "", errors.Wrapf(err, "failed to recover public key from signature, recover_id %d", id)
}
res = append(res, &struct {
addr common.Address
sig []byte
}{addr: crypto.PubkeyToAddress(*pk), sig: ns})
}
return res, sigAlg, hashAlg, nil
}
}

func (s *httpServer) verifySignature(deviceAddr common.Address, sigStr string, o any) (bool, error) {
reqJson, err := json.Marshal(o)
if err != nil {
Expand Down Expand Up @@ -282,30 +421,6 @@ func (s *httpServer) recover(sig, h []byte) (common.Address, error) {
return crypto.PubkeyToAddress(*sigpk), nil
}

func (s *httpServer) ensureDevice(deviceAddr common.Address, deviceID string) (*db.Device, error) {
tokenID, err := s.ioidRegistryInstance.DeviceTokenId(nil, deviceAddr)
if err != nil {
return nil, errors.Wrap(err, "failed to query device token id")
}
deviceOwner, err := s.ioidInstance.OwnerOf(nil, tokenID)
if err != nil {
return nil, errors.Wrap(err, "failed to query device owner")
}

dev := &db.Device{
ID: deviceID,
Owner: deviceOwner.String(),
Address: deviceAddr.String(),
Status: db.CONFIRM,
Proposer: deviceOwner.String(),
OperationTimes: db.NewOperationTimes(),
}
if err := s.db.UpsertDevice(dev); err != nil {
return nil, errors.Wrap(err, "failed to upsert device")
}
return dev, nil
}

func (s *httpServer) unmarshalPayload(payload []byte) (*proto.BinPackage, goproto.Message, error) {
pkg := &proto.BinPackage{}
if err := goproto.Unmarshal(payload, pkg); err != nil {
Expand Down Expand Up @@ -411,32 +526,26 @@ func (s *httpServer) handleSensor(id string, pkg *proto.BinPackage, data *proto.
Accelerometer: string(accelerometer),
OperationTimes: db.NewOperationTimes(),
}
err = s.db.CreateDeviceRecord(dr)
return errors.Wrapf(err, "failed to create senser data: %s", id)
if err := s.db.CreateDeviceRecord(dr); err != nil {
return errors.Wrapf(err, "failed to create senser data: %s", id)
}
return nil
}

func Run(db *db.DB, address string, client *ethclient.Client, prv *ecdsa.PrivateKey, ioidAddr, ioidRegistryAddr common.Address) error {
ioidInstance, err := ioid.NewIoid(ioidAddr, client)
if err != nil {
return errors.Wrap(err, "failed to new ioid contract instance")
}
ioidRegistryInstance, err := ioidregistry.NewIoidregistry(ioidRegistryAddr, client)
if err != nil {
return errors.Wrap(err, "failed to new ioid registry contract instance")
}
func Run(db *db.DB, address string, client *ethclient.Client, prv *ecdsa.PrivateKey) error {
s := &httpServer{
engine: gin.Default(),
db: db,
prv: prv,
ioidInstance: ioidInstance,
ioidRegistryInstance: ioidRegistryInstance,
engine: gin.Default(),
db: db,
prv: prv,
}

s.engine.GET("/metrics", gin.WrapH(promhttp.Handler()))
s.engine.GET("/public_key", s.pubkey)
s.engine.GET("/device", s.query)
s.engine.POST("/device", s.receive)
s.engine.GET("/v2/device", s.query)
s.engine.POST("/v2/device", s.receiveV2)

err = s.engine.Run(address)
err := s.engine.Run(address)
return errors.Wrap(err, "failed to start http server")
}
2 changes: 2 additions & 0 deletions cmd/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type Config struct {
IoIDRegistryContractAddr string `env:"IOID_REGISTRY_CONTRACT_ADDRESS,optional"`
IoIDContractAddr string `env:"IOID_CONTRACT_ADDRESS,optional"`
ProjectContractAddr string `env:"PROJECT_CONTRACT_ADDRESS,optional"`
W3bstreamProjectID string `env:"W3BSTREAM_PROJECT_ID,optional"`
W3bstreamServiceEndpoint string `env:"W3BSTREAM_SERVICE_ENDPOINT,optional"`
env string `env:"-"`
}

Expand Down
10 changes: 7 additions & 3 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func main() {
log.Fatal(errors.Wrap(err, "failed to parse private key"))
}

db, err := db.New(cfg.DatabaseDSN, cfg.IoIDProjectID)
db, err := db.New(cfg.DatabaseDSN)
if err != nil {
log.Fatal(errors.Wrap(err, "failed to new db"))
}
Expand All @@ -47,15 +47,19 @@ func main() {
UpsertScannedBlockNumber: db.UpsertScannedBlockNumber,
UpsertProjectMetadata: db.UpsertApp,
},
common.HexToAddress(cfg.ProjectContractAddr),
&monitor.ContractAddr{
Project: common.HexToAddress(cfg.ProjectContractAddr),
IoID: common.HexToAddress(cfg.IoIDContractAddr),
},
cfg.BeginningBlockNumber,
cfg.IoIDProjectID,
client,
); err != nil {
log.Fatal(errors.Wrap(err, "failed to run contract monitor"))
}

go func() {
if err := api.Run(db, cfg.ServiceEndpoint, client, prv, common.HexToAddress(cfg.IoIDContractAddr), common.HexToAddress(cfg.IoIDRegistryContractAddr)); err != nil {
if err := api.Run(db, cfg.ServiceEndpoint, client, prv); err != nil {
log.Fatal(err)
}
}()
Expand Down
4 changes: 0 additions & 4 deletions db/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ type firmwareData struct {
var pebbleFirmwareKey = crypto.Keccak256Hash([]byte("pebble_firmware"))

func (d *DB) UpsertApp(projectID uint64, key [32]byte, value []byte) error {
if d.ioidProjectID != projectID {
slog.Debug("not ioid project metadata", "project_id", projectID, "ioid_project_id", d.ioidProjectID)
return nil
}
if !bytes.Equal(key[:], pebbleFirmwareKey.Bytes()) {
slog.Error("failed to match pebble firmware key")
return nil
Expand Down
7 changes: 3 additions & 4 deletions db/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ import (
)

type DB struct {
ioidProjectID uint64
db *gorm.DB
db *gorm.DB
}

func New(dsn string, ioidProjectID uint64) (*DB, error) {
func New(dsn string) (*DB, error) {
db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{
Logger: logger.Default.LogMode(logger.Silent),
})
Expand All @@ -33,5 +32,5 @@ func New(dsn string, ioidProjectID uint64) (*DB, error) {
); err != nil {
return nil, errors.Wrap(err, "failed to migrate model")
}
return &DB{ioidProjectID, db}, nil
return &DB{db}, nil
}
Loading
Loading