Skip to content

Commit

Permalink
rework pings
Browse files Browse the repository at this point in the history
  • Loading branch information
Termina1 committed Sep 12, 2024
1 parent 0df3f0e commit 184de1f
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 14 deletions.
5 changes: 1 addition & 4 deletions chotki.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,10 +548,7 @@ func (cho *Chotki) Drain(ctx context.Context, recs protocol.Records) (err error)
case 'B': // bye dear
cho.log.InfoCtx(ctx, "received session end", "id", id.String())
cho.syncs.Delete(id)
case 'A':
cho.log.InfoCtx(ctx, "received ping")
case 'Z':
cho.log.InfoCtx(ctx, "received pong")
case 'P': // ping noop
default:
return fmt.Errorf("unsupported packet type %c", lit)
}
Expand Down
2 changes: 0 additions & 2 deletions chotki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,6 @@ func TestChotki_SyncLivePingsOk(t *testing.T) {

assert.Equal(t, SendLive, synca.GetFeedState())
assert.Equal(t, SendLive, syncb.GetFeedState())
assert.Equal(t, SendLive, synca.GetDrainState())
assert.Equal(t, SendLive, syncb.GetDrainState())
cancel()
// wait until everything stopped
time.Sleep(time.Millisecond * 100)
Expand Down
31 changes: 23 additions & 8 deletions sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ func (m *SyncMode) Unzip(raw []byte) error {
return nil
}

const PingVal = "ping"
const PongVal = "pong"

type SyncState int

const (
Expand Down Expand Up @@ -203,7 +206,7 @@ func (sync *Syncer) Feed(ctx context.Context) (recs protocol.Records, err error)
}
case SendPing:
recs = protocol.Records{
protocol.Record('A', rdx.Stlv("ping")),
protocol.Record('P', rdx.Stlv(PingVal)),
}
sync.SetFeedState(ctx, SendLive)
sync.pingStage.Store(int32(Inactive))
Expand All @@ -214,7 +217,7 @@ func (sync *Syncer) Feed(ctx context.Context) (recs protocol.Records, err error)
})
case SendPong:
recs = protocol.Records{
protocol.Record('Z', rdx.Stlv("pong")),
protocol.Record('P', rdx.Stlv(PongVal)),
}
sync.pingStage.Store(int32(Inactive))
sync.SetFeedState(ctx, SendLive)
Expand Down Expand Up @@ -429,11 +432,29 @@ func (sync *Syncer) resetPingTimer() {
})
}

func (sync *Syncer) processPings(recs protocol.Records) {
for _, rec := range recs {
if protocol.Lit(rec) == 'P' {
body, _ := protocol.Take('P', rec)
switch rdx.Snative(body) {
case PingVal:
sync.log.Info("ping received", sync.withDefaultArgs()...)
// go to pong state next time
sync.pingStage.Store(int32(Pong))
case PongVal:
sync.log.Info("pong received", sync.withDefaultArgs()...)
}
}
}
}

func (sync *Syncer) Drain(ctx context.Context, recs protocol.Records) (err error) {
if len(recs) == 0 {
return nil
}

sync.processPings(recs)

switch sync.drainState {
case SendHandshake:
if len(recs) == 0 {
Expand Down Expand Up @@ -462,9 +483,6 @@ func (sync *Syncer) Drain(ctx context.Context, recs protocol.Records) (err error
} else {
sync.SetDrainState(ctx, SendLive)
}
if lit == 'A' {
sync.pingStage.Store(int32(Pong))
}
}
if sync.Mode&SyncLive != 0 {
sync.resetPingTimer()
Expand All @@ -480,9 +498,6 @@ func (sync *Syncer) Drain(ctx context.Context, recs protocol.Records) (err error
if lit == 'B' {
sync.SetDrainState(ctx, SendNone)
}
if lit == 'A' {
sync.pingStage.Store(int32(Pong))
}
err = sync.Host.Drain(sync.log.WithDefaultArgs(ctx, sync.withDefaultArgs()...), recs)
if err == nil {
sync.Host.Broadcast(sync.log.WithDefaultArgs(ctx, sync.withDefaultArgs()...), recs, sync.Name)
Expand Down

0 comments on commit 184de1f

Please sign in to comment.