BLS example #4

Merged · 8 commits · Oct 25, 2023
Changes from all commits
5 changes: 4 additions & 1 deletion .github/workflows/examples.yml
@@ -19,4 +19,7 @@ jobs:
go-version: '1.20'

- name: Local DON Tests
run: cd examples/don && go test -v -race .
run: cd examples/don && go test -v -timeout=2m .

- name: BLS Tests
run: cd examples/bls && go test -v -timeout=2m -tags blst_enabled .
3 changes: 3 additions & 0 deletions Makefile
@@ -18,6 +18,9 @@ test:
test-localdon:
@make TEST_PKG=./examples/don/... test-pkg

test-bls:
@go test -v -tags blst_enabled -timeout=${TEST_TIMEOUT} ./examples/bls/...

test-pkg:
@go test -v -race -timeout=${TEST_TIMEOUT} ${TEST_PKG}

14 changes: 14 additions & 0 deletions examples/bls/README.md
@@ -0,0 +1,14 @@
# BLS example

This package shows a high-level integration with BLS-based networks.

## Overview

Every internal network is represented by a BLS key, which is split into shares and distributed among the nodes in that network.
Internal messages are signed with the nodes' shares. Once a threshold of signatures is reached, the nodes of that network reconstruct the signature and broadcast it to the other networks. Nodes in other networks verify the messages using the public key of the producing network's BLS key.
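
A minimal sketch of this flow, using the helpers in this package (`GenBlsKey`, `GenShares`) together with the herumi BLS primitives; a 3-of-4 split and an illustrative message are assumed, and error handling is omitted:

```go
// Split the network key into a 3-of-4 threshold scheme.
netPriv, netPub := GenBlsKey()
shares, _, _ := GenShares(netPriv, 3, 4)

msg := []byte("some report")

// Each node signs the message with its own share.
var sigs []bls.Sign
var ids []bls.ID
for i := uint64(1); i <= 3; i++ {
	var id bls.ID
	_ = id.SetDecString(fmt.Sprintf("%d", i))
	ids = append(ids, id)
	sigs = append(sigs, *shares[i].SignByte(msg))
}

// Once a threshold of share signatures is collected, the network signature
// is reconstructed and verified against the producing network's public key.
var recovered bls.Sign
_ = recovered.Recover(sigs, ids)
fmt.Println(recovered.VerifyByte(netPub, msg)) // true
```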

## Running the example

```shell
make test-bls
```
285 changes: 285 additions & 0 deletions examples/bls/bls_test.go
@@ -0,0 +1,285 @@
package blstest

import (
"context"
"fmt"
"math/rand"
"strings"
"testing"
"time"

"github.com/herumi/bls-eth-go-binary/bls"
logging "github.com/ipfs/go-log"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

grpcapi "github.com/amirylm/p2pmq/api/grpc"
"github.com/amirylm/p2pmq/api/grpc/client"
"github.com/amirylm/p2pmq/core"
)

func TestBls(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

n := 10

netCfg := map[string]netConfig{
"auto": {
nodes: 4,
subscribed: []string{
"func",
"mercury",
},
reportsInterval: time.Second * 1,
},
"func": {
nodes: 4,
subscribed: []string{
"auto",
"mercury",
},
reportsInterval: time.Second * 1,
},
"mercury": {
nodes: 4,
subscribed: []string{
"test",
},
reportsInterval: time.Second * 2,
},
"transmit": {
nodes: 4,
subscribed: []string{
"func",
"mercury",
},
reportsInterval: time.Second * 4,
},
"test": {
nodes: 4,
subscribed: []string{
"auto",
"func",
"mercury",
},
reportsInterval: time.Second * 5,
},
}

require.NoError(t, logging.SetLogLevelRegex("p2pmq", "debug"))

controllers, _, _, done := core.SetupTestControllers(ctx, t, n, func(*pubsub.Message) {}, func(peer.ID, *pubsub.Message) pubsub.ValidationResult {
return pubsub.ValidationIgnore
})
defer done()
require.Equal(t, n, len(controllers))

grpcServers := make([]*grpc.Server, n)
for i := 0; i < n; i++ {
ctrl := controllers[i]
control, msgR, valR := grpcapi.NewServices(ctrl, 128)
ctrl.RefreshRouters(func(mw *core.MsgWrapper[error]) {
require.NoError(t, msgR.Push(mw))
}, func(mw *core.MsgWrapper[pubsub.ValidationResult]) {
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
mw.Result = valR.PushWait(ctx, mw)
})
grpcServers[i] = grpcapi.NewGrpcServer(control, msgR, valR)
}

pubstore := NewStore[*bls.PublicKey]()
blsKeys := make([]*bls.SecretKey, 0)
for net := range netCfg {
netPriv, netPub := GenBlsKey()
pubstore.Add(net, netPub)
blsKeys = append(blsKeys, netPriv)
}

t.Log("Starting grpc servers")
addrs := make([]string, n)
nodes := make([]*Node, n)
for i, s := range grpcServers {
{
srv := s
port := randPort()
addrs[i] = fmt.Sprintf(":%d", port)
nodes[i] = NewNode(client.GrpcEndPoint(fmt.Sprintf(":%d", port)), pubstore)
go func() {
err := grpcapi.ListenGrpc(srv, port)
if ctx.Err() == nil {
require.NoError(t, err)
}
}()
}
}
defer func() {
for _, s := range grpcServers {
s.Stop()
}
t.Log("grpc servers stopped")
}()

t.Log("Starting nodes")
for _, n := range nodes {
n.Start()
}
defer func() {
for _, n := range nodes {
n.Close()
}
}()

netInstances := make(map[string][]*Node)
j := 0
for net, cfg := range netCfg {
sks, pks, err := GenShares(blsKeys[j], uint64(Threshold(cfg.nodes)), uint64(cfg.nodes))
require.NoError(t, err)
nodes := getRandomNodes(cfg.nodes, nodes)
netInstances[net] = nodes
for i, n := range nodes {
n.Shares.Add(net, Share{
Signers: pks,
SignerID: uint64(i + 1),
PrivateKey: sks[uint64(i+1)],
SharePublicKey: blsKeys[j].GetPublicKey(),
})
require.NoError(t, n.consumer.Subscribe(ctx, net))
for _, sub := range cfg.subscribed {
require.NoError(t, n.consumer.Subscribe(ctx, sub))
}
}
j++
}

t.Log("Nodes subscribed")

<-time.After(time.Second * 5) // TODO: avoid timeout

t.Log("Starting reports generation")

reports := NewReportBuffer(reportBufferSize)
for net, cfg := range netCfg {
nodes := netInstances[net]
go triggerReports(ctx, t, net, cfg.reportsInterval, reports, nodes)
}

testDuration := time.Second * 10
expectedReports := map[string]int{
"auto": int(testDuration) / int(netCfg["auto"].reportsInterval),
"func": int(testDuration) / int(netCfg["func"].reportsInterval),
"mercury": int(testDuration) / int(netCfg["mercury"].reportsInterval),
"transmit": int(testDuration) / int(netCfg["transmit"].reportsInterval),
"test": int(testDuration) / int(netCfg["test"].reportsInterval),
}

checkLoop:
for ctx.Err() == nil {
time.Sleep(testDuration / 4)
for did, exp := range expectedReports {
reportsCount := len(reports.GetAll(did))
for reportsCount+1 < exp && ctx.Err() == nil {
time.Sleep(time.Second)
reportsCount = len(reports.GetAll(did))
}
if ctx.Err() == nil {
t.Logf("DON %s reports count: %d", did, expectedReports[did])
// we have enough reports for this don
expectedReports[did] = 0
}
}
for _, exp := range expectedReports {
if exp != 0 {
continue checkLoop
}
}
break
}

<-time.After(testDuration / 4)

for did, exp := range expectedReports {
require.Equal(t, 0, exp, "DON %s reports count", did)
}
}

func triggerReports(pctx context.Context, t *testing.T, net string, interval time.Duration, reports *ReportBuffer, nodes []*Node) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-pctx.Done():
return
case <-ticker.C:
latest := reports.getLatest(net)
var nextSeq uint64
if latest != nil {
nextSeq = latest.SeqNumber + 1
} else {
nextSeq = 1
}
t.Logf("Generating report for %s, seq %d", net, nextSeq)
if pctx.Err() != nil {
return
}
for _, n := range nodes {
node := n
share, ok := node.Shares.Get(net)
if !ok {
continue
}
if node.getLeader(net, nextSeq) == share.SignerID {
node.threadC.Go(func(ctx context.Context) {
report := &SignedReport{
Network: net,
SeqNumber: nextSeq,
Data: []byte(fmt.Sprintf("report for %s, seq %d", net, nextSeq)),
}
share.Sign(report)
if pctx.Err() != nil || ctx.Err() != nil { // ctx might be canceled by the time we get here
return
}
if err := node.Broadcast(ctx, *report); ctx.Err() == nil && pctx.Err() == nil {
if err != nil && strings.Contains(err.Error(), "context canceled") {
return
}
require.NoError(t, err)
reports.Add(net, *report)
}
})
}
}
}
}
}

type netConfig struct {
nodes int
reportsInterval time.Duration
subscribed []string
}

func randPort() int {
return 5001 + rand.Intn(3000) + rand.Intn(2000)
}

func getRandomNodes(n int, items []*Node) []*Node {
if n > len(items) {
n = len(items)
}
visited := map[int]bool{}
randoms := make([]*Node, 0)
for len(randoms) < n {
r := rand.Intn(len(items))
if visited[r] {
continue
}
visited[r] = true
randoms = append(randoms, items[r])
}
return randoms
}
63 changes: 63 additions & 0 deletions examples/bls/gen.go
@@ -0,0 +1,63 @@
package blstest

import (
"fmt"
"math/big"

"github.com/herumi/bls-eth-go-binary/bls"
)

var (
curveOrder = new(big.Int)
)

// init initializes BLS
func init() {
_ = bls.Init(bls.BLS12_381)
_ = bls.SetETHmode(bls.EthModeDraft07)

curveOrder, _ = curveOrder.SetString(bls.GetCurveOrder(), 10)
}

func GenBlsKey() (*bls.SecretKey, *bls.PublicKey) {
sk := &bls.SecretKey{}
sk.SetByCSPRNG()

return sk, sk.GetPublicKey()
}

// GenShares receives a bls.SecretKey, a threshold and a desired share count,
// and splits the secret key into count shares, any threshold of which can
// reconstruct it. See the RecoverSecret sketch below for a usage example.
func GenShares(sk *bls.SecretKey, threshold uint64, count uint64) (map[uint64]*bls.SecretKey, map[uint64]*bls.PublicKey, error) {
msk := make([]bls.SecretKey, threshold)
// master key
msk[0] = *sk

// construct poly
for i := uint64(1); i < threshold; i++ {
sk, _ := GenBlsKey()
msk[i] = *sk
}

// evaluate shares - starting from 1 because 0 is master key
shares := make(map[uint64]*bls.SecretKey)
sharesPK := make(map[uint64]*bls.PublicKey, 0)
for i := uint64(1); i <= count; i++ {
blsID := bls.ID{}

err := blsID.SetDecString(fmt.Sprintf("%d", i))
if err != nil {
return nil, nil, err
}

sk := bls.SecretKey{}
err = sk.Set(msk, &blsID)
if err != nil {
return nil, nil, err
}
shares[i] = &sk
sharesPK[i] = sk.GetPublicKey()
}

return shares, sharesPK, nil
}
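
// RecoverSecret is a usage sketch added for illustration (hypothetical; not
// used elsewhere in the example): it shows that any `threshold` shares
// produced by GenShares reconstruct the original secret key via Lagrange
// interpolation over the share IDs 1..threshold.
func RecoverSecret(shares map[uint64]*bls.SecretKey, threshold uint64) (*bls.SecretKey, error) {
	secVec := make([]bls.SecretKey, 0, threshold)
	idVec := make([]bls.ID, 0, threshold)
	for i := uint64(1); i <= threshold; i++ {
		blsID := bls.ID{}
		if err := blsID.SetDecString(fmt.Sprintf("%d", i)); err != nil {
			return nil, err
		}
		idVec = append(idVec, blsID)
		secVec = append(secVec, *shares[i])
	}
	recovered := &bls.SecretKey{}
	if err := recovered.Recover(secVec, idVec); err != nil {
		return nil, err
	}
	return recovered, nil
}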