Skip to content

Commit

Permalink
make batchConn more generic
Browse files Browse the repository at this point in the history
  • Loading branch information
zjx20 committed Apr 25, 2021
1 parent 7450e91 commit 95328de
Show file tree
Hide file tree
Showing 11 changed files with 452 additions and 225 deletions.
5 changes: 5 additions & 0 deletions batchconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,8 @@ type batchConn interface {
WriteBatch(ms []ipv4.Message, flags int) (int, error)
ReadBatch(ms []ipv4.Message, flags int) (int, error)
}

type batchErrDetector interface {
ReadBatchUnavailable(err error) bool
WriteBatchUnavailable(err error) bool
}
28 changes: 28 additions & 0 deletions batchconn_generic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// +build !linux

package kcp

import (
"net"
)

func toBatchConn(c net.PacketConn) batchConn {
if xconn, ok := c.(batchConn); ok {
return xconn
}
return nil
}

func readBatchUnavailable(xconn batchConn, err error) bool {
if detector, ok := xconn.(batchErrDetector); ok {
return detector.ReadBatchUnavailable(err)
}
return false
}

func writeBatchUnavailable(xconn batchConn, err error) bool {
if detector, ok := xconn.(batchErrDetector); ok {
return detector.WriteBatchUnavailable(err)
}
return false
}
80 changes: 80 additions & 0 deletions batchconn_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// +build linux

package kcp

import (
"net"
"os"

"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
)

func toBatchConn(c net.PacketConn) batchConn {
if xconn, ok := c.(batchConn); ok {
return xconn
}
if _, ok := c.(*net.UDPConn); ok {
var xconn batchConn
addr, err := net.ResolveUDPAddr("udp", c.LocalAddr().String())
if err == nil {
if addr.IP.To4() != nil {
xconn = ipv4.NewPacketConn(c)
} else {
xconn = ipv6.NewPacketConn(c)
}
}
return xconn
}
return nil
}

func isPacketConn(xconn batchConn) bool {
if _, ok := xconn.(*ipv4.PacketConn); ok {
return true
}
if _, ok := xconn.(*ipv6.PacketConn); ok {
return true
}
return false
}

func readBatchUnavailable(xconn batchConn, err error) bool {
if isPacketConn(xconn) {
// compatibility issue:
// for linux kernel<=2.6.32, support for sendmmsg is not available
// an error of type os.SyscallError will be returned
if operr, ok := err.(*net.OpError); ok {
if se, ok := operr.Err.(*os.SyscallError); ok {
if se.Syscall == "recvmmsg" {
return true
}
}
}
return false
}
if detector, ok := xconn.(batchErrDetector); ok {
return detector.ReadBatchUnavailable(err)
}
return false
}

func writeBatchUnavailable(xconn batchConn, err error) bool {
if isPacketConn(xconn) {
// compatibility issue:
// for linux kernel<=2.6.32, support for sendmmsg is not available
// an error of type os.SyscallError will be returned
if operr, ok := err.(*net.OpError); ok {
if se, ok := operr.Err.(*os.SyscallError); ok {
if se.Syscall == "sendmmsg" {
return true
}
}
}
return false
}
if detector, ok := xconn.(batchErrDetector); ok {
return detector.WriteBatchUnavailable(err)
}
return false
}
93 changes: 88 additions & 5 deletions readloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,42 @@ import (
"sync/atomic"

"github.com/pkg/errors"
"golang.org/x/net/ipv4"
)

func (s *UDPSession) readLoop() {
// default version
if s.xconn == nil {
s.defaultReadLoop()
return
}
s.batchReadLoop()
}

func (l *Listener) monitor() {
xconn := toBatchConn(l.conn)

// default version
if xconn == nil {
l.defaultMonitor()
return
}
l.batchMonitor(xconn)
}

func (s *UDPSession) defaultReadLoop() {
buf := make([]byte, mtuLimit)
var src string
for {
if n, addr, err := s.conn.ReadFrom(buf); err == nil {
// make sure the packet is from the same source
if src == "" { // set source address
src = addr.String()
} else if addr.String() != src {
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
continue
if addr.String() != src {
if len(src) == 0 { // set source address
src = addr.String()
} else {
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
continue
}
}
s.packetInput(buf[:n])
} else {
Expand All @@ -37,3 +60,63 @@ func (l *Listener) defaultMonitor() {
}
}
}

func (s *UDPSession) batchReadLoop() {
// x/net version
var src string
msgs := make([]ipv4.Message, batchSize)
for k := range msgs {
msgs[k].Buffers = [][]byte{make([]byte, mtuLimit)}
}

for {
if count, err := s.xconn.ReadBatch(msgs, 0); err == nil {
for i := 0; i < count; i++ {
msg := &msgs[i]
// make sure the packet is from the same source
if msg.Addr.String() != src {
if len(src) == 0 { // set source address if nil
src = msg.Addr.String()
} else {
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
continue
}
}

// source and size has validated
s.packetInput(msg.Buffers[0][:msg.N])
}
} else {
if readBatchUnavailable(s.xconn, err) {
s.defaultReadLoop()
return
}
s.notifyReadError(errors.WithStack(err))
return
}
}
}

func (l *Listener) batchMonitor(xconn batchConn) {
// x/net version
msgs := make([]ipv4.Message, batchSize)
for k := range msgs {
msgs[k].Buffers = [][]byte{make([]byte, mtuLimit)}
}

for {
if count, err := xconn.ReadBatch(msgs, 0); err == nil {
for i := 0; i < count; i++ {
msg := &msgs[i]
l.packetInput(msg.Buffers[0][:msg.N], msg.Addr)
}
} else {
if readBatchUnavailable(xconn, err) {
l.defaultMonitor()
return
}
l.notifyReadError(errors.WithStack(err))
return
}
}
}
11 changes: 0 additions & 11 deletions readloop_generic.go

This file was deleted.

111 changes: 0 additions & 111 deletions readloop_linux.go

This file was deleted.

13 changes: 2 additions & 11 deletions sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,8 @@ func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn
sess.block = block
sess.recvbuf = make([]byte, mtuLimit)

// cast to writebatch conn
if _, ok := conn.(*net.UDPConn); ok {
addr, err := net.ResolveUDPAddr("udp", conn.LocalAddr().String())
if err == nil {
if addr.IP.To4() != nil {
sess.xconn = ipv4.NewPacketConn(conn)
} else {
sess.xconn = ipv6.NewPacketConn(conn)
}
}
}
// cast to batchConn, can be nil
sess.xconn = toBatchConn(conn)

// FEC codec initialization
sess.fecDecoder = newFECDecoder(dataShards, parityShards)
Expand Down
Loading

0 comments on commit 95328de

Please sign in to comment.