mixclient: Wait for runs to finish before closing client #3467

Open · wants to merge 2 commits into master
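The title states the intent of the change: the mix client must not tear down while session runs are still executing. As a minimal sketch of that pattern (hypothetical type, field, and method names; not mixclient's actual implementation), a WaitGroup guarded by a mutex lets Close block until every started run returns:

	package main

	import (
		"errors"
		"sync"
	)

	// client is a stand-in for mixclient's Client; the closing/runs
	// fields here are illustrative assumptions, not the PR's fields.
	type client struct {
		mu      sync.Mutex
		closing bool
		runs    sync.WaitGroup
	}

	// startRun registers a run before launching it, so Close cannot
	// return while the run is still executing.
	func (c *client) startRun(run func()) error {
		c.mu.Lock()
		if c.closing {
			c.mu.Unlock()
			return errors.New("mixclient: closed")
		}
		c.runs.Add(1) // registered under mu, so Close's Wait observes it
		c.mu.Unlock()

		go func() {
			defer c.runs.Done()
			run()
		}()
		return nil
	}

	// Close rejects new runs, then waits for all in-flight runs to
	// finish before returning.
	func (c *client) Close() {
		c.mu.Lock()
		c.closing = true
		c.mu.Unlock()
		c.runs.Wait()
	}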
82 changes: 44 additions & 38 deletions mixing/mixclient/blame.go
@@ -11,6 +11,7 @@ import (
"fmt"
"math/big"
"sort"
"time"

"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/dcrec/secp256k1/v4"
@@ -20,6 +21,8 @@ import (
"github.com/decred/dcrd/wire"
)

+ var errBlameFailed = errors.New("blame failed")
+
// blamedIdentities identifies detected misbehaving peers.
//
// If a run returns a blamedIdentities error, these peers are immediately
@@ -48,7 +51,7 @@ func (e blamedIdentities) String() string {
}

func (c *Client) blame(ctx context.Context, sesRun *sessionRun) (err error) {
c.logf("Blaming for sid=%x", sesRun.sid[:])
sesRun.logf("running blame assignment")

mp := c.mixpool
prs := sesRun.prs
@@ -65,14 +68,11 @@ func (c *Client) blame(ctx context.Context, sesRun *sessionRun) (err error) {
}
}()

- err = c.sendLocalPeerMsgs(ctx, sesRun, true, func(p *peer) mixing.Message {
- // Send initial secrets messages from any peers who detected
- // misbehavior.
- if !p.triggeredBlame {
- return nil
- }
- return p.rs
- })
+ deadline := time.Now().Add(timeoutDuration)
+
+ // Send initial secrets messages from any peers who detected
+ // misbehavior.
+ err = c.sendLocalPeerMsgs(ctx, deadline, sesRun, 0)
if err != nil {
return err
}
@@ -87,15 +87,17 @@ func (c *Client) blame(ctx context.Context, sesRun *sessionRun) (err error) {
rsHashes = append(rsHashes, rs.Hash())
}

- // Send remaining secrets messages.
- err = c.sendLocalPeerMsgs(ctx, sesRun, true, func(p *peer) mixing.Message {
- if p.triggeredBlame {
- p.triggeredBlame = false
- return nil
+ // Send remaining secrets messages with observed RS hashes from the
+ // initial peers who published secrets.
+ c.forLocalPeers(ctx, sesRun, func(p *peer) error {
+ if !p.triggeredBlame {
+ if p.rs != nil {
+ p.rs.SeenSecrets = rsHashes
+ }
}
- p.rs.SeenSecrets = rsHashes
- return p.rs
+ return nil
})
+ err = c.sendLocalPeerMsgs(ctx, deadline, sesRun, msgRS)
if err != nil {
return err
}
@@ -113,14 +115,14 @@ func (c *Client) blame(ctx context.Context, sesRun *sessionRun) (err error) {
}
if len(rss) != len(sesRun.peers) {
// Blame peers who did not send secrets
c.logf("received %d RSs for %d peers; blaming unresponsive peers",
sesRun.logf("received %d RSs for %d peers; blaming unresponsive peers",
len(rss), len(sesRun.peers))

for _, p := range sesRun.peers {
if p.rs != nil {
continue
}
c.logf("blaming %x for RS timeout", p.id[:])
sesRun.logf("blaming %x for RS timeout", p.id[:])
blamed = append(blamed, *p.id)
}
return blamed
@@ -142,10 +144,14 @@ func (c *Client) blame(ctx context.Context, sesRun *sessionRun) (err error) {
continue
}
id := &rs.Identity
c.logf("blaming %x for false failure accusation", id[:])
sesRun.logf("blaming %x for false failure accusation", id[:])
blamed = append(blamed, *id)
}
- err = blamed
+ if len(blamed) > 0 {
+ err = blamed
+ } else {
+ err = errBlameFailed
+ }
}()

defer c.mu.Unlock()
@@ -159,7 +165,7 @@ func (c *Client) blame(ctx context.Context, sesRun *sessionRun) (err error) {
KELoop:
for _, p := range sesRun.peers {
if p.ke == nil {
c.logf("blaming %x for missing messages", p.id[:])
sesRun.logf("blaming %x for missing messages", p.id[:])
blamed = append(blamed, *p.id)
continue
}
@@ -169,15 +175,15 @@ KELoop:
cm := p.rs.Commitment(c.blake256Hasher)
c.blake256HasherMu.Unlock()
if cm != p.ke.Commitment {
c.logf("blaming %x for false commitment, got %x want %x",
sesRun.logf("blaming %x for false commitment, got %x want %x",
p.id[:], cm[:], p.ke.Commitment[:])
blamed = append(blamed, *p.id)
continue
}

// Blame peers whose seed is not the correct length (will panic chacha20prng).
if len(p.rs.Seed) != chacha20prng.SeedSize {
c.logf("blaming %x for bad seed size in RS message", p.id[:])
sesRun.logf("blaming %x for bad seed size in RS message", p.id[:])
blamed = append(blamed, *p.id)
continue
}
@@ -187,7 +193,7 @@ KELoop:
if mixing.InField(scratch.SetBytes(m)) {
continue
}
c.logf("blaming %x for SR message outside field", p.id[:])
sesRun.logf("blaming %x for SR message outside field", p.id[:])
blamed = append(blamed, *p.id)
continue KELoop
}
@@ -199,7 +205,7 @@ KELoop:
// Recover derived key exchange from PRNG.
p.kx, err = mixing.NewKX(p.prng)
if err != nil {
c.logf("blaming %x for bad KX", p.id[:])
sesRun.logf("blaming %x for bad KX", p.id[:])
blamed = append(blamed, *p.id)
continue
}
@@ -210,14 +216,14 @@ KELoop:
case !bytes.Equal(p.ke.ECDH[:], p.kx.ECDHPublicKey.SerializeCompressed()):
fallthrough
case !bytes.Equal(p.ke.PQPK[:], p.kx.PQPublicKey[:]):
c.logf("blaming %x for KE public keys not derived from their PRNG",
sesRun.logf("blaming %x for KE public keys not derived from their PRNG",
p.id[:])
blamed = append(blamed, *p.id)
continue KELoop
}
publishedECDHPub, err := secp256k1.ParsePubKey(p.ke.ECDH[:])
if err != nil {
c.logf("blaming %x for unparsable pubkey")
sesRun.logf("blaming %x for unparsable pubkey")
blamed = append(blamed, *p.id)
continue
}
@@ -229,7 +235,7 @@ KELoop:
start += mcount

if uint32(len(p.rs.SlotReserveMsgs)) != mcount || uint32(len(p.rs.DCNetMsgs)) != mcount {
c.logf("blaming %x for bad message count", p.id[:])
sesRun.logf("blaming %x for bad message count", p.id[:])
blamed = append(blamed, *p.id)
continue
}
@@ -261,19 +267,19 @@ KELoop:
// from their PRNG.
for i, p := range sesRun.peers {
if p.ct == nil {
c.logf("blaming %x for missing messages", p.id[:])
sesRun.logf("blaming %x for missing messages", p.id[:])
blamed = append(blamed, *p.id)
continue
}
if len(recoveredCTs[i]) != len(p.ct.Ciphertexts) {
c.logf("blaming %x for different ciphertexts count %d != %d",
sesRun.logf("blaming %x for different ciphertexts count %d != %d",
p.id[:], len(recoveredCTs[i]), len(p.ct.Ciphertexts))
blamed = append(blamed, *p.id)
continue
}
for j := range p.ct.Ciphertexts {
if !bytes.Equal(p.ct.Ciphertexts[j][:], recoveredCTs[i][j][:]) {
c.logf("blaming %x for different ciphertexts", p.id[:])
sesRun.logf("blaming %x for different ciphertexts", p.id[:])
blamed = append(blamed, *p.id)
break
}
@@ -294,7 +300,7 @@ KELoop:
for _, pids := range shared {
if len(pids) > 1 {
for i := range pids {
c.logf("blaming %x for shared SR message", pids[i][:])
sesRun.logf("blaming %x for shared SR message", pids[i][:])
}
blamed = append(blamed, pids...)
}
@@ -306,7 +312,7 @@ KELoop:
SRLoop:
for i, p := range sesRun.peers {
if p.sr == nil {
c.logf("blaming %x for missing messages", p.id[:])
sesRun.logf("blaming %x for missing messages", p.id[:])
blamed = append(blamed, *p.id)
continue
}
@@ -325,7 +331,7 @@ SRLoop:
var decapErr *mixing.DecapsulateError
if errors.As(err, &decapErr) {
submittingID := p.id
c.logf("blaming %x for unrecoverable secrets", submittingID[:])
sesRun.logf("blaming %x for unrecoverable secrets", submittingID[:])
blamed = append(blamed, *submittingID)
continue
}
@@ -343,7 +349,7 @@ SRLoop:
// Blame when committed mix does not match provided.
for k := range srMix {
if srMix[k].Cmp(scratch.SetBytes(p.sr.DCMix[j][k])) != 0 {
c.logf("blaming %x for bad SR mix", p.id[:])
sesRun.logf("blaming %x for bad SR mix", p.id[:])
blamed = append(blamed, *p.id)
continue SRLoop
}
@@ -376,7 +382,7 @@ DCLoop:
// deferred function) if no peers could be assigned blame is
// not likely to be seen under this situation.
if p.dc == nil {
c.logf("blaming %x for missing messages", p.id[:])
sesRun.logf("blaming %x for missing messages", p.id[:])
blamed = append(blamed, *p.id)
continue
}
@@ -386,7 +392,7 @@ DCLoop:
// message, and there must be mcount DC-net vectors.
mcount := p.pr.MessageCount
if uint32(len(p.dc.DCNet)) != mcount {
c.logf("blaming %x for missing DC mix vectors", p.id[:])
sesRun.logf("blaming %x for missing DC mix vectors", p.id[:])
blamed = append(blamed, *p.id)
continue
}
@@ -406,7 +412,7 @@ DCLoop:
// Blame when committed mix does not match provided.
for k := 0; k < len(dcMix); k++ {
if !dcMix.Equals(mixing.Vec(p.dc.DCNet[j])) {
c.logf("blaming %x for bad DC mix", p.id[:])
sesRun.logf("blaming %x for bad DC mix", p.id[:])
blamed = append(blamed, *p.id)
continue DCLoop
}
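A note on the errBlameFailed sentinel introduced in this diff: with the new fallback in the deferred function, blame always reports a failure, either a blamedIdentities value naming the misbehaving peers or errBlameFailed when none could be identified. A caller might distinguish the two as below (a sketch only, assuming blamedIdentities implements the error interface as its doc comment implies; the real caller logic lives elsewhere in mixclient):

	// Sketch: branching on blame's result.
	err := c.blame(ctx, sesRun)
	var blamed blamedIdentities
	switch {
	case errors.As(err, &blamed):
		// Misbehaving peers were identified; exclude them from the
		// next session run attempt.
	case errors.Is(err, errBlameFailed):
		// Misbehavior was detected but no peer could be blamed; the
		// session cannot safely continue.
	case err != nil:
		// Context cancellation or another unexpected failure.
	}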