diff --git a/acceptor_unix.go b/acceptor_unix.go index 8a9d03178..70182b06b 100644 --- a/acceptor_unix.go +++ b/acceptor_unix.go @@ -25,9 +25,10 @@ package gnet import ( "os" + "golang.org/x/sys/unix" + "github.com/panjf2000/gnet/errors" "github.com/panjf2000/gnet/internal/socket" - "golang.org/x/sys/unix" ) func (svr *server) acceptNewConnection(fd int) error { @@ -47,16 +48,7 @@ func (svr *server) acceptNewConnection(fd int) error { el := svr.lb.next(netAddr) c := newTCPConn(nfd, el, sa, netAddr) - err = el.poller.UrgentTrigger(func(_ []byte) (err error) { - if err = el.poller.AddRead(nfd); err != nil { - _ = unix.Close(nfd) - c.releaseTCP() - return - } - el.connections[nfd] = c - err = el.loopOpen(c) - return - }) + err = el.poller.UrgentTrigger(el.loopInsert, c) if err != nil { _ = unix.Close(nfd) c.releaseTCP() diff --git a/connection_unix.go b/connection_unix.go index c4b9f0e72..9d0e05ac9 100644 --- a/connection_unix.go +++ b/connection_unix.go @@ -110,9 +110,6 @@ func (c *conn) read() ([]byte, error) { } func (c *conn) write(buf []byte) (err error) { - if !c.opened { - return - } var outFrame []byte if outFrame, err = c.codec.Encode(c, buf); err != nil { return @@ -142,6 +139,13 @@ func (c *conn) write(buf []byte) (err error) { return } +func (c *conn) asyncWrite(itf interface{}) error { + if !c.opened { + return nil + } + return c.write(itf.([]byte)) +} + func (c *conn) sendTo(buf []byte) error { return unix.Sendto(c.fd, buf, 0, c.sa) } @@ -222,7 +226,7 @@ func (c *conn) BufferLength() int { } func (c *conn) AsyncWrite(buf []byte) error { - return c.loop.poller.Trigger(c.write, buf) + return c.loop.poller.Trigger(c.asyncWrite, buf) } func (c *conn) SendTo(buf []byte) error { @@ -230,15 +234,11 @@ func (c *conn) SendTo(buf []byte) error { } func (c *conn) Wake() error { - return c.loop.poller.UrgentTrigger(func(_ []byte) error { - return c.loop.loopWake(c) - }) + return c.loop.poller.UrgentTrigger(func(_ interface{}) error { return c.loop.loopWake(c) }, nil) } func (c *conn) Close() error { - return c.loop.poller.Trigger(func(_ []byte) error { - return c.loop.loopCloseConn(c, nil) - }, nil) + return c.loop.poller.Trigger(func(_ interface{}) error { return c.loop.loopCloseConn(c, nil) }, nil) } func (c *conn) Context() interface{} { return c.ctx } diff --git a/eventloop_unix.go b/eventloop_unix.go index e1898188c..dc404877d 100644 --- a/eventloop_unix.go +++ b/eventloop_unix.go @@ -126,6 +126,17 @@ func (el *eventloop) loopAccept(fd int) error { return nil } +func (el *eventloop) loopInsert(itf interface{}) error { + c := itf.(*conn) + if err := el.poller.AddRead(c.fd); err != nil { + _ = unix.Close(c.fd) + c.releaseTCP() + return nil + } + el.connections[c.fd] = c + return el.loopOpen(c) +} + func (el *eventloop) loopOpen(c *conn) error { c.opened = true el.addConn(1) @@ -281,7 +292,7 @@ func (el *eventloop) loopTicker(ctx context.Context) { switch action { case None: case Shutdown: - err := el.poller.UrgentTrigger(func(_ []byte) error { return gerrors.ErrServerShutdown }) + err := el.poller.UrgentTrigger(func(_ interface{}) error { return gerrors.ErrServerShutdown }, nil) el.getLogger().Debugf("stopping ticker in event-loop(%d) from Tick(), UrgentTrigger:%v", el.idx, err) } if timer == nil { diff --git a/internal/netpoll/epoll.go b/internal/netpoll/epoll.go index 7da335b7e..0ade7b3d1 100644 --- a/internal/netpoll/epoll.go +++ b/internal/netpoll/epoll.go @@ -91,9 +91,9 @@ var ( // // Note that priorAsyncTaskQueue is a queue with high-priority and its size is expected to be small, // so only those urgent tasks should be put into this queue. -func (p *Poller) UrgentTrigger(f queue.TaskFunc) (err error) { +func (p *Poller) UrgentTrigger(fn queue.TaskFunc, arg interface{}) (err error) { task := queue.GetTask() - task.Run = f + task.Run, task.Arg = fn, arg p.priorAsyncTaskQueue.Enqueue(task) if atomic.CompareAndSwapInt32(&p.netpollWakeSig, 0, 1) { for _, err = unix.Write(p.wfd, b); err == unix.EINTR || err == unix.EAGAIN; _, err = unix.Write(p.wfd, b) { @@ -106,9 +106,9 @@ func (p *Poller) UrgentTrigger(f queue.TaskFunc) (err error) { // call this method when the task is not so urgent, for instance writing data back to client. // // Note that asyncTaskQueue is a queue with low-priority whose size may grow large and tasks in it may backlog. -func (p *Poller) Trigger(f queue.TaskFunc, buf []byte) (err error) { +func (p *Poller) Trigger(fn queue.TaskFunc, arg interface{}) (err error) { task := queue.GetTask() - task.Run, task.Buf = f, buf + task.Run, task.Arg = fn, arg p.asyncTaskQueue.Enqueue(task) if atomic.CompareAndSwapInt32(&p.netpollWakeSig, 0, 1) { for _, err = unix.Write(p.wfd, b); err == unix.EINTR || err == unix.EAGAIN; _, err = unix.Write(p.wfd, b) { @@ -154,7 +154,7 @@ func (p *Poller) Polling(callback func(fd int, ev uint32) error) error { wakenUp = false task := p.priorAsyncTaskQueue.Dequeue() for ; task != nil; task = p.priorAsyncTaskQueue.Dequeue() { - switch err = task.Run(task.Buf); err { + switch err = task.Run(task.Arg); err { case nil: case errors.ErrServerShutdown: return err @@ -167,7 +167,7 @@ func (p *Poller) Polling(callback func(fd int, ev uint32) error) error { if task = p.asyncTaskQueue.Dequeue(); task == nil { break } - switch err = task.Run(task.Buf); err { + switch err = task.Run(task.Arg); err { case nil: case errors.ErrServerShutdown: return err diff --git a/internal/netpoll/kqueue.go b/internal/netpoll/kqueue.go index 7bc44c760..b2f23d09c 100644 --- a/internal/netpoll/kqueue.go +++ b/internal/netpoll/kqueue.go @@ -82,9 +82,9 @@ var wakeChanges = []unix.Kevent_t{{ // // Note that priorAsyncTaskQueue is a queue with high-priority and its size is expected to be small, // so only those urgent tasks should be put into this queue. -func (p *Poller) UrgentTrigger(f queue.TaskFunc) (err error) { +func (p *Poller) UrgentTrigger(fn queue.TaskFunc, arg interface{}) (err error) { task := queue.GetTask() - task.Run = f + task.Run, task.Arg = fn, arg p.priorAsyncTaskQueue.Enqueue(task) if atomic.CompareAndSwapInt32(&p.netpollWakeSig, 0, 1) { for _, err = unix.Kevent(p.fd, wakeChanges, nil, nil); err == unix.EINTR || err == unix.EAGAIN; _, err = unix.Kevent(p.fd, wakeChanges, nil, nil) { @@ -97,9 +97,9 @@ func (p *Poller) UrgentTrigger(f queue.TaskFunc) (err error) { // call this method when the task is not so urgent, for instance writing data back to client. // // Note that asyncTaskQueue is a queue with low-priority whose size may grow large and tasks in it may backlog. -func (p *Poller) Trigger(f queue.TaskFunc, buf []byte) (err error) { +func (p *Poller) Trigger(fn queue.TaskFunc, arg interface{}) (err error) { task := queue.GetTask() - task.Run, task.Buf = f, buf + task.Run, task.Arg = fn, arg p.asyncTaskQueue.Enqueue(task) if atomic.CompareAndSwapInt32(&p.netpollWakeSig, 0, 1) { for _, err = unix.Kevent(p.fd, wakeChanges, nil, nil); err == unix.EINTR || err == unix.EAGAIN; _, err = unix.Kevent(p.fd, wakeChanges, nil, nil) { @@ -152,7 +152,7 @@ func (p *Poller) Polling(callback func(fd int, filter int16) error) error { wakenUp = false task := p.priorAsyncTaskQueue.Dequeue() for ; task != nil; task = p.priorAsyncTaskQueue.Dequeue() { - switch err = task.Run(task.Buf); err { + switch err = task.Run(task.Arg); err { case nil: case errors.ErrServerShutdown: return err @@ -165,7 +165,7 @@ func (p *Poller) Polling(callback func(fd int, filter int16) error) error { if task = p.asyncTaskQueue.Dequeue(); task == nil { break } - switch err = task.Run(task.Buf); err { + switch err = task.Run(task.Arg); err { case nil: case errors.ErrServerShutdown: return err diff --git a/internal/queue/queue.go b/internal/queue/queue.go index d9c0a7f62..2c9aa94d4 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -23,12 +23,12 @@ package queue import "sync" // TaskFunc is the callback function executed by poller. -type TaskFunc func([]byte) error +type TaskFunc func(interface{}) error // Task is a wrapper that contains function and its argument. type Task struct { Run TaskFunc - Buf []byte + Arg interface{} } var taskPool = sync.Pool{New: func() interface{} { return new(Task) }} @@ -40,7 +40,7 @@ func GetTask() *Task { // PutTask puts the trashy Task back in pool. func PutTask(task *Task) { - task.Run, task.Buf = nil, nil + task.Run, task.Arg = nil, nil taskPool.Put(task) } diff --git a/server_unix.go b/server_unix.go index 0c0eddde3..6ddc3ff26 100644 --- a/server_unix.go +++ b/server_unix.go @@ -200,9 +200,7 @@ func (svr *server) stop(s Server) { // Notify all loops to close by closing all listeners svr.lb.iterate(func(i int, el *eventloop) bool { - err := el.poller.UrgentTrigger(func(_ []byte) error { - return errors.ErrServerShutdown - }) + err := el.poller.UrgentTrigger(func(_ interface{}) error { return errors.ErrServerShutdown }, nil) if err != nil { svr.opts.Logger.Errorf("failed to call UrgentTrigger on sub event-loop when stopping server") } @@ -211,9 +209,7 @@ func (svr *server) stop(s Server) { if svr.mainLoop != nil { svr.ln.close() - err := svr.mainLoop.poller.UrgentTrigger(func(_ []byte) error { - return errors.ErrServerShutdown - }) + err := svr.mainLoop.poller.UrgentTrigger(func(_ interface{}) error { return errors.ErrServerShutdown }, nil) if err != nil { svr.opts.Logger.Errorf("failed to call UrgentTrigger on main event-loop when stopping server") }