Skip to content

Commit

Permalink
use map instead of sync.map (#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
Allenxuxu authored Mar 14, 2021
1 parent fc07a60 commit 3519ef9
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 19 deletions.
27 changes: 12 additions & 15 deletions eventloop/eventloop.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package eventloop

import (
"sync"
"unsafe"

"github.com/Allenxuxu/gev/log"
Expand Down Expand Up @@ -30,7 +29,7 @@ type eventLoopLocal struct {
eventHandling atomic.Bool
poll *poller.Poller
mu spinlock.SpinLock
sockets *sync.Map
sockets map[int]Socket
packet []byte
pendingFunc []func()
}
Expand All @@ -46,7 +45,7 @@ func New() (*EventLoop, error) {
eventLoopLocal: eventLoopLocal{
poll: p,
packet: make([]byte, 0xFFFF),
sockets: &sync.Map{},
sockets: make(map[int]Socket),
},
}, nil
}
Expand All @@ -61,16 +60,16 @@ func (l *EventLoop) DeleteFdInLoop(fd int) {
if err := l.poll.Del(fd); err != nil {
log.Error("[DeleteFdInLoop]", err)
}
l.sockets.Delete(fd)
delete(l.sockets, fd)
}

// AddSocketAndEnableRead 增加 Socket 到时间循环中,并注册可读事件
func (l *EventLoop) AddSocketAndEnableRead(fd int, s Socket) error {
var err error
l.sockets.Store(fd, s)
l.sockets[fd] = s

if err = l.poll.AddRead(fd); err != nil {
l.sockets.Delete(fd)
delete(l.sockets, fd)
return err
}
return nil
Expand All @@ -93,17 +92,15 @@ func (l *EventLoop) RunLoop() {

// Stop 关闭事件循环
func (l *EventLoop) Stop() error {
l.sockets.Range(func(key, value interface{}) bool {
s, ok := value.(Socket)
if !ok {
log.Error("value.(Socket) fail")
} else {
if err := s.Close(); err != nil {
l.QueueInLoop(func() {
for _, v := range l.sockets {
if err := v.Close(); err != nil {
log.Error(err)
}
}
return true
l.sockets = nil
})

return l.poll.Close()
}

Expand All @@ -124,9 +121,9 @@ func (l *EventLoop) handlerEvent(fd int, events poller.Event) {
l.eventHandling.Set(true)

if fd != -1 {
s, ok := l.sockets.Load(fd)
s, ok := l.sockets[fd]
if ok {
s.(Socket).HandleEvent(fd, events)
s.HandleEvent(fd, events)
}
}

Expand Down
10 changes: 6 additions & 4 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,12 @@ func (s *Server) handleNewConnection(fd int, sa unix.Sockaddr) {

c := connection.New(fd, loop, sa, s.opts.Protocol, s.timingWheel, s.opts.IdleTime, s.callback)

s.callback.OnConnect(c)
if err := loop.AddSocketAndEnableRead(fd, c); err != nil {
log.Error("[AddSocketAndEnableRead]", err)
}
loop.QueueInLoop(func() {
s.callback.OnConnect(c)
if err := loop.AddSocketAndEnableRead(fd, c); err != nil {
log.Error("[AddSocketAndEnableRead]", err)
}
})
}

// Start 启动 Server
Expand Down

0 comments on commit 3519ef9

Please sign in to comment.