Skip to content

Commit

Permalink
fix issue: #17
Browse files Browse the repository at this point in the history
  • Loading branch information
xtaci committed Jun 11, 2021
1 parent c78c884 commit afe3e93
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 14 deletions.
2 changes: 1 addition & 1 deletion aio_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (p *poller) Watch(fd int) error {
return p.wakeup()
}

func (p *poller) Rearm(fd int) error {
func (p *poller) Rearm(fd int, read bool, write bool) (err error) {
return nil
}

Expand Down
13 changes: 11 additions & 2 deletions aio_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,18 @@ func (p *poller) Watch(fd int) (err error) {
return
}

func (p *poller) Rearm(fd int) (err error) {
func (p *poller) Rearm(fd int, read bool, write bool) (err error) {
p.mu.Lock()
err = syscall.EpollCtl(p.pfd, syscall.EPOLL_CTL_MOD, int(fd), &syscall.EpollEvent{Fd: int32(fd), Events: syscall.EPOLLONESHOT | syscall.EPOLLRDHUP | syscall.EPOLLIN | syscall.EPOLLOUT | _EPOLLET})
var flag uint32
flag = syscall.EPOLLONESHOT | _EPOLLET
if read {
flag |= syscall.EPOLLIN | syscall.EPOLLRDHUP
}
if write {
flag |= syscall.EPOLLOUT
}

err = syscall.EpollCtl(p.pfd, syscall.EPOLL_CTL_MOD, int(fd), &syscall.EpollEvent{Fd: int32(fd), Events: flag})
p.mu.Unlock()
return
}
Expand Down
36 changes: 25 additions & 11 deletions watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ type fdDesc struct {
readers list.List // all read/write requests
writers list.List
ptr uintptr // pointer to net.Conn
armed bool
r_armed bool
w_armed bool
}

// watcher will monitor events and process async-io request(s),
Expand Down Expand Up @@ -540,6 +541,12 @@ func (w *watcher) handlePending(pending []*aiocb) {
// enqueue for poller events
pcb.l = &desc.readers
pcb.elem = pcb.l.PushBack(pcb)

// try rearm descriptor
if !desc.r_armed {
w.pfd.Rearm(ident, true, false)
desc.r_armed = true
}
case OpWrite:
if desc.writers.Len() == 0 {
if w.tryWrite(ident, pcb) {
Expand All @@ -550,12 +557,12 @@ func (w *watcher) handlePending(pending []*aiocb) {

pcb.l = &desc.writers
pcb.elem = pcb.l.PushBack(pcb)
}

// try rearm descriptor
if !desc.armed {
w.pfd.Rearm(ident)
desc.armed = true
// try rearm descriptor
if !desc.w_armed {
w.pfd.Rearm(ident, false, true)
desc.w_armed = true
}
}

// push to heap for timeout operation
Expand All @@ -581,8 +588,8 @@ func (w *watcher) handleEvents(pe pollerEvents) {
//log.Println(e)
for _, e := range pe {
if desc, ok := w.descs[e.ident]; ok {
desc.armed = false
if e.ev&EV_READ != 0 {
desc.r_armed = false
var next *list.Element
for elem := desc.readers.Front(); elem != nil; elem = next {
next = elem.Next()
Expand All @@ -594,9 +601,16 @@ func (w *watcher) handleEvents(pe pollerEvents) {
break
}
}

if desc.readers.Len() > 0 {
w.pfd.Rearm(e.ident, true, false)
desc.r_armed = true
}

}

if e.ev&EV_WRITE != 0 {
desc.w_armed = false
var next *list.Element
for elem := desc.writers.Front(); elem != nil; elem = next {
next = elem.Next()
Expand All @@ -608,11 +622,11 @@ func (w *watcher) handleEvents(pe pollerEvents) {
break
}
}
}

if desc.readers.Len() > 0 || desc.writers.Len() > 0 {
w.pfd.Rearm(e.ident)
desc.armed = true
if desc.writers.Len() > 0 {
w.pfd.Rearm(e.ident, false, true)
desc.w_armed = true
}
}
}
}
Expand Down

0 comments on commit afe3e93

Please sign in to comment.