diff --git a/eventloop/eventloop.go b/eventloop/eventloop.go index c8d0bc8..dfafd72 100644 --- a/eventloop/eventloop.go +++ b/eventloop/eventloop.go @@ -1,7 +1,6 @@ package eventloop import ( - "sync" "unsafe" "github.com/Allenxuxu/gev/log" @@ -30,7 +29,7 @@ type eventLoopLocal struct { eventHandling atomic.Bool poll *poller.Poller mu spinlock.SpinLock - sockets *sync.Map + sockets map[int]Socket packet []byte pendingFunc []func() } @@ -46,7 +45,7 @@ func New() (*EventLoop, error) { eventLoopLocal: eventLoopLocal{ poll: p, packet: make([]byte, 0xFFFF), - sockets: &sync.Map{}, + sockets: make(map[int]Socket), }, }, nil } @@ -61,16 +60,16 @@ func (l *EventLoop) DeleteFdInLoop(fd int) { if err := l.poll.Del(fd); err != nil { log.Error("[DeleteFdInLoop]", err) } - l.sockets.Delete(fd) + delete(l.sockets, fd) } // AddSocketAndEnableRead 增加 Socket 到时间循环中,并注册可读事件 func (l *EventLoop) AddSocketAndEnableRead(fd int, s Socket) error { var err error - l.sockets.Store(fd, s) + l.sockets[fd] = s if err = l.poll.AddRead(fd); err != nil { - l.sockets.Delete(fd) + delete(l.sockets, fd) return err } return nil @@ -93,17 +92,15 @@ func (l *EventLoop) RunLoop() { // Stop 关闭事件循环 func (l *EventLoop) Stop() error { - l.sockets.Range(func(key, value interface{}) bool { - s, ok := value.(Socket) - if !ok { - log.Error("value.(Socket) fail") - } else { - if err := s.Close(); err != nil { + l.QueueInLoop(func() { + for _, v := range l.sockets { + if err := v.Close(); err != nil { log.Error(err) } } - return true + l.sockets = nil }) + return l.poll.Close() } @@ -124,9 +121,9 @@ func (l *EventLoop) handlerEvent(fd int, events poller.Event) { l.eventHandling.Set(true) if fd != -1 { - s, ok := l.sockets.Load(fd) + s, ok := l.sockets[fd] if ok { - s.(Socket).HandleEvent(fd, events) + s.HandleEvent(fd, events) } } diff --git a/server.go b/server.go index b3bfb45..ec3f4ea 100644 --- a/server.go +++ b/server.go @@ -99,10 +99,12 @@ func (s *Server) handleNewConnection(fd int, sa unix.Sockaddr) { c := connection.New(fd, loop, sa, s.opts.Protocol, s.timingWheel, s.opts.IdleTime, s.callback) - s.callback.OnConnect(c) - if err := loop.AddSocketAndEnableRead(fd, c); err != nil { - log.Error("[AddSocketAndEnableRead]", err) - } + loop.QueueInLoop(func() { + s.callback.OnConnect(c) + if err := loop.AddSocketAndEnableRead(fd, c); err != nil { + log.Error("[AddSocketAndEnableRead]", err) + } + }) } // Start 启动 Server