Skip to content

Commit

Permalink
Merge pull request #208 from cloudwego/release/v0.3.0
Browse files Browse the repository at this point in the history
chore: release v0.3.0
  • Loading branch information
Hchenn authored Oct 31, 2022
2 parents d18be17 + 5bf2569 commit 7340a0d
Show file tree
Hide file tree
Showing 58 changed files with 430 additions and 134 deletions.
2 changes: 1 addition & 1 deletion NOTICE
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
CloudWeGO
Copyright 2021 CloudWeGO authors.
Copyright 2022 CloudWeGO authors.

Go
Copyright (c) 2009 The Go Authors.
6 changes: 5 additions & 1 deletion connection.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,6 +45,10 @@ type Connection interface {
// A zero value for timeout means Reader will not timeout.
SetReadTimeout(timeout time.Duration) error

// SetWriteTimeout sets the timeout for future Write calls wait.
// A zero value for timeout means Writer will not timeout.
SetWriteTimeout(timeout time.Duration) error

// SetIdleTimeout sets the idle timeout of connections.
// Idle connections that exceed the set timeout are no longer guaranteed to be active,
// but can be checked by calling IsActive.
Expand Down
9 changes: 6 additions & 3 deletions connection_errors.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -23,16 +23,18 @@ import (
const (
// The connection closed when in use.
ErrConnClosed = syscall.Errno(0x101)
// read I/O buffer timeout, calling by Connection.Reader
// Read I/O buffer timeout, calling by Connection.Reader
ErrReadTimeout = syscall.Errno(0x102)
// dial timeout
// Dial timeout
ErrDialTimeout = syscall.Errno(0x103)
// Calling dialer without timeout.
ErrDialNoDeadline = syscall.Errno(0x104) // TODO: no-deadline support in future
// The calling function not support.
ErrUnsupported = syscall.Errno(0x105)
// Same as io.EOF
ErrEOF = syscall.Errno(0x106)
// Write I/O buffer timeout, calling by Connection.Writer
ErrWriteTimeout = syscall.Errno(0x107)
)

const ErrnoMask = 0xFF
Expand Down Expand Up @@ -94,4 +96,5 @@ var errnos = [...]string{
ErrnoMask & ErrDialNoDeadline: "dial no deadline",
ErrnoMask & ErrUnsupported: "netpoll dose not support",
ErrnoMask & ErrEOF: "EOF",
ErrnoMask & ErrWriteTimeout: "connection write timeout",
}
2 changes: 1 addition & 1 deletion connection_errors_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
83 changes: 79 additions & 4 deletions connection_impl.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,7 +37,9 @@ type connection struct {
readTimeout time.Duration
readTimer *time.Timer
readTrigger chan struct{}
waitReadSize int32
waitReadSize int64
writeTimeout time.Duration
writeTimer *time.Timer
writeTrigger chan error
inputBuffer *LinkBuffer
outputBuffer *LinkBuffer
Expand Down Expand Up @@ -83,6 +85,14 @@ func (c *connection) SetReadTimeout(timeout time.Duration) error {
return nil
}

// SetWriteTimeout implements Connection.
func (c *connection) SetWriteTimeout(timeout time.Duration) error {
if timeout >= 0 {
c.writeTimeout = timeout
}
return nil
}

// ------------------------------------------ implement zero-copy reader ------------------------------------------

// Next implements Connection.
Expand Down Expand Up @@ -383,8 +393,8 @@ func (c *connection) waitRead(n int) (err error) {
if n <= c.inputBuffer.Len() {
return nil
}
atomic.StoreInt32(&c.waitReadSize, int32(n))
defer atomic.StoreInt32(&c.waitReadSize, 0)
atomic.StoreInt64(&c.waitReadSize, int64(n))
defer atomic.StoreInt64(&c.waitReadSize, 0)
if c.readTimeout > 0 {
return c.waitReadWithTimeout(n)
}
Expand Down Expand Up @@ -474,3 +484,68 @@ func (c *connection) eofError(n int, err error) error {
}
return err
}

// flush write data directly.
func (c *connection) flush() error {
if c.outputBuffer.IsEmpty() {
return nil
}
// TODO: Let the upper layer pass in whether to use ZeroCopy.
var bs = c.outputBuffer.GetBytes(c.outputBarrier.bs)
var n, err = sendmsg(c.fd, bs, c.outputBarrier.ivs, false && c.supportZeroCopy)
if err != nil && err != syscall.EAGAIN {
return Exception(err, "when flush")
}
if n > 0 {
err = c.outputBuffer.Skip(n)
c.outputBuffer.Release()
if err != nil {
return Exception(err, "when flush")
}
}
// return if write all buffer.
if c.outputBuffer.IsEmpty() {
return nil
}
err = c.operator.Control(PollR2RW)
if err != nil {
return Exception(err, "when flush")
}

return c.waitFlush()
}

func (c *connection) waitFlush() (err error) {
if c.writeTimeout == 0 {
select {
case err = <-c.writeTrigger:
}
return err
}

// set write timeout
if c.writeTimer == nil {
c.writeTimer = time.NewTimer(c.writeTimeout)
} else {
c.writeTimer.Reset(c.writeTimeout)
}

select {
case err = <-c.writeTrigger:
if !c.writeTimer.Stop() { // clean timer
<-c.writeTimer.C
}
return err
case <-c.writeTimer.C:
select {
// try fetch writeTrigger if both cases fires
case err = <-c.writeTrigger:
return err
default:
}
// if timeout, remove write event from poller
// we cannot flush it again, since we don't if the poller is still process outputBuffer
c.operator.Control(PollRW2R)
return Exception(ErrWriteTimeout, c.remoteAddr.String())
}
}
2 changes: 1 addition & 1 deletion connection_lock.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
5 changes: 3 additions & 2 deletions connection_onevent.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -98,6 +98,7 @@ func (c *connection) onPrepare(opts *options) (err error) {
c.SetOnConnect(opts.onConnect)
c.SetOnRequest(opts.onRequest)
c.SetReadTimeout(opts.readTimeout)
c.SetWriteTimeout(opts.writeTimeout)
c.SetIdleTimeout(opts.idleTimeout)

// calling prepare first and then register.
Expand Down Expand Up @@ -212,7 +213,7 @@ func (c *connection) closeCallback(needLock bool) (err error) {
return nil
}
// If Close is called during OnPrepare, poll is not registered.
if c.closeBy(user) && c.operator.poll != nil {
if c.isCloseBy(user) && c.operator.poll != nil {
c.operator.Control(PollDetach)
}
var latest = c.closeCallbacks.Load()
Expand Down
47 changes: 8 additions & 39 deletions connection_reactor.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -19,7 +19,6 @@ package netpoll

import (
"sync/atomic"
"syscall"
)

// ------------------------------------------ implement FDOperator ------------------------------------------
Expand Down Expand Up @@ -61,13 +60,16 @@ func (c *connection) onClose() error {
func (c *connection) closeBuffer() {
var onConnect, _ = c.onConnectCallback.Load().(OnConnect)
var onRequest, _ = c.onRequestCallback.Load().(OnRequest)
// if client close the connection, we cannot ensure that the poller is not process the buffer,
// so we need to check the buffer length, and if it's an "unclean" close operation, let's give up to reuse the buffer
if c.inputBuffer.Len() == 0 || onConnect != nil || onRequest != nil {
c.inputBuffer.Close()
barrierPool.Put(c.inputBarrier)
}

c.outputBuffer.Close()
barrierPool.Put(c.outputBarrier)
if c.outputBuffer.Len() == 0 || onConnect != nil || onRequest != nil {
c.outputBuffer.Close()
barrierPool.Put(c.outputBarrier)
}
}

// inputs implements FDOperator.
Expand Down Expand Up @@ -100,7 +102,7 @@ func (c *connection) inputAck(n int) (err error) {
if length == n { // first start onRequest
needTrigger = c.onRequest()
}
if needTrigger && length >= int(atomic.LoadInt32(&c.waitReadSize)) {
if needTrigger && length >= int(atomic.LoadInt64(&c.waitReadSize)) {
c.triggerRead()
}
return nil
Expand Down Expand Up @@ -133,36 +135,3 @@ func (c *connection) rw2r() {
c.operator.Control(PollRW2R)
c.triggerWrite(nil)
}

// flush write data directly.
func (c *connection) flush() error {
if c.outputBuffer.IsEmpty() {
return nil
}
// TODO: Let the upper layer pass in whether to use ZeroCopy.
var bs = c.outputBuffer.GetBytes(c.outputBarrier.bs)
var n, err = sendmsg(c.fd, bs, c.outputBarrier.ivs, false && c.supportZeroCopy)
if err != nil && err != syscall.EAGAIN {
return Exception(err, "when flush")
}
if n > 0 {
err = c.outputBuffer.Skip(n)
c.outputBuffer.Release()
if err != nil {
return Exception(err, "when flush")
}
}
// return if write all buffer.
if c.outputBuffer.IsEmpty() {
return nil
}
err = c.operator.Control(PollR2RW)
if err != nil {
return Exception(err, "when flush")
}
err = <-c.writeTrigger
if err != nil {
return Exception(err, "when flush")
}
return nil
}
Loading

0 comments on commit 7340a0d

Please sign in to comment.