Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: release v0.6.3 #352

Merged
merged 4 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 9 additions & 16 deletions .github/workflows/pr-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,22 @@ jobs:
compatibility-test:
strategy:
matrix:
go: [ 1.15, "1.21" ]
os: [ X64, ARM64 ]
go: [ 1.15, 1.22 ]
# - "ubuntu-latest" is for Linux with X64 CPU, hosted by GitHub,
# fewer CPUs but high speed international network
# - "ARM64" is for Linux with ARM64 CPU, hosted by bytedance,
# more CPUs but inside CN internet which may download go cache slowly.
# GitHub don't have free runner with ARM CPU.
os: [ ubuntu-latest, ARM64 ]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go }}
# - uses: actions/cache@v2
# with:
# path: ~/go/pkg/mod
# key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
# restore-keys: |
# ${{ runner.os }}-go-
- name: Unit Test
run: go test -v -race -covermode=atomic -coverprofile=coverage.out ./...
run: go test -timeout=2m -v -race -covermode=atomic -coverprofile=coverage.out ./...
- name: Benchmark
run: go test -bench=. -benchmem -run=none ./...
windows-test:
Expand All @@ -32,13 +31,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: "1.20"
# - uses: actions/cache@v2
# with:
# path: ~/go/pkg/mod
# key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
# restore-keys: |
# ${{ runner.os }}-go-
go-version: 1.22
- name: Build Test
run: go vet -v ./...
style-test:
Expand Down
4 changes: 2 additions & 2 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,8 @@ func (c *connection) init(conn Conn, opts *options) (err error) {
// init buffer, barrier, finalizer
c.readTrigger = make(chan error, 1)
c.writeTrigger = make(chan error, 1)
c.bookSize, c.maxSize = pagesize, pagesize
c.inputBuffer, c.outputBuffer = NewLinkBuffer(pagesize), NewLinkBuffer()
c.bookSize, c.maxSize = defaultLinkBufferSize, defaultLinkBufferSize
c.inputBuffer, c.outputBuffer = NewLinkBuffer(defaultLinkBufferSize), NewLinkBuffer()
c.outputBarrier = barrierPool.Get().(*barrier)
c.state = 0

Expand Down
124 changes: 67 additions & 57 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"os"
"runtime"
Expand Down Expand Up @@ -102,11 +101,13 @@ func TestConnectionLargeWrite(t *testing.T) {
func TestConnectionRead(t *testing.T) {
r, w := GetSysFdPairs()
var rconn, wconn = &connection{}, &connection{}
rconn.init(&netFD{fd: r}, nil)
wconn.init(&netFD{fd: w}, nil)
err := rconn.init(&netFD{fd: r}, nil)
MustNil(t, err)
err = wconn.init(&netFD{fd: w}, nil)
MustNil(t, err)

var size = 256
var cycleTime = 100000
var cycleTime = 1000
var msg = make([]byte, size)
var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -129,6 +130,13 @@ func TestConnectionRead(t *testing.T) {
}

func TestConnectionNoCopyReadString(t *testing.T) {
err := Configure(Config{Feature: Feature{AlwaysNoCopyRead: true}})
MustNil(t, err)
defer func() {
err = Configure(Config{Feature: Feature{AlwaysNoCopyRead: false}})
MustNil(t, err)
}()

r, w := GetSysFdPairs()
var rconn, wconn = &connection{}, &connection{}
rconn.init(&netFD{fd: r}, nil)
Expand Down Expand Up @@ -382,34 +390,28 @@ func TestConnectionLargeMemory(t *testing.T) {
rconn.init(&netFD{fd: r}, nil)

var wg sync.WaitGroup
defer wg.Wait()

var rn, wn = 1024, 1 * 1024 * 1024

wg.Add(1)
go func() {
defer wg.Done()
rconn.Reader().Next(wn)
_, err := rconn.Reader().Next(wn)
MustNil(t, err)
}()

var msg = make([]byte, rn)
for i := 0; i < wn/rn; i++ {
n, err := syscall.Write(w, msg)
if err != nil {
panic(err)
}
if n != rn {
panic(fmt.Sprintf("n[%d]!=rn[%d]", n, rn))
MustNil(t, err)
}
time.Sleep(time.Millisecond)
Equal(t, n, rn)
}

runtime.ReadMemStats(&end)
alloc := end.TotalAlloc - start.TotalAlloc
limit := uint64(4 * 1024 * 1024)
if alloc > limit {
panic(fmt.Sprintf("alloc[%d] out of memory %d", alloc, limit))
}
Assert(t, alloc <= limit, fmt.Sprintf("alloc[%d] out of memory %d", alloc, limit))
}

// TestSetTCPNoDelay is used to verify the connection initialization set the TCP_NODELAY correctly
Expand All @@ -431,7 +433,7 @@ func TestConnectionUntil(t *testing.T) {
rconn, wconn := &connection{}, &connection{}
rconn.init(&netFD{fd: r}, nil)
wconn.init(&netFD{fd: w}, nil)
loopSize := 100000
loopSize := 10000

msg := make([]byte, 1002)
msg[500], msg[1001] = '\n', '\n'
Expand Down Expand Up @@ -459,44 +461,57 @@ func TestConnectionUntil(t *testing.T) {

func TestBookSizeLargerThanMaxSize(t *testing.T) {
r, w := GetSysFdPairs()
var rconn, wconn = &connection{}, &connection{}
rconn.init(&netFD{fd: r}, nil)
wconn.init(&netFD{fd: w}, nil)
rconn, wconn := &connection{}, &connection{}
err := rconn.init(&netFD{fd: r}, nil)
MustNil(t, err)
err = wconn.init(&netFD{fd: w}, nil)
MustNil(t, err)

var length = 25
dataCollection := make([][]byte, length)
for i := 0; i < length; i++ {
dataCollection[i] = make([]byte, 2<<i)
for j := 0; j < 2<<i; j++ {
dataCollection[i][j] = byte(rand.Intn(256))
// prepare data
maxSize := 1024 * 1024 * 128
origin := make([][]byte, 0)
for size := maxSize; size > 0; size = size >> 1 {
ch := 'a' + byte(size%26)
origin = append(origin, make([]byte, size))
for i := 0; i < size; i++ {
origin[len(origin)-1][i] = ch
}
}

// read
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < length; i++ {
buf, err := rconn.Reader().Next(2 << i)
idx := 0
for size := maxSize; size > 0; size = size >> 1 {
buf, err := rconn.Reader().Next(size)
MustNil(t, err)
Equal(t, string(buf), string(dataCollection[i]))
rconn.Reader().Release()
Equal(t, string(buf), string(origin[idx]))
err = rconn.Reader().Release()
MustNil(t, err)
idx++
}
}()
for i := 0; i < length; i++ {
n, err := wconn.Write(dataCollection[i])

// write
for i := 0; i < len(origin); i++ {
n, err := wconn.Write(origin[i])
MustNil(t, err)
Equal(t, n, 2<<i)
Equal(t, n, len(origin[i]))
}
wg.Wait()
rconn.Close()
wconn.Close()
}

func TestConnDetach(t *testing.T) {
address := getTestAddress()
ln, err := createTestListener("tcp", address)
MustNil(t, err)

// accept => read => write
var wg sync.WaitGroup
go func() {
for {
conn, err := ln.Accept()
Expand All @@ -506,35 +521,33 @@ func TestConnDetach(t *testing.T) {
if conn == nil {
continue
}
wg.Add(1)
go func() {
defer wg.Done()
buf := make([]byte, 1024)
// slow read
for {
_, err := conn.Read(buf)
if err != nil {
return
}
time.Sleep(100 * time.Millisecond)
_, err = conn.Write(buf)
if err != nil {
return
}
_, err := conn.Read(buf)
if err != nil {
return
}
time.Sleep(10 * time.Millisecond)
_, err = conn.Write(buf)
if err != nil {
return
}
}()
}
}()

// dial => detach => write => read
c, err := DialConnection("tcp", address, time.Second)
MustNil(t, err)

conn := c.(*TCPConnection)

err = conn.Detach()
MustNil(t, err)

f := os.NewFile(uintptr(conn.fd), "netpoll-connection")
defer f.Close()

gonetconn, err := net.FileConn(f)
MustNil(t, err)
buf := make([]byte, 1024)
Expand All @@ -545,13 +558,14 @@ func TestConnDetach(t *testing.T) {

err = gonetconn.Close()
MustNil(t, err)

err = ln.Close()
MustNil(t, err)
err = c.Close()
MustNil(t, err)
wg.Wait()
}

func TestParallelShortConnection(t *testing.T) {
t.Skip("TODO: it's not stable now, need fix CI")
address := getTestAddress()
ln, err := createTestListener("tcp", address)
MustNil(t, err)
Expand Down Expand Up @@ -592,11 +606,8 @@ func TestParallelShortConnection(t *testing.T) {
}
wg.Wait()

count := 100
for count > 0 && atomic.LoadInt64(&received) < int64(totalSize) {
t.Logf("received: %d, except: %d", atomic.LoadInt64(&received), totalSize)
time.Sleep(time.Millisecond * 100)
count--
for atomic.LoadInt64(&received) < int64(totalSize) {
runtime.Gosched()
}
Equal(t, atomic.LoadInt64(&received), int64(totalSize))
}
Expand All @@ -619,7 +630,7 @@ func TestConnectionServerClose(t *testing.T) {
var wg sync.WaitGroup
el, err := NewEventLoop(
func(ctx context.Context, connection Connection) error {
t.Logf("server.OnRequest: addr=%s", connection.RemoteAddr())
//t.Logf("server.OnRequest: addr=%s", connection.RemoteAddr())
defer wg.Done()
buf, err := connection.Reader().Next(len(PONG)) // pong
Equal(t, string(buf), PONG)
Expand All @@ -642,14 +653,14 @@ func TestConnectionServerClose(t *testing.T) {
err = connection.Writer().Flush()
MustNil(t, err)
connection.AddCloseCallback(func(connection Connection) error {
t.Logf("server.CloseCallback: addr=%s", connection.RemoteAddr())
//t.Logf("server.CloseCallback: addr=%s", connection.RemoteAddr())
wg.Done()
return nil
})
return ctx
}),
WithOnPrepare(func(connection Connection) context.Context {
t.Logf("server.OnPrepare: addr=%s", connection.RemoteAddr())
//t.Logf("server.OnPrepare: addr=%s", connection.RemoteAddr())
defer wg.Done()
return context.WithValue(context.Background(), "prepare", "true")
}),
Expand Down Expand Up @@ -690,13 +701,12 @@ func TestConnectionServerClose(t *testing.T) {
err = conn.SetOnRequest(clientOnRequest)
MustNil(t, err)
conn.AddCloseCallback(func(connection Connection) error {
t.Logf("client.CloseCallback: addr=%s", connection.LocalAddr())
//t.Logf("client.CloseCallback: addr=%s", connection.LocalAddr())
defer wg.Done()
return nil
})
}()
}
//time.Sleep(time.Second)
wg.Wait()
}

Expand Down
Loading
Loading