Skip to content

Commit

Permalink
refactor: free operator in poller (#223)
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Dec 20, 2022
1 parent aedd9f3 commit 88cace1
Show file tree
Hide file tree
Showing 19 changed files with 426 additions and 185 deletions.
36 changes: 21 additions & 15 deletions .github/workflows/pr-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,41 @@ name: Push and Pull Request Check
on: [ push, pull_request ]

jobs:
build:
runs-on: ubuntu-latest
compatibility-test:
strategy:
matrix:
go: [ 1.15, 1.19 ]
os: [ X64, ARM64 ]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v2

- uses: actions/checkout@v3
- name: Set up Go
uses: actions/setup-go@v2
uses: actions/setup-go@v3
with:
go-version: 1.16

go-version: ${{ matrix.go }}
- uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
- name: Unit Test
run: go test -v -race -covermode=atomic -coverprofile=coverage.out ./...
- name: Benchmark
run: go test -bench=. -benchmem -run=none ./...
style-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: 1.16
- name: Check License Header
uses: apache/skywalking-eyes/[email protected]
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

- name: Lint
run: |
test -z "$(gofmt -s -l .)"
go vet -stdmethods=false $(go list ./...)
- name: Unit Test
run: go test -v -race -covermode=atomic -coverprofile=coverage.out ./...

- name: Benchmark
run: go test -bench=. -benchmem -run=none ./...
21 changes: 13 additions & 8 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,26 +349,31 @@ func (c *connection) initNetFD(conn Conn) {
}

func (c *connection) initFDOperator() {
op := allocop()
var op *FDOperator
if c.pd != nil && c.pd.operator != nil {
// reuse operator created at connect step
op = c.pd.operator
} else {
poll := pollmanager.Pick()
op = poll.Alloc()
}
op.FD = c.fd
op.OnRead, op.OnWrite, op.OnHup = nil, nil, c.onHup
op.Inputs, op.InputAck = c.inputs, c.inputAck
op.Outputs, op.OutputAck = c.outputs, c.outputAck

// if connection has been registered, must reuse poll here.
if c.pd != nil && c.pd.operator != nil {
op.poll = c.pd.operator.poll
}
c.operator = op
}

func (c *connection) initFinalizer() {
c.AddCloseCallback(func(connection Connection) error {
c.AddCloseCallback(func(connection Connection) (err error) {
c.stop(flushing)
// stop the finalizing state to prevent conn.fill function to be performed
c.stop(finalizing)
freeop(c.operator)
c.netFD.Close()
c.operator.Free()
if err = c.netFD.Close(); err != nil {
logger.Printf("NETPOLL: netFD close failed: %v", err)
}
c.closeBuffer()
return nil
})
Expand Down
16 changes: 10 additions & 6 deletions connection_onevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,9 @@ func (c *connection) closeCallback(needLock bool) (err error) {
}
// If Close is called during OnPrepare, poll is not registered.
if c.isCloseBy(user) && c.operator.poll != nil {
c.operator.Control(PollDetach)
if err = c.operator.Control(PollDetach); err != nil {
logger.Printf("NETPOLL: closeCallback detach operator failed: %v", err)
}
}
var latest = c.closeCallbacks.Load()
if latest == nil {
Expand All @@ -227,14 +229,16 @@ func (c *connection) closeCallback(needLock bool) (err error) {

// register only use for connection register into poll.
func (c *connection) register() (err error) {
if c.operator.poll != nil {
err = c.operator.Control(PollModReadable)
} else {
c.operator.poll = pollmanager.Pick()
if c.operator.isUnused() {
// operator is not registered
err = c.operator.Control(PollReadable)
} else {
// operator is already registered
// change event to wait read new data
err = c.operator.Control(PollModReadable)
}
if err != nil {
logger.Println("connection register failed:", err.Error())
logger.Printf("NETPOLL: connection register failed: %v", err)
c.Close()
return Exception(ErrConnClosed, err.Error())
}
Expand Down
5 changes: 5 additions & 0 deletions fd_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,17 @@ type FDOperator struct {
// private, used by operatorCache
next *FDOperator
state int32 // CAS: 0(unused) 1(inuse) 2(do-done)
index int32 // index in operatorCache
}

func (op *FDOperator) Control(event PollEvent) error {
return op.poll.Control(op, event)
}

func (op *FDOperator) Free() {
op.poll.Free(op)
}

func (op *FDOperator) do() (can bool) {
return atomic.CompareAndSwapInt32(&op.state, 1, 2)
}
Expand Down
69 changes: 40 additions & 29 deletions fd_operator_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,69 +20,80 @@ import (
"unsafe"
)

func allocop() *FDOperator {
return opcache.alloc()
}

func freeop(op *FDOperator) {
opcache.free(op)
}

func init() {
opcache = &operatorCache{
// cache: make(map[int][]byte),
cache: make([]*FDOperator, 0, 1024),
func newOperatorCache() *operatorCache {
return &operatorCache{
cache: make([]*FDOperator, 0, 1024),
freelist: make([]int32, 0, 1024),
}
runtime.KeepAlive(opcache)
}

var opcache *operatorCache

type operatorCache struct {
locked int32
first *FDOperator
cache []*FDOperator
// freelist store the freeable operator
// to reduce GC pressure, we only store op index here
freelist []int32
freelocked int32
}

func (c *operatorCache) alloc() *FDOperator {
c.lock()
lock(&c.locked)
if c.first == nil {
const opSize = unsafe.Sizeof(FDOperator{})
n := block4k / opSize
if n == 0 {
n = 1
}
// Must be in non-GC memory because can be referenced
// only from epoll/kqueue internals.
index := int32(len(c.cache))
for i := uintptr(0); i < n; i++ {
pd := &FDOperator{}
pd := &FDOperator{index: index}
c.cache = append(c.cache, pd)
pd.next = c.first
c.first = pd
index++
}
}
op := c.first
c.first = op.next
c.unlock()
unlock(&c.locked)
return op
}

func (c *operatorCache) free(op *FDOperator) {
// freeable mark the operator that could be freed
// only poller could do the real free action
func (c *operatorCache) freeable(op *FDOperator) {
// reset all state
op.unused()
op.reset()
lock(&c.freelocked)
c.freelist = append(c.freelist, op.index)
unlock(&c.freelocked)
}

func (c *operatorCache) free() {
lock(&c.freelocked)
defer unlock(&c.freelocked)
if len(c.freelist) == 0 {
return
}

c.lock()
op.next = c.first
c.first = op
c.unlock()
lock(&c.locked)
for _, idx := range c.freelist {
op := c.cache[idx]
op.next = c.first
c.first = op
}
c.freelist = c.freelist[:0]
unlock(&c.locked)
}

func (c *operatorCache) lock() {
for !atomic.CompareAndSwapInt32(&c.locked, 0, 1) {
func lock(locked *int32) {
for !atomic.CompareAndSwapInt32(locked, 0, 1) {
runtime.Gosched()
}
}

func (c *operatorCache) unlock() {
atomic.StoreInt32(&c.locked, 0)
func unlock(locked *int32) {
atomic.StoreInt32(locked, 0)
}
25 changes: 18 additions & 7 deletions fd_operator_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,40 @@ import (

// go test -v -gcflags=-d=checkptr -run=TestPersistFDOperator
func TestPersistFDOperator(t *testing.T) {
opcache := newOperatorCache()
// init
size := 1000
size := 2048
var ops = make([]*FDOperator, size)
for i := 0; i < size; i++ {
op := allocop()
op := opcache.alloc()
op.FD = i
ops[i] = op
}
Equal(t, len(opcache.freelist), 0)
// gc
for i := 0; i < 4; i++ {
runtime.GC()
}
// check alloc
for i := range ops {
Equal(t, ops[i].FD, i)
freeop(ops[i])
opcache.freeable(ops[i])
Equal(t, len(opcache.freelist), i+1)
}
Equal(t, len(opcache.freelist), size)
opcache.free()
Equal(t, len(opcache.freelist), 0)
Assert(t, len(opcache.cache) >= size)
}

func BenchmarkPersistFDOperator1(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
opcache := newOperatorCache()
for i := 0; i < b.N; i++ {
op := allocop()
freeop(op)
op := opcache.alloc()
opcache.freeable(op)
opcache.free()
}
}

Expand All @@ -57,10 +66,12 @@ func BenchmarkPersistFDOperator2(b *testing.B) {
b.ReportAllocs()
b.SetParallelism(128)
b.ResetTimer()
opcache := newOperatorCache()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
op := allocop()
freeop(op)
op := opcache.alloc()
opcache.freeable(op)
opcache.free()
}
})
}
15 changes: 3 additions & 12 deletions net_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,28 +30,19 @@ func TestListenerDialer(t *testing.T) {
addr := ":1234"
ln, err := CreateListener(network, addr)
MustNil(t, err)
defer time.Sleep(10 * time.Millisecond)
defer ln.Close()

stop := make(chan int)
trigger := make(chan int)
defer close(stop)
defer close(trigger)
msg := []byte("0123456789")

go func() {
for {
select {
case <-stop:
err := ln.Close()
MustNil(t, err)
return
default:
}
conn, err := ln.Accept()
if conn == nil && err == nil {
continue
}
if err != nil {
return
}
go func(conn net.Conn) {
<-trigger
buf := make([]byte, 10)
Expand Down
Loading

0 comments on commit 88cace1

Please sign in to comment.