Skip to content

Commit

Permalink
gnet conn, gool, daemon, hb
Browse files Browse the repository at this point in the history
  • Loading branch information
snail007 committed Feb 15, 2025
1 parent ceac55e commit 64d62d5
Show file tree
Hide file tree
Showing 6 changed files with 345 additions and 41 deletions.
4 changes: 0 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,3 @@ A package must be include the fllowing files:
| README.md | testing and benchmarkresult must be include |

You can reference the package sync/gpool to get more information about code specification.

# Thanks

<a href="https://www.jetbrains.com/?from=gmc"><img src="https://resources.jetbrains.com/storage/products/company/brand/logos/jb_beam.png" height="120" alt="JetBrains"/></a>
40 changes: 31 additions & 9 deletions util/gpool/gpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package gpool

import (
"bytes"
"crypto/rand"
"encoding/hex"
"errors"
Expand All @@ -15,6 +16,7 @@ import (
glist "github.com/snail007/gmc/util/list"
gmap "github.com/snail007/gmc/util/map"
"io"
"runtime/debug"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -59,6 +61,8 @@ type Option struct {
PreAlloc bool
// PanicHandler is used to handle panics from each job function.
PanicHandler func(e interface{})
//WithStack sets if fill stack info with submitted job
WithStack bool
}

// Blocking the count of queued job to run reach the max, if blocking Submit call
Expand Down Expand Up @@ -107,7 +111,9 @@ func (s *Pool) SetDebug(debug bool) {

// New create a gpool object to using
func New(workerCount int) (p *Pool) {
return NewWithOption(workerCount, &Option{})
return NewWithOption(workerCount, &Option{
WithStack: true,
})
}

func NewWithLogger(workerCount int, logger gcore.Logger) (p *Pool) {
Expand Down Expand Up @@ -237,7 +243,7 @@ func (s *Pool) newWorkerID() string {
}

// run a job function, using defer to catch job exception
func (s *Pool) run(fn func()) {
func (s *Pool) run(j *JobItem) {
defer func() {
s.g.Done()
if e := recover(); e != nil {
Expand All @@ -252,7 +258,12 @@ func (s *Pool) run(fn func()) {
}
}
}()
fn()
j.Job()
}

type JobItem struct {
Stack string
Job func()
}

// Submit adds a function as a job ready to run
Expand All @@ -271,8 +282,19 @@ func (s *Pool) Submit(job func()) error {
return ErrMaxQueuedJobCountReached
}
}
j := &JobItem{
Job: job,
}
if s.opt.WithStack {
a := bytes.SplitN(debug.Stack(), []byte("\n"), 4)
stackStr := string(a[0])
if len(a) > 3 {
stackStr += "\n" + string(a[3])
}
j.Stack = stackStr
}
s.g.Add(1)
s.jobs.Add(job)
s.jobs.Add(j)
s.notifyAll()
return nil
}
Expand All @@ -288,10 +310,10 @@ func (s *Pool) notifyAll() {
}

// shift an element from array head
func (s *Pool) pop() (fn func()) {
func (s *Pool) pop() (fn *JobItem) {
f := s.jobs.Pop()
if f != nil {
fn = f.(func())
fn = f.(*JobItem)
}
return
}
Expand Down Expand Up @@ -434,7 +456,7 @@ func (w *worker) start() {
w.pool.debugLog("Pool: worker[%s] stopped", w.id)
}()
w.pool.debugLog("Pool: worker[%s] started ...", w.id)
var fn func()
j := new(JobItem)
var doJob = func() (isReturn bool) {
w.SetStatus(statusRunning)
w.addRunningWorkerCounter(1)
Expand All @@ -446,9 +468,9 @@ func (w *worker) start() {
w.pool.debugLog("Pool: worker[%s] break", w.id)
return true
}
if fn = w.pool.pop(); fn != nil {
if j = w.pool.pop(); j != nil {
//w.pool.debugLog("Pool: worker[%s] called", w.id)
w.pool.run(fn)
w.pool.run(j)
} else {
w.pool.debugLog("Pool: worker[%s] no job, break", w.id)
break
Expand Down
95 changes: 93 additions & 2 deletions util/net/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package gnet

import (
"bufio"
"github.com/pkg/errors"
"net"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -95,9 +96,39 @@ type Conn struct {
writeBytes *int64
rawConn net.Conn
onClose func(*Conn)
onCloseHook func(*Conn)
onInitializeHook func(*Conn)
closeOnce *sync.Once
autoCloseOnReadWriteError bool
initOnce *sync.Once
maxIdleTimeout time.Duration
touchTime time.Time
idleChn chan bool
onIdleTimeout func(*Conn)
}

func (s *Conn) SetOnClose(f func(conn *Conn)) {
s.onCloseHook = f
}

func (s *Conn) SetOnInitialize(f func(conn *Conn)) {
s.onInitializeHook = f
}

func (s *Conn) SetOnIdleTimeout(f func(conn *Conn)) {
s.onIdleTimeout = f
}

func (s *Conn) TouchTime() time.Time {
return s.touchTime
}

func (s *Conn) touch() {
s.touchTime = time.Now()
}

func (s *Conn) SetMaxIdleTimeout(d time.Duration) {
s.maxIdleTimeout = d
}

func (s *Conn) Ctx() Context {
Expand Down Expand Up @@ -154,7 +185,9 @@ func (s *Conn) Close() (err error) {
defer func() { _ = recover() }()
err = s.Conn.Close()
}()
close(s.idleChn)
s.onClose(s)
s.onCloseHook(s)
})
return
}
Expand Down Expand Up @@ -184,15 +217,50 @@ func (s *Conn) initialize() (err error) {
return err
}
s.Conn = s.ctx.Conn()

//init idle monitor, this must be after filters and codecs
s.initIdleTimeoutDaemon()

//call Initialize hook
s.onInitializeHook(s)

return nil
}

func (s *Conn) initIdleTimeoutDaemon() {
if s.maxIdleTimeout <= 0 {
return
}
go func() {
t := time.NewTimer(s.maxIdleTimeout)
defer t.Stop()
for {
t.Reset(s.maxIdleTimeout)
select {
case <-s.idleChn:
return
case <-t.C:
if s.touchTime.Add(s.maxIdleTimeout).Before(time.Now()) {
//idle timeout, close the connection
s.Close()
if s.onIdleTimeout != nil {
s.onIdleTimeout(s)
}
return
}
}
}
}()
}

func (s *Conn) Read(b []byte) (n int, err error) {
if e := s.doInitialize(); e != nil {
return 0, e
}
defer func() {
if n > 0 {
atomic.AddInt64(s.readBytes, int64(n))
s.touch()
}
if err != nil && s.autoCloseOnReadWriteError {
s.Close()
Expand All @@ -213,6 +281,7 @@ func (s *Conn) Write(b []byte) (n int, err error) {
defer func() {
if n > 0 {
atomic.AddInt64(s.writeBytes, int64(n))
s.touch()
}
if err != nil && s.autoCloseOnReadWriteError {
s.Close()
Expand Down Expand Up @@ -254,9 +323,13 @@ func newContextConn(ctx Context, conn net.Conn, f ...bool) *Conn {
readBytes: new(int64),
rawConn: conn,
onClose: func(conn *Conn) {},
onCloseHook: func(conn *Conn) {},
onInitializeHook: func(conn *Conn) {},
closeOnce: &sync.Once{},
initOnce: &sync.Once{},
autoCloseOnReadWriteError: true,
idleChn: make(chan bool),
touchTime: time.Now(),
}
}
ctx.(*defaultContext).conn = c
Expand Down Expand Up @@ -525,6 +598,18 @@ type ConnBinder struct {
ctx Context
started bool
autoClose bool
err error
}

func (s *ConnBinder) setError(err error) {
if s.err != nil || err == nil {
return
}
s.err = err
}

func (s *ConnBinder) Error() error {
return s.err
}

func (s *ConnBinder) SetAutoClose(autoClose bool) {
Expand Down Expand Up @@ -566,8 +651,14 @@ func (s *ConnBinder) copy(src, dst net.Conn) error {
}()
for {
n, err := src.Read(buf)
if err != nil {
err = errors.Wrap(err, "failed to read from src: "+src.RemoteAddr().String())
}
if n > 0 {
_, err = dst.Write(buf[:n])
if err != nil {
err = errors.Wrap(err, "failed to write to dst: "+dst.RemoteAddr().String())
}
atomic.AddInt64(s.trafficBytes, int64(n))
}
if err != nil {
Expand All @@ -584,12 +675,12 @@ func (s *ConnBinder) StartAndWait() {
g.Add(2)
go func() {
defer g.Done()
s.copy(s.src, s.dst)
s.setError(s.copy(s.src, s.dst))
s.onSrcClose(s.ctx)
}()
go func() {
defer g.Done()
s.copy(s.dst, s.src)
s.setError(s.copy(s.dst, s.src))
s.onDstClose(s.ctx)
}()
g.Wait()
Expand Down
Loading

0 comments on commit 64d62d5

Please sign in to comment.