Skip to content

Commit

Permalink
fix: close the poll that has already been created when calling the op…
Browse files Browse the repository at this point in the history
…enPoll fails (#256)
  • Loading branch information
wuqinqiang authored May 29, 2023
1 parent cdac6f0 commit 4b09897
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 22 deletions.
11 changes: 6 additions & 5 deletions poll_default_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ import (
"unsafe"
)

func openPoll() Poll {
func openPoll() (Poll, error) {
return openDefaultPoll()
}

func openDefaultPoll() *defaultPoll {
func openDefaultPoll() (*defaultPoll, error) {
l := new(defaultPoll)
p, err := syscall.Kqueue()
if err != nil {
panic(err)
return nil, err
}
l.fd = p
_, err = syscall.Kevent(l.fd, []syscall.Kevent_t{{
Expand All @@ -41,10 +41,11 @@ func openDefaultPoll() *defaultPoll {
Flags: syscall.EV_ADD | syscall.EV_CLEAR,
}}, nil, nil)
if err != nil {
panic(err)
syscall.Close(l.fd)
return nil, err
}
l.opcache = newOperatorCache()
return l
return l, nil
}

type defaultPoll struct {
Expand Down
25 changes: 16 additions & 9 deletions poll_default_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,38 @@ import (
"unsafe"
)

func openPoll() Poll {
func openPoll() (Poll, error) {
return openDefaultPoll()
}

func openDefaultPoll() *defaultPoll {
var poll = defaultPoll{}
func openDefaultPoll() (*defaultPoll, error) {
var poll = new(defaultPoll)

poll.buf = make([]byte, 8)
var p, err = EpollCreate(0)
if err != nil {
panic(err)
return nil, err
}
poll.fd = p

var r0, _, e0 = syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0)
if e0 != 0 {
syscall.Close(p)
panic(e0)
_ = syscall.Close(poll.fd)
return nil, e0
}

poll.Reset = poll.reset
poll.Handler = poll.handler

poll.wop = &FDOperator{FD: int(r0)}
poll.Control(poll.wop, PollReadable)

if err = poll.Control(poll.wop, PollReadable); err != nil {
_ = syscall.Close(poll.wop.FD)
_ = syscall.Close(poll.fd)
return nil, err
}

poll.opcache = newOperatorCache()
return &poll
return poll, nil
}

type defaultPoll struct {
Expand Down
15 changes: 13 additions & 2 deletions poll_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,24 @@ func (m *manager) Close() error {
}

// Run all pollers.
func (m *manager) Run() error {
func (m *manager) Run() (err error) {
defer func() {
if err != nil {
_ = m.Close()
}
}()

// new poll to fill delta.
for idx := len(m.polls); idx < m.NumLoops; idx++ {
var poll = openPoll()
var poll Poll
poll, err = openPoll()
if err != nil {
return
}
m.polls = append(m.polls, poll)
go poll.Wait()
}

// LoadBalance must be set before calling Run, otherwise it will panic.
m.balance.Rebalance(m.polls)
return nil
Expand Down
15 changes: 9 additions & 6 deletions poll_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ func TestPollTrigger(t *testing.T) {
t.Skip()
var trigger int
var stop = make(chan error)
var p = openDefaultPoll()
var p, err = openDefaultPoll()
MustNil(t, err)

go func() {
stop <- p.Wait()
}()
Expand All @@ -46,7 +48,7 @@ func TestPollTrigger(t *testing.T) {
Equal(t, trigger, 2)

p.Close()
err := <-stop
err = <-stop
MustNil(t, err)
}

Expand All @@ -65,15 +67,15 @@ func TestPollMod(t *testing.T) {
return nil
}
var stop = make(chan error)
var p = openDefaultPoll()
var p, err = openDefaultPoll()
MustNil(t, err)
go func() {
stop <- p.Wait()
}()

var rfd, wfd = GetSysFdPairs()
var rop = &FDOperator{FD: rfd, OnRead: read, OnWrite: write, OnHup: hup, poll: p}
var wop = &FDOperator{FD: wfd, OnRead: read, OnWrite: write, OnHup: hup, poll: p}
var err error
var r, w, h int32
r, w, h = atomic.LoadInt32(&rn), atomic.LoadInt32(&wn), atomic.LoadInt32(&hn)
Assert(t, r == 0 && w == 0 && h == 0, r, w, h)
Expand Down Expand Up @@ -113,7 +115,8 @@ func TestPollMod(t *testing.T) {
}

func TestPollClose(t *testing.T) {
var p = openDefaultPoll()
var p, err = openDefaultPoll()
MustNil(t, err)
var wg sync.WaitGroup
wg.Add(1)
go func() {
Expand All @@ -126,7 +129,7 @@ func TestPollClose(t *testing.T) {

func BenchmarkPollMod(b *testing.B) {
b.StopTimer()
var p = openDefaultPoll()
var p, _ = openDefaultPoll()
r, _ := GetSysFdPairs()
var operator = &FDOperator{FD: r}
p.Control(operator, PollReadable)
Expand Down

0 comments on commit 4b09897

Please sign in to comment.