Skip to content

Commit

Permalink
feat: connection flush support write timeout (#206)
Browse files Browse the repository at this point in the history
  • Loading branch information
joway authored Oct 31, 2022
1 parent 80e9c59 commit 5bf2569
Show file tree
Hide file tree
Showing 58 changed files with 244 additions and 102 deletions.
2 changes: 1 addition & 1 deletion NOTICE
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
CloudWeGO
Copyright 2021 CloudWeGO authors.
Copyright 2022 CloudWeGO authors.

Go
Copyright (c) 2009 The Go Authors.
6 changes: 5 additions & 1 deletion connection.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,6 +45,10 @@ type Connection interface {
// A zero value for timeout means Reader will not timeout.
SetReadTimeout(timeout time.Duration) error

// SetWriteTimeout sets the timeout for future Write calls wait.
// A zero value for timeout means Writer will not timeout.
SetWriteTimeout(timeout time.Duration) error

// SetIdleTimeout sets the idle timeout of connections.
// Idle connections that exceed the set timeout are no longer guaranteed to be active,
// but can be checked by calling IsActive.
Expand Down
9 changes: 6 additions & 3 deletions connection_errors.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -23,16 +23,18 @@ import (
const (
// The connection closed when in use.
ErrConnClosed = syscall.Errno(0x101)
// read I/O buffer timeout, calling by Connection.Reader
// Read I/O buffer timeout, calling by Connection.Reader
ErrReadTimeout = syscall.Errno(0x102)
// dial timeout
// Dial timeout
ErrDialTimeout = syscall.Errno(0x103)
// Calling dialer without timeout.
ErrDialNoDeadline = syscall.Errno(0x104) // TODO: no-deadline support in future
// The calling function not support.
ErrUnsupported = syscall.Errno(0x105)
// Same as io.EOF
ErrEOF = syscall.Errno(0x106)
// Write I/O buffer timeout, calling by Connection.Writer
ErrWriteTimeout = syscall.Errno(0x107)
)

const ErrnoMask = 0xFF
Expand Down Expand Up @@ -94,4 +96,5 @@ var errnos = [...]string{
ErrnoMask & ErrDialNoDeadline: "dial no deadline",
ErrnoMask & ErrUnsupported: "netpoll dose not support",
ErrnoMask & ErrEOF: "EOF",
ErrnoMask & ErrWriteTimeout: "connection write timeout",
}
2 changes: 1 addition & 1 deletion connection_errors_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
77 changes: 76 additions & 1 deletion connection_impl.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,6 +38,8 @@ type connection struct {
readTimer *time.Timer
readTrigger chan struct{}
waitReadSize int64
writeTimeout time.Duration
writeTimer *time.Timer
writeTrigger chan error
inputBuffer *LinkBuffer
outputBuffer *LinkBuffer
Expand Down Expand Up @@ -83,6 +85,14 @@ func (c *connection) SetReadTimeout(timeout time.Duration) error {
return nil
}

// SetWriteTimeout implements Connection.
func (c *connection) SetWriteTimeout(timeout time.Duration) error {
if timeout >= 0 {
c.writeTimeout = timeout
}
return nil
}

// ------------------------------------------ implement zero-copy reader ------------------------------------------

// Next implements Connection.
Expand Down Expand Up @@ -474,3 +484,68 @@ func (c *connection) eofError(n int, err error) error {
}
return err
}

// flush write data directly.
func (c *connection) flush() error {
if c.outputBuffer.IsEmpty() {
return nil
}
// TODO: Let the upper layer pass in whether to use ZeroCopy.
var bs = c.outputBuffer.GetBytes(c.outputBarrier.bs)
var n, err = sendmsg(c.fd, bs, c.outputBarrier.ivs, false && c.supportZeroCopy)
if err != nil && err != syscall.EAGAIN {
return Exception(err, "when flush")
}
if n > 0 {
err = c.outputBuffer.Skip(n)
c.outputBuffer.Release()
if err != nil {
return Exception(err, "when flush")
}
}
// return if write all buffer.
if c.outputBuffer.IsEmpty() {
return nil
}
err = c.operator.Control(PollR2RW)
if err != nil {
return Exception(err, "when flush")
}

return c.waitFlush()
}

func (c *connection) waitFlush() (err error) {
if c.writeTimeout == 0 {
select {
case err = <-c.writeTrigger:
}
return err
}

// set write timeout
if c.writeTimer == nil {
c.writeTimer = time.NewTimer(c.writeTimeout)
} else {
c.writeTimer.Reset(c.writeTimeout)
}

select {
case err = <-c.writeTrigger:
if !c.writeTimer.Stop() { // clean timer
<-c.writeTimer.C
}
return err
case <-c.writeTimer.C:
select {
// try fetch writeTrigger if both cases fires
case err = <-c.writeTrigger:
return err
default:
}
// if timeout, remove write event from poller
// we cannot flush it again, since we don't if the poller is still process outputBuffer
c.operator.Control(PollRW2R)
return Exception(ErrWriteTimeout, c.remoteAddr.String())
}
}
2 changes: 1 addition & 1 deletion connection_lock.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
3 changes: 2 additions & 1 deletion connection_onevent.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -98,6 +98,7 @@ func (c *connection) onPrepare(opts *options) (err error) {
c.SetOnConnect(opts.onConnect)
c.SetOnRequest(opts.onRequest)
c.SetReadTimeout(opts.readTimeout)
c.SetWriteTimeout(opts.writeTimeout)
c.SetIdleTimeout(opts.idleTimeout)

// calling prepare first and then register.
Expand Down
45 changes: 7 additions & 38 deletions connection_reactor.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -19,7 +19,6 @@ package netpoll

import (
"sync/atomic"
"syscall"
)

// ------------------------------------------ implement FDOperator ------------------------------------------
Expand Down Expand Up @@ -61,13 +60,16 @@ func (c *connection) onClose() error {
func (c *connection) closeBuffer() {
var onConnect, _ = c.onConnectCallback.Load().(OnConnect)
var onRequest, _ = c.onRequestCallback.Load().(OnRequest)
// if client close the connection, we cannot ensure that the poller is not process the buffer,
// so we need to check the buffer length, and if it's an "unclean" close operation, let's give up to reuse the buffer
if c.inputBuffer.Len() == 0 || onConnect != nil || onRequest != nil {
c.inputBuffer.Close()
barrierPool.Put(c.inputBarrier)
}

c.outputBuffer.Close()
barrierPool.Put(c.outputBarrier)
if c.outputBuffer.Len() == 0 || onConnect != nil || onRequest != nil {
c.outputBuffer.Close()
barrierPool.Put(c.outputBarrier)
}
}

// inputs implements FDOperator.
Expand Down Expand Up @@ -133,36 +135,3 @@ func (c *connection) rw2r() {
c.operator.Control(PollRW2R)
c.triggerWrite(nil)
}

// flush write data directly.
func (c *connection) flush() error {
if c.outputBuffer.IsEmpty() {
return nil
}
// TODO: Let the upper layer pass in whether to use ZeroCopy.
var bs = c.outputBuffer.GetBytes(c.outputBarrier.bs)
var n, err = sendmsg(c.fd, bs, c.outputBarrier.ivs, false && c.supportZeroCopy)
if err != nil && err != syscall.EAGAIN {
return Exception(err, "when flush")
}
if n > 0 {
err = c.outputBuffer.Skip(n)
c.outputBuffer.Release()
if err != nil {
return Exception(err, "when flush")
}
}
// return if write all buffer.
if c.outputBuffer.IsEmpty() {
return nil
}
err = c.operator.Control(PollR2RW)
if err != nil {
return Exception(err, "when flush")
}
err = <-c.writeTrigger
if err != nil {
return Exception(err, "when flush")
}
return nil
}
54 changes: 53 additions & 1 deletion connection_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -260,6 +260,58 @@ func TestLargeBufferWrite(t *testing.T) {
wg.Wait()
}

func TestWriteTimeout(t *testing.T) {
ln, err := CreateListener("tcp", ":1234")
MustNil(t, err)

interval := time.Millisecond * 100
go func() {
for {
conn, err := ln.Accept()
if conn == nil && err == nil {
continue
}
if err != nil {
return
}
go func() {
buf := make([]byte, 1024)
// slow read
for {
_, err := conn.Read(buf)
if err != nil {
err = conn.Close()
MustNil(t, err)
return
}
time.Sleep(interval)
}
}()
}
}()

conn, err := DialConnection("tcp", ":1234", time.Second)
MustNil(t, err)

_, err = conn.Writer().Malloc(1024)
MustNil(t, err)
err = conn.Writer().Flush()
MustNil(t, err)

_ = conn.SetWriteTimeout(time.Millisecond * 10)
_, err = conn.Writer().Malloc(1024 * 1024 * 512)
MustNil(t, err)
err = conn.Writer().Flush()
MustTrue(t, errors.Is(err, ErrWriteTimeout))

// close success
err = conn.Close()
MustNil(t, err)

err = ln.Close()
MustNil(t, err)
}

// TestConnectionLargeMemory is used to verify the memory usage in the large package scenario.
func TestConnectionLargeMemory(t *testing.T) {
var start, end runtime.MemStats
Expand Down
2 changes: 1 addition & 1 deletion fd_operator.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion fd_operator_cache.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion fd_operator_cache_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion mux/mux_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion mux/shard_queue.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion mux/shard_queue_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion net_dialer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion net_dialer_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion net_listener.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion net_listener_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion net_netfd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// license that can be found in the LICENSE file.
//
// This file may have been modified by CloudWeGo authors. (“CloudWeGo Modifications”).
// All CloudWeGo Modifications are Copyright 2021 CloudWeGo authors.
// All CloudWeGo Modifications are Copyright 2022 CloudWeGo authors.

//go:build aix || darwin || dragonfly || freebsd || linux || nacl || netbsd || openbsd || solaris
// +build aix darwin dragonfly freebsd linux nacl netbsd openbsd solaris
Expand Down
2 changes: 1 addition & 1 deletion net_netfd_conn.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion net_polldesc.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
Loading

0 comments on commit 5bf2569

Please sign in to comment.