WIP: feat: io_uring for netpoll I/O poller #197

Open. Wants to merge 67 commits into base: develop. Showing changes from 16 commits.

Commits (67):
d766619
feat: add variable parameters to manager.Run() for PollType of manage…
Aug 24, 2022
0227947
feat: re-design openPoll() to select one PollType
Aug 24, 2022
bb5ef53
feat: add IOURingPoll (WIP)
Aug 25, 2022
16074e4
feat: add sysMmap & sysMunmap
Aug 25, 2022
00ee563
fix: rename io_uring to uring
Aug 27, 2022
04cd494
fix: uniform variable r to u
Aug 29, 2022
a940bfa
feat: add const for mmap
Aug 29, 2022
a375d8c
feat: add setup, enter & register for system call
Aug 29, 2022
3e90d2b
feat: add setup options
Aug 29, 2022
8db732f
feat: add SQEntry & CQEvent
Aug 29, 2022
ea33f82
feat: add atomic operation for barrier
Aug 29, 2022
fda4b12
feat: add probe supported capability
Aug 29, 2022
7f9768b
feat: add advance usage for register
Aug 29, 2022
fdc0e5b
feat: add uring for low-level interface
Aug 29, 2022
4c5b712
feat: add submission operations
Aug 29, 2022
641d782
feat: add completion operations
Aug 29, 2022
8e81931
fix: restructure URingSQE & URingCQE
Sep 7, 2022
1057583
fix: wrap mmap & unmmap, recovery syscall.MAP_POPULATE
Sep 7, 2022
a02b287
faet: public getOp by Op
Sep 7, 2022
58d195d
fix: update sysRegister to SysRegister
Sep 7, 2022
4fab811
feat: public syscall
Sep 7, 2022
e3f4b83
feat: restructure uring methord
Sep 7, 2022
392c003
fix: remove sys_barrier.go
Sep 7, 2022
5ad8bd6
fix: Copyright 2022 CloudWeGo Authors
Sep 7, 2022
61f407b
fix: rollback poll_default_* & poll_manager
Sep 8, 2022
c82d419
feat: restructure URingCQE, Error & rename setData to setUserData
Sep 14, 2022
6e445f8
fix: cal size
Sep 14, 2022
2e064d6
feat: add sys_operation
Sep 14, 2022
18aac13
feat: const _size* & fix Sys*
Sep 14, 2022
546b5cd
fix: correct methods
Sep 14, 2022
4386f8c
fix: restructure submit*
Sep 14, 2022
28cc8eb
feat: add Queue & fix others
Sep 14, 2022
a4ffc46
feat: add test-coverage at 65.6% with bad TestTimeoutWait
Sep 14, 2022
3816272
fix: correct import
Sep 14, 2022
dbdf554
fix: add timeout check for WaitCQEs
Sep 21, 2022
f95b7f6
fix: simplify Syscall6 for SysEnter
Sep 21, 2022
b6334f4
feat: add acceptOp
Sep 21, 2022
72137f0
fix: rename OpCode to OpFlag
Sep 21, 2022
32a14c0
feat: add cat example
Sep 21, 2022
53df5fc
feat: add server example
Sep 21, 2022
f86c99a
Merge branch 'cloudwego:develop' into feat/io_uring
Oct 10, 2022
cdbc94f
fix: rename cq.kRingMsk to cq.kRingMask, add annotation
Oct 12, 2022
c1ec061
feat: update peekBatchCQE & peekCQE, and add getEvents
Oct 12, 2022
6a4d7e2
fix: rename cq.kRingMsk to cq.kRingMask
Oct 12, 2022
e9ff6a3
feat: add benckmark for uring & epoll
Oct 12, 2022
7c65c95
Merge branch 'feat/io_uring' of github.com:Jacob953/netpoll into feat…
Oct 12, 2022
46ca4cf
feat: add memory_barrier
Oct 12, 2022
83cc3e2
fix: TestTimeoutWait not supported
Oct 13, 2022
93dda0b
fix: update SMP_MEMORY_BARRIER
Oct 13, 2022
625c0c4
feat: implement netpoll poller register
Oct 13, 2022
0013936
fix: rm poll_io_uring.go for restructuring
Oct 13, 2022
302e5be
fix: rm SMP_SQRING.Store() at SysSetup
Oct 13, 2022
03248a3
fix: openPoll segmentation violation
Oct 13, 2022
5e0b446
fix: simplify
Oct 14, 2022
4d8abb9
feat: modify OpFlag
Oct 14, 2022
ac1a9d8
feat: add RegisterURingPoll
Oct 14, 2022
e1a8711
feat: update pollRegister
Oct 14, 2022
9dbdc9d
feat: update go version to 1.17 for unsafe.Add
Oct 14, 2022
d2b3966
feat: add register for event, rm opflag
Oct 15, 2022
5a6033f
feat: add PollAdd & PollRemove
Oct 15, 2022
4608460
feat: add uring poller
Oct 15, 2022
0737179
fix: rm fmt(for test)
Oct 17, 2022
ef05b54
feat: add URingEpollCtl
Oct 17, 2022
326225e
fix: restructure uringpoll
Oct 17, 2022
861cfa8
fix: check trig and exit first
Oct 17, 2022
dbd117a
fix: add PollAdd for listen
Oct 17, 2022
cfc5b9a
feat: restructure Control
Oct 17, 2022
5 changes: 4 additions & 1 deletion poll_default_bsd.go
@@ -25,7 +25,10 @@ import (
"unsafe"
)

-func openPoll() Poll {

Contributor:

Let's remove these poll_xxx files for now. What is needed here is a registration mechanism, rather than selecting the poller by type.

+func openPoll(pollType PollType) Poll {
+if pollType == PollIOURing {
+return openIOURingPoll()
+}
return openDefaultPoll()
}
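
The review comment above asks for a registration mechanism instead of a `PollType` switch. A minimal sketch of that idea follows. Every name in it (`registerPoll`, `usePoll`, `pollFactory`, `fakePoll`) is hypothetical, and the `Poll` interface is cut down to one method so the example stands alone; it is not the API this PR ends up with.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Poll is a stand-in for netpoll's Poll interface, cut down to one method.
type Poll interface {
	Wait() error
}

// pollFactory builds a poller. The name is hypothetical.
type pollFactory func() (Poll, error)

var (
	registryMu sync.RWMutex
	registry   = map[string]pollFactory{}
	active     string
)

// registerPoll stores a factory under a name (hypothetical helper).
func registerPoll(name string, f pollFactory) {
	registryMu.Lock()
	defer registryMu.Unlock()
	registry[name] = f
}

// usePoll selects the factory that openPoll will call.
func usePoll(name string) error {
	registryMu.Lock()
	defer registryMu.Unlock()
	if _, ok := registry[name]; !ok {
		return errors.New("poll not registered: " + name)
	}
	active = name
	return nil
}

// openPoll keeps a zero-argument signature; callers never pass a type.
func openPoll() (Poll, error) {
	registryMu.RLock()
	f, ok := registry[active]
	registryMu.RUnlock()
	if !ok {
		return nil, errors.New("no poll selected")
	}
	return f()
}

// fakePoll stands in for a real epoll, kevent, or io_uring poller.
type fakePoll struct{ kind string }

func (p *fakePoll) Wait() error { return nil }

func main() {
	registerPoll("default", func() (Poll, error) { return &fakePoll{kind: "default"}, nil })
	registerPoll("uring", func() (Poll, error) { return &fakePoll{kind: "uring"}, nil })
	if err := usePoll("uring"); err != nil {
		panic(err)
	}
	p, err := openPoll()
	if err != nil {
		panic(err)
	}
	fmt.Println(p.(*fakePoll).kind) // prints "uring"
}
```

With this shape the platform files keep a single `openPoll()` and only the io_uring package needs to know how to construct its poller.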

5 changes: 4 additions & 1 deletion poll_default_linux.go
@@ -26,7 +26,10 @@ import (
)

// Includes defaultPoll/multiPoll/uringPoll...
-func openPoll() Poll {
+func openPoll(pollType PollType) Poll {
+if pollType == PollIOURing {
+return openIOURingPoll()
+}
return openDefaultPoll()
}

45 changes: 45 additions & 0 deletions poll_io_uring.go
@@ -0,0 +1,45 @@
// Copyright 2021 CloudWeGo Authors
Contributor:

Copyright 2022 CloudWeGo Authors

//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package netpoll

import uring "github.com/cloudwego/netpoll/io_uring"
Contributor:

The import path is wrong.


// TODO: init uringPoll
func openIOURingPoll() *uringPoll {
poll := new(uringPoll)
ring, err := uring.IOURing(0)
if err != nil {
panic(err)
}
poll.fd = ring.Fd()
return poll
}

// TODO: build uringPoll
type uringPoll struct {
fd int
}

// TODO: Wait implements Poll.
func (p *uringPoll) Wait() error

// TODO: Close implements Poll.
func (p *uringPoll) Close() error

// TODO: Trigger implements Poll.
func (p *uringPoll) Trigger() error

// TODO: Control implements Poll.
func (p *uringPoll) Control(operator *FDOperator, event PollEvent) error
22 changes: 20 additions & 2 deletions poll_manager.go
@@ -98,10 +98,16 @@ func (m *manager) Close() error {
}

// Run all pollers.
-func (m *manager) Run() error {
+func (m *manager) Run(pollTypes ...PollType) error {
+// set PollDefault as type of poll
+pollType := PollDefault
+// set poll type, only executed if the parameter is unique
+if len(pollTypes) == 1 {
+pollType = pollTypes[0]
+}
// new poll to fill delta.
for idx := len(m.polls); idx < m.NumLoops; idx++ {
-var poll = openPoll()
+var poll = openPoll(pollType)
m.polls = append(m.polls, poll)
go poll.Wait()
}
@@ -123,3 +129,15 @@ func (m *manager) Reset() error {
func (m *manager) Pick() Poll {
return m.balance.Pick()
}

// PollType defines the type of manager.polls.
type PollType int

const (
// PollDefault is used to set poll as epoll on linux systems by default,
// and kevent by default on bsd systems.
PollDefault PollType = 0x1

// PollIOURing is used to set poll as io_uring.
PollIOURing PollType = 0x2
)
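
The selection rule in `Run` can be isolated to make its edge cases visible. The sketch below copies the two constants from the diff; `pickPollType` is a hypothetical helper name. Note that passing more than one `PollType` silently falls back to the default.

```go
package main

import "fmt"

// PollType and its values are copied from the diff above.
type PollType int

const (
	PollDefault PollType = 0x1
	PollIOURing PollType = 0x2
)

// pickPollType mirrors the logic in Run: the argument is honoured
// only when exactly one PollType is passed.
func pickPollType(pollTypes ...PollType) PollType {
	if len(pollTypes) == 1 {
		return pollTypes[0]
	}
	return PollDefault
}

func main() {
	fmt.Println(pickPollType() == PollDefault)                         // true
	fmt.Println(pickPollType(PollIOURing) == PollIOURing)              // true
	fmt.Println(pickPollType(PollIOURing, PollIOURing) == PollDefault) // true
}
```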
33 changes: 33 additions & 0 deletions uring/sys_barrier.go
@@ -0,0 +1,33 @@
// Copyright 2021 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package uring

import "sync/atomic"

Contributor:

Let's delete this file.

func WRITE_ONCE_U32(p *uint32, v uint32) {
atomic.StoreUint32(p, v)
}

func READ_ONCE_U32(p *uint32) uint32 {
return atomic.LoadUint32(p)
}

func SMP_STORE_RELEASE_U32(p *uint32, v uint32) {
atomic.StoreUint32(p, v)
}

func SMP_LOAD_ACQUIRE_U32(p *uint32) uint32 {
return atomic.LoadUint32(p)
}
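
The four wrappers above reduce to two calls because Go's `sync/atomic` has no separate acquire and release variants; the Go memory model describes its atomic operations as behaving in a sequentially consistent order. A minimal sketch of publishing a ring index with `sync/atomic` directly is shown below. `miniRing` is a hypothetical single-producer, single-consumer ring living entirely in Go memory, so it says nothing about ordering against the kernel side of a real io_uring.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// miniRing is a hypothetical single-producer/single-consumer ring.
// entries must be a power of two so that index&mask wraps correctly.
type miniRing struct {
	head, tail uint32
	mask       uint32
	slots      []uint64
}

func newMiniRing(entries uint32) *miniRing {
	return &miniRing{mask: entries - 1, slots: make([]uint64, entries)}
}

// push writes the slot first and only then publishes the new tail.
func (r *miniRing) push(v uint64) bool {
	tail := r.tail // only the producer writes tail
	if tail-atomic.LoadUint32(&r.head) == uint32(len(r.slots)) {
		return false // ring is full
	}
	r.slots[tail&r.mask] = v
	atomic.StoreUint32(&r.tail, tail+1)
	return true
}

// pop reads the published tail before touching the slot.
func (r *miniRing) pop() (uint64, bool) {
	head := r.head // only the consumer writes head
	if head == atomic.LoadUint32(&r.tail) {
		return 0, false // ring is empty
	}
	v := r.slots[head&r.mask]
	atomic.StoreUint32(&r.head, head+1)
	return v, true
}

func main() {
	r := newMiniRing(4)
	r.push(1)
	r.push(2)
	v, ok := r.pop()
	fmt.Println(v, ok) // 1 true
}
```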
146 changes: 146 additions & 0 deletions uring/sys_enter.go
@@ -0,0 +1,146 @@
// Copyright 2021 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package uring

import (
"syscall"
)

// Completion Queue Event, IO completion data structure
type URingCQE struct {
UserData uint64 // sqe->data submission passed back
Res int32 // result code for this event
Flags uint32

// If the ring is initialized with IORING_SETUP_CQE32, then this field
// contains 16-bytes of padding, doubling the size of the CQE.
BigCQE []uint64
Contributor:

16 bytes should be [2]uint64, shouldn't it? How would this field be initialized when it is defined this way?

}

// Error implements CQE
func (c *URingCQE) Error() error {
return syscall.Errno(uintptr(-c.Res))
}

// getData implements CQE
func (c *URingCQE) getData() uint64 {
return c.UserData
}

// Submission Queue Entry, IO submission data structure
type URingSQE struct {
OpCode uint8 // type of operation for this sqe
Flags uint8 // IOSQE_ flags
IOPrio uint16 // ioprio for the request
Fd int32 // file descriptor to do IO on
Off uint64 // offset into file
Addr uint64 // pointer to buffer or iovecs
Len uint32 // buffer size or number of iovecs
OpcodeFlags uint32
Contributor:

this is UnionFlags

UserData uint64 // data to be passed back at completion time

Contributor:

For now, just pad everything below this with [3]uint64; the union members don't need to be distinguished here yet.

BufIG uint16

Personality uint16 // personality to use, if used
SpliceFdIn int32
_pad2 [2]uint64
}

// setData sets the user data field of the SQE instance passed in.
func (s *URingSQE) setData(ud uint64) {
s.UserData = ud
}

// setFlags sets the flags field of the SQE instance passed in.
func (s *URingSQE) setFlags(flags uint8) {
s.Flags = flags
}

// setAddr sets the addr field of the SQE instance passed in.
func (s *URingSQE) setAddr(addr uintptr) {
s.Addr = uint64(addr)
}

// PrepRW implements SQE
func (s *URingSQE) PrepRW(op OpFlag, fd int32, addr uintptr, len uint32, offset uint64) {
s.OpCode = uint8(op)
s.Flags = 0
s.IOPrio = 0
s.Fd = fd
s.Off = offset
s.setAddr(addr)
s.Len = len
s.OpcodeFlags = 0
s.UserData = 0
s.BufIG = 0
s.Personality = 0
s.SpliceFdIn = 0
s._pad2[0] = 0
s._pad2[1] = 0
}

// Flags of CQE
// IORING_CQE_F_BUFFER If set, the upper 16 bits are the buffer ID
// IORING_CQE_F_MORE If set, parent SQE will generate more CQE entries
// IORING_CQE_F_SOCK_NONEMPTY If set, more data to read after socket recv
const (
IORING_CQE_F_BUFFER OpFlag = 1 << iota
IORING_CQE_F_MORE
IORING_CQE_F_SOCK_NONEMPTY
)

const IORING_CQE_BUFFER_SHIFT = 16
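
The comment on `IORING_CQE_F_BUFFER` says the upper 16 bits of the CQE flags carry the buffer ID. A small sketch of decoding it follows. The constants are redeclared here as `uint32` (an assumption, since `URingCQE.Flags` is a `uint32` while the diff types them as `OpFlag`), and `bufferID` is a hypothetical helper.

```go
package main

import "fmt"

// CQE flag bits, redeclared as uint32 to match the Flags field.
const (
	cqeFBuffer       uint32 = 1 << iota // IORING_CQE_F_BUFFER
	cqeFMore                            // IORING_CQE_F_MORE
	cqeFSockNonEmpty                    // IORING_CQE_F_SOCK_NONEMPTY
)

const cqeBufferShift = 16 // IORING_CQE_BUFFER_SHIFT

// bufferID returns the buffer ID stored in the upper 16 bits,
// and false when the buffer flag is not set.
func bufferID(flags uint32) (uint16, bool) {
	if flags&cqeFBuffer == 0 {
		return 0, false
	}
	return uint16(flags >> cqeBufferShift), true
}

func main() {
	flags := uint32(7)<<cqeBufferShift | cqeFBuffer | cqeFMore
	id, ok := bufferID(flags)
	fmt.Println(id, ok) // 7 true
}
```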

// io_uring_enter(2) flags
const (
IORING_ENTER_GETEVENTS uint32 = 1 << iota
IORING_ENTER_SQ_WAKEUP
IORING_ENTER_SQ_WAIT
IORING_ENTER_EXT_ARG
IORING_ENTER_REGISTERED_RING
)

// If sqe->file_index is set to this for opcodes that instantiate a new
// direct descriptor (like openat/openat2/accept), then io_uring will allocate
// an available direct descriptor instead of having the application pass one
// in. The picked direct descriptor will be returned in cqe->res, or -ENFILE
// if the space is full.
const (
IOSQE_FIXED_FILE_BIT = iota
IOSQE_IO_DRAIN_BIT
IOSQE_IO_LINK_BIT
IOSQE_IO_HARDLINK_BIT
IOSQE_ASYNC_BIT
IOSQE_BUFFER_SELECT_BIT
IOSQE_CQE_SKIP_SUCCESS_BIT
)

// Flags of SQE
const (
// IOSQE_FIXED_FILE means use fixed fileset
IOSQE_FIXED_FILE uint32 = 1 << IOSQE_FIXED_FILE_BIT
// IOSQE_IO_DRAIN means issue after inflight IO
IOSQE_IO_DRAIN uint32 = 1 << IOSQE_IO_DRAIN_BIT
// IOSQE_IO_LINK means links next sqe
IOSQE_IO_LINK uint32 = 1 << IOSQE_IO_LINK_BIT
// IOSQE_IO_HARDLINK means like LINK, but stronger
IOSQE_IO_HARDLINK uint32 = 1 << IOSQE_IO_HARDLINK_BIT
// IOSQE_ASYNC means always go async
IOSQE_ASYNC uint32 = 1 << IOSQE_ASYNC_BIT
// IOSQE_BUFFER_SELECT means select buffer from sqe->buf_group
IOSQE_BUFFER_SELECT uint32 = 1 << IOSQE_BUFFER_SELECT_BIT
// IOSQE_CQE_SKIP_SUCCESS means don't post CQE if request succeeded
IOSQE_CQE_SKIP_SUCCESS uint32 = 1 << IOSQE_CQE_SKIP_SUCCESS_BIT
)
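
In the diff the `IOSQE_*` flags are typed `uint32`, while `URingSQE.Flags` and `setFlags` take a `uint8`, so passing a flag constant would need an explicit conversion. A sketch that types the flags as `uint8` instead is shown below; all identifiers are hypothetical lower-case stand-ins for the exported names.

```go
package main

import "fmt"

// Bit positions, in the same order as the IOSQE_*_BIT constants.
const (
	fixedFileBit = iota
	ioDrainBit
	ioLinkBit
	ioHardlinkBit
	asyncBit
	bufferSelectBit
	cqeSkipSuccessBit
)

// Flag values typed uint8 so they match the SQE flags field directly.
const (
	sqeFixedFile      uint8 = 1 << fixedFileBit
	sqeIODrain        uint8 = 1 << ioDrainBit
	sqeIOLink         uint8 = 1 << ioLinkBit
	sqeIOHardlink     uint8 = 1 << ioHardlinkBit
	sqeAsync          uint8 = 1 << asyncBit
	sqeBufferSelect   uint8 = 1 << bufferSelectBit
	sqeCQESkipSuccess uint8 = 1 << cqeSkipSuccessBit
)

// sqe keeps only the field this example needs.
type sqe struct{ Flags uint8 }

func (s *sqe) setFlags(flags uint8) { s.Flags = flags }

func main() {
	var s sqe
	s.setFlags(sqeIOLink | sqeAsync) // no conversion required
	fmt.Printf("%#x\n", s.Flags)     // 0x14
}
```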
112 changes: 112 additions & 0 deletions uring/sys_mmap.go
@@ -0,0 +1,112 @@
// Copyright 2021 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package uring

import (
"syscall"
"unsafe"
)

// sysMunmap is used to free the URingSQE and URingCQE.
func (u *URing) sysMunmap() (err error) {
err = syscall.Munmap(u.sqRing.buff)
if u.cqRing.buff != nil && &u.cqRing.buff[0] != &u.sqRing.buff[0] {
err = syscall.Munmap(u.cqRing.buff)
}
return
}

// sysMmap is used to configure the URingSQE and URingCQE,
// it should only be called after the sysSetUp function has completed successfully.
func (u *URing) sysMmap(p *ringParams) (err error) {
size := unsafe.Sizeof(URingCQE{})
Contributor:

Define the sizeof result as a global var/const so that it isn't recomputed on every call.

if p.flags&IORING_SETUP_CQE32 != 0 {
size += unsafe.Sizeof(URingCQE{})
}
u.sqRing.ringSize = uint64(p.sqOffset.array) + uint64(p.sqEntries*(uint32)(unsafe.Sizeof(uint32(0))))
u.cqRing.ringSize = uint64(p.cqOffset.cqes) + uint64(p.cqEntries*(uint32)(size))

if p.features&IORING_FEAT_SINGLE_MMAP != 0 {
if u.cqRing.ringSize > u.sqRing.ringSize {
u.sqRing.ringSize = u.cqRing.ringSize
}
u.cqRing.ringSize = u.sqRing.ringSize
}

// TODO: syscall.MAP_POPULATE is unsupported on macOS
data, err := syscall.Mmap(u.fd, 0, int(u.sqRing.ringSize), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
Contributor:

Wrap syscall.Mmap/Munmap so that it can be changed later.

Contributor:

Isn't uring Linux-only? Why take macOS into account? This won't work without syscall.MAP_POPULATE here.

if err != nil {
return err
}
u.sqRing.buff = data

if p.features&IORING_FEAT_SINGLE_MMAP != 0 {
u.cqRing.buff = u.sqRing.buff
} else {
// TODO: syscall.MAP_POPULATE is unsupported on macOS
data, err = syscall.Mmap(u.fd, int64(IORING_OFF_CQ_RING), int(u.cqRing.ringSize), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
Contributor:

Same as above.

if err != nil {
u.sysMunmap()
return err
}
u.cqRing.buff = data
}

ringStart := &u.sqRing.buff[0]
u.sqRing.kHead = (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.sqOffset.head)))
u.sqRing.kTail = (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.sqOffset.tail)))
u.sqRing.kRingMask = (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.sqOffset.ringMask)))
u.sqRing.kRingEntries = (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.sqOffset.ringEntries)))
u.sqRing.kFlags = (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.sqOffset.flags)))
u.sqRing.kDropped = (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.sqOffset.dropped)))
u.sqRing.array = (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.sqOffset.array)))

size = unsafe.Sizeof(URingSQE{})
Contributor:

pin

if p.flags&IORING_SETUP_SQE128 != 0 {
size += 64
}
// TODO: syscall.MAP_POPULATE is unsupported on macOS
buff, err := syscall.Mmap(u.fd, int64(IORING_OFF_SQES), int(size), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
if err != nil {
_ = u.sysMunmap()
return err
}
u.sqRing.sqeBuff = buff

cqRingPtr := uintptr(unsafe.Pointer(&u.cqRing.buff[0]))
ringStart = &u.cqRing.buff[0]

u.cqRing.kHead = (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.cqOffset.head)))
u.cqRing.kTail = (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.cqOffset.tail)))
u.cqRing.kRingMask = (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.cqOffset.ringMsk)))
u.cqRing.kRingEntries = (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.cqOffset.ringEntries)))
u.cqRing.kOverflow = (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.cqOffset.overflow)))
u.cqRing.cqes = (*URingCQE)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.cqOffset.cqes)))
if p.cqOffset.flags != 0 {
u.cqRing.kFlags = cqRingPtr + uintptr(p.cqOffset.flags)
}

return nil
}
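
The size arithmetic in `sysMmap` can be pulled out into package-level constants and small helpers, as the review comment about `sizeof` suggests. The sketch below uses hypothetical names throughout and takes the setup flags as plain booleans. It also multiplies the SQE size by the number of entries: the diff maps only `size` bytes for the SQE array, which looks like an oversight given that the array holds `sqEntries` entries.

```go
package main

import (
	"fmt"
	"unsafe"
)

// Stand-ins with the expected entry sizes (16 and 64 bytes).
type cqe struct {
	UserData uint64
	Res      int32
	Flags    uint32
}
type sqe struct{ raw [64]byte }

// Sizes computed once at compile time instead of on every call.
const (
	sizeU32 = unsafe.Sizeof(uint32(0))
	sizeCQE = unsafe.Sizeof(cqe{})
	sizeSQE = unsafe.Sizeof(sqe{})
)

// sqRingSize covers the ring header plus the uint32 index array.
func sqRingSize(arrayOff, sqEntries uint32) uintptr {
	return uintptr(arrayOff) + uintptr(sqEntries)*sizeU32
}

// cqRingSize covers the ring header plus the CQE array;
// cqe32 doubles the entry size.
func cqRingSize(cqesOff, cqEntries uint32, cqe32 bool) uintptr {
	size := sizeCQE
	if cqe32 {
		size += sizeCQE
	}
	return uintptr(cqesOff) + uintptr(cqEntries)*size
}

// sqesSize covers the whole SQE array; sqe128 adds 64 bytes per entry.
func sqesSize(sqEntries uint32, sqe128 bool) uintptr {
	size := sizeSQE
	if sqe128 {
		size += 64
	}
	return uintptr(sqEntries) * size
}

func main() {
	fmt.Println(sqRingSize(64, 8), cqRingSize(64, 16, false), sqesSize(8, false))
}
```

The offsets passed in `main` are made-up values for illustration; real ones come from the parameters filled in by the setup call.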

// Magic offsets for the application to mmap the data it needs
const (
// IORING_OFF_SQ_RING maps sqring to program memory space
IORING_OFF_SQ_RING uint64 = 0
// IORING_OFF_CQ_RING maps cqring to program memory space
IORING_OFF_CQ_RING uint64 = 0x8000000
// IORING_OFF_SQES maps sqes array to program memory space
IORING_OFF_SQES uint64 = 0x10000000
)