diff --git a/connection_impl.go b/connection_impl.go index 1fa1a8e4..eb99f1c1 100644 --- a/connection_impl.go +++ b/connection_impl.go @@ -43,7 +43,6 @@ type connection struct { writeTrigger chan error inputBuffer *LinkBuffer outputBuffer *LinkBuffer - inputBarrier *barrier outputBarrier *barrier supportZeroCopy bool maxSize int // The maximum size of data between two Release(). @@ -323,7 +322,7 @@ func (c *connection) init(conn Conn, opts *options) (err error) { c.writeTrigger = make(chan error, 1) c.bookSize, c.maxSize = pagesize, pagesize c.inputBuffer, c.outputBuffer = NewLinkBuffer(pagesize), NewLinkBuffer() - c.inputBarrier, c.outputBarrier = barrierPool.Get().(*barrier), barrierPool.Get().(*barrier) + c.outputBarrier = barrierPool.Get().(*barrier) c.initNetFD(conn) // conn must be *netFD{} c.initFDOperator() diff --git a/connection_reactor.go b/connection_reactor.go index fa485be1..cd5d717c 100644 --- a/connection_reactor.go +++ b/connection_reactor.go @@ -71,7 +71,6 @@ func (c *connection) closeBuffer() { // so we need to check the buffer length, and if it's an "unclean" close operation, let's give up to reuse the buffer if c.inputBuffer.Len() == 0 || onConnect != nil || onRequest != nil { c.inputBuffer.Close() - barrierPool.Put(c.inputBarrier) } if c.outputBuffer.Len() == 0 || onConnect != nil || onRequest != nil { c.outputBuffer.Close() diff --git a/connection_test.go b/connection_test.go index 6de6f017..782e85c2 100644 --- a/connection_test.go +++ b/connection_test.go @@ -25,6 +25,7 @@ import ( "net" "os" "runtime" + "strings" "sync" "sync/atomic" "syscall" @@ -638,3 +639,39 @@ func TestConnectionServerClose(t *testing.T) { //time.Sleep(time.Second) wg.Wait() } + +func TestConnectionDailTimeoutAndClose(t *testing.T) { + ln, err := createTestListener("tcp", ":12345") + MustNil(t, err) + defer ln.Close() + + el, err := NewEventLoop( + func(ctx context.Context, connection Connection) error { + _, err = connection.Reader().Next(connection.Reader().Len()) + return err + }, + ) + defer el.Shutdown(context.Background()) + go func() { + err := el.Serve(ln) + if err != nil { + t.Logf("servce end with error: %v", err) + } + }() + + loops := 100 + conns := 100 + for l := 0; l < loops; l++ { + var wg sync.WaitGroup + wg.Add(conns) + for i := 0; i < conns; i++ { + go func() { + defer wg.Done() + conn, err := DialConnection("tcp", ":12345", time.Nanosecond) + Assert(t, err == nil || strings.Contains(err.Error(), "i/o timeout")) + _ = conn + }() + } + wg.Wait() + } +} diff --git a/net_polldesc.go b/net_polldesc.go index dfd95de1..c197850e 100644 --- a/net_polldesc.go +++ b/net_polldesc.go @@ -53,14 +53,17 @@ func (pd *pollDesc) WaitWrite(ctx context.Context) (err error) { } select { - case <-pd.closeTrigger: + case <-pd.closeTrigger: // triggered by poller // no need to detach, since poller has done it in OnHup. return Exception(ErrConnClosed, "by peer") - case <-pd.writeTrigger: + case <-pd.writeTrigger: // triggered by poller err = nil - case <-ctx.Done(): + case <-ctx.Done(): // triggered by ctx // deregister from poller, upper caller function will close fd + // detach first but there's a very small possibility that operator is doing in poller, + // so need call unused() to wait operator done pd.detach() + pd.operator.unused() err = mapErr(ctx.Err()) } // double check close trigger