Skip to content

Commit

Permalink
Merge pull request #185 from cloudwego/release/v0.2.6
Browse files Browse the repository at this point in the history
chore: release v0.2.6
  • Loading branch information
Hchenn authored Aug 3, 2022
2 parents 93eca42 + 65566e5 commit d18be17
Show file tree
Hide file tree
Showing 37 changed files with 296 additions and 119 deletions.
25 changes: 25 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,28 @@ type Connection interface {
// to polling check connection status.
AddCloseCallback(callback CloseCallback) error
}

// Conn extends net.Conn, but supports getting the conn's fd.
type Conn interface {
net.Conn

// Fd return conn's fd, used by poll
Fd() (fd int)
}

// Listener extends net.Listener, but supports getting the listener's fd.
type Listener interface {
net.Listener

// Fd return listener's fd, used by poll.
Fd() (fd int)
}

// Dialer extends net.Dialer's API, just for interface compatibility.
// DialConnection is recommended, but of course all functions are practically the same.
// The returned net.Conn can be directly asserted as Connection if error is nil.
type Dialer interface {
DialConnection(network, address string, timeout time.Duration) (connection Connection, err error)

DialTimeout(network, address string, timeout time.Duration) (conn net.Conn, err error)
}
3 changes: 3 additions & 0 deletions connection_errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !windows
// +build !windows

package netpoll

import (
Expand Down
3 changes: 3 additions & 0 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !windows
// +build !windows

package netpoll

import (
Expand Down
3 changes: 3 additions & 0 deletions connection_onevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !windows
// +build !windows

package netpoll

import (
Expand Down
11 changes: 9 additions & 2 deletions connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !windows
// +build !windows

package netpoll

import (
Expand Down Expand Up @@ -56,8 +59,12 @@ func (c *connection) onClose() error {

// closeBuffer recycle input & output LinkBuffer.
func (c *connection) closeBuffer() {
c.inputBuffer.Close()
barrierPool.Put(c.inputBarrier)
var onConnect, _ = c.onConnectCallback.Load().(OnConnect)
var onRequest, _ = c.onRequestCallback.Load().(OnRequest)
if c.inputBuffer.Len() == 0 || onConnect != nil || onRequest != nil {
c.inputBuffer.Close()
barrierPool.Put(c.inputBarrier)
}

c.outputBuffer.Close()
barrierPool.Put(c.outputBarrier)
Expand Down
3 changes: 3 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !windows
// +build !windows

package netpoll

import (
Expand Down
94 changes: 94 additions & 0 deletions eventloop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package netpoll

import (
"context"
"net"
)

// A EventLoop is a network server.
type EventLoop interface {
// Serve registers a listener and runs blockingly to provide services, including listening to ports,
// accepting connections and processing trans data. When an exception occurs or Shutdown is invoked,
// Serve will return an error which describes the specific reason.
Serve(ln net.Listener) error

// Shutdown is used to graceful exit.
// It will close all idle connections on the server, but will not change the underlying pollers.
//
// Argument: ctx set the waiting deadline, after which an error will be returned,
// but will not force the closing of connections in progress.
Shutdown(ctx context.Context) error
}

// OnPrepare is used to inject custom preparation at connection initialization,
// which is optional but important in some scenarios. For example, a qps limiter
// can be set by closing overloaded connections directly in OnPrepare.
//
// Return:
// context will become the argument of OnRequest.
// Usually, custom resources can be initialized in OnPrepare and used in OnRequest.
//
// PLEASE NOTE:
// OnPrepare is executed without any data in the connection,
// so Reader() or Writer() cannot be used here, but may be supported in the future.
type OnPrepare func(connection Connection) context.Context

// OnConnect is called once connection created.
// It supports read/write/close connection, and could return a ctx which will be passed to OnRequest.
// OnConnect will not block the poller since it's executed asynchronously.
// Only after OnConnect finished the OnRequest could be executed.
//
// An example usage in TCP Proxy scenario:
//
// func onConnect(ctx context.Context, upstream netpoll.Connection) context.Context {
// downstream, _ := netpoll.DialConnection("tcp", downstreamAddr, time.Second)
// return context.WithValue(ctx, downstreamKey, downstream)
// }
// func onRequest(ctx context.Context, upstream netpoll.Connection) error {
// downstream := ctx.Value(downstreamKey).(netpoll.Connection)
// }
type OnConnect func(ctx context.Context, connection Connection) context.Context

// OnRequest defines the function for handling connection. When data is sent from the connection peer,
// netpoll actively reads the data in LT mode and places it in the connection's input buffer.
// Generally, OnRequest starts handling the data in the following way:
//
// func OnRequest(ctx context, connection Connection) error {
// input := connection.Reader().Next(n)
// handling input data...
// send, _ := connection.Writer().Malloc(l)
// copy(send, output)
// connection.Flush()
// return nil
// }
//
// OnRequest will run in a separate goroutine and
// it is guaranteed that there is one and only one OnRequest running at the same time.
// The underlying logic is similar to:
//
// go func() {
// for !connection.Reader().IsEmpty() {
// OnRequest(ctx, connection)
// }
// }()
//
// PLEASE NOTE:
// OnRequest must either eventually read all the input data or actively Close the connection,
// otherwise the goroutine will fall into a dead loop.
//
// Return: error is unused which will be ignored directly.
type OnRequest func(ctx context.Context, connection Connection) error
3 changes: 3 additions & 0 deletions fd_operator_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !windows
// +build !windows

package netpoll

import (
Expand Down
3 changes: 3 additions & 0 deletions mux/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !windows
// +build !windows

package mux

import (
Expand Down
3 changes: 3 additions & 0 deletions mux/shard_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !windows
// +build !windows

package mux

import (
Expand Down
12 changes: 3 additions & 9 deletions net_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !windows
// +build !windows

package netpoll

import (
Expand All @@ -20,15 +23,6 @@ import (
"time"
)

// Dialer extends net.Dialer's API, just for interface compatibility.
// DialConnection is recommended, but of course all functions are practically the same.
// The returned net.Conn can be directly asserted as Connection if error is nil.
type Dialer interface {
DialConnection(network, address string, timeout time.Duration) (connection Connection, err error)

DialTimeout(network, address string, timeout time.Duration) (conn net.Conn, err error)
}

// DialConnection is a default implementation of Dialer.
func DialConnection(network, address string, timeout time.Duration) (connection Connection, err error) {
return defaultDialer.DialConnection(network, address, timeout)
Expand Down
3 changes: 3 additions & 0 deletions net_dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !windows
// +build !windows

package netpoll

import (
Expand Down
9 changes: 1 addition & 8 deletions net_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build darwin || netbsd || freebsd || openbsd || dragonfly || linux
// +build darwin netbsd freebsd openbsd dragonfly linux

package netpoll
Expand All @@ -23,14 +24,6 @@ import (
"syscall"
)

// Listener extends net.Listener, but supports getting the listener's fd.
type Listener interface {
net.Listener

// Fd return listener's fd, used by poll.
Fd() (fd int)
}

// CreateListener return a new Listener.
func CreateListener(network, addr string) (l Listener, err error) {
if network == "udp" {
Expand Down
3 changes: 3 additions & 0 deletions net_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build darwin || netbsd || freebsd || openbsd || dragonfly || linux
// +build darwin netbsd freebsd openbsd dragonfly linux

package netpoll

import (
Expand Down
9 changes: 1 addition & 8 deletions net_netfd_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build darwin || netbsd || freebsd || openbsd || dragonfly || linux
// +build darwin netbsd freebsd openbsd dragonfly linux

package netpoll
Expand All @@ -25,14 +26,6 @@ import (
"time"
)

// Conn extends net.Conn, but supports getting the conn's fd.
type Conn interface {
net.Conn

// Fd return conn's fd, used by poll
Fd() (fd int)
}

var _ Conn = &netFD{}

// Fd implements Conn.
Expand Down
36 changes: 22 additions & 14 deletions net_polldesc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !windows
// +build !windows

package netpoll

import (
Expand All @@ -23,7 +26,12 @@ import (
func newPollDesc(fd int) *pollDesc {
pd, op := &pollDesc{}, &FDOperator{}
op.FD = fd
op.OnWrite = pd.onwrite
op.OnHup = pd.onhup

pd.operator = op
pd.writeTrigger = make(chan struct{})
pd.closeTrigger = make(chan struct{})
return pd
}

Expand All @@ -41,20 +49,6 @@ type pollDesc struct {
func (pd *pollDesc) WaitWrite(ctx context.Context) error {
var err error
pd.once.Do(func() {
pd.writeTrigger = make(chan struct{})
pd.closeTrigger = make(chan struct{})
pd.operator.OnWrite = func(p Poll) error {
select {
case <-pd.writeTrigger:
default:
close(pd.writeTrigger)
}
return nil
}
pd.operator.OnHup = func(p Poll) error {
close(pd.closeTrigger)
return nil
}
// add ET|Write|Hup
pd.operator.poll = pollmanager.Pick()
err = pd.operator.Control(PollWritable)
Expand Down Expand Up @@ -83,6 +77,20 @@ func (pd *pollDesc) WaitWrite(ctx context.Context) error {
}
}

func (pd *pollDesc) onwrite(p Poll) error {
select {
case <-pd.writeTrigger:
default:
close(pd.writeTrigger)
}
return nil
}

func (pd *pollDesc) onhup(p Poll) error {
close(pd.closeTrigger)
return nil
}

func (pd *pollDesc) detach() {
pd.operator.Control(PollDetach)
pd.operator.unused()
Expand Down
3 changes: 3 additions & 0 deletions net_polldesc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !windows
// +build !windows

package netpoll

import (
Expand Down
3 changes: 2 additions & 1 deletion net_sock.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
// This file may have been modified by CloudWeGo authors. (“CloudWeGo Modifications”).
// All CloudWeGo Modifications are Copyright 2021 CloudWeGo authors.

// +build aix darwin dragonfly freebsd linux nacl netbsd openbsd solaris windows
//go:build !windows
// +build !windows

package netpoll

Expand Down
Loading

0 comments on commit d18be17

Please sign in to comment.