Skip to content

Commit

Permalink
server: count active client connections
Browse files Browse the repository at this point in the history
Add a proxy connection count metric to count active client connections.
We only increment the counter after the first successful read or write.
  • Loading branch information
Jovis7 committed Jul 19, 2024
1 parent 0df4a97 commit 32fa7f2
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 0 deletions.
4 changes: 4 additions & 0 deletions http_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,10 @@ func (p *Proxy) ListenAndServe(ctx context.Context) error {
Filter: instrumentedFilter,
OKDoesNotWaitForUpstream: !p.ConnectOKWaitsForUpstream,
OnError: instrumentedErrorHandler,
OnActive: func() {
// count the connection only when a connection is established and becomes active
p.instrument.Connection(ctx)
},
})
stopProxiedBytes := p.configureTeleportProxiedBytes()
defer stopProxiedBytes()
Expand Down
7 changes: 7 additions & 0 deletions instrument/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Instrument interface {
XBQHeaderSent(ctx context.Context)
SuspectedProbing(ctx context.Context, fromIP net.IP, reason string)
ProxiedBytes(ctx context.Context, sent, recv int, platform, platformVersion, libVersion, appVersion, app, locale, dataCapCohort, probingError string, clientIP net.IP, deviceID, originHost, arch string)
Connection(ctx context.Context)
ReportProxiedBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider)
ReportProxiedBytes(tp *sdktrace.TracerProvider)
ReportOriginBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider)
Expand Down Expand Up @@ -75,6 +76,7 @@ func (i NoInstrument) ProxiedBytes(ctx context.Context, sent, recv int, platform
func (i NoInstrument) ReportProxiedBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider) {
}
func (i NoInstrument) ReportProxiedBytes(tp *sdktrace.TracerProvider) {}
func (i NoInstrument) Connection(ctx context.Context) {}
func (i NoInstrument) ReportOriginBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider) {
}
func (i NoInstrument) ReportOriginBytes(tp *sdktrace.TracerProvider) {}
Expand Down Expand Up @@ -298,6 +300,11 @@ func (ins *defaultInstrument) ProxiedBytes(ctx context.Context, sent, recv int,
ins.statsMx.Unlock()
}

// Connection counts the number of incoming connections
func (ins *defaultInstrument) Connection(ctx context.Context) {
otelinstrument.Connections.Add(ctx, 1)
}

// quicPackets is used by QuicTracer to update QUIC retransmissions mainly for block detection.
func (ins *defaultInstrument) quicSentPacket(ctx context.Context) {
otelinstrument.QuicPackets.Add(ctx, 1, metric.WithAttributes(attribute.KeyValue{"state", attribute.StringValue("sent")}))
Expand Down
4 changes: 4 additions & 0 deletions instrument/otelinstrument/otelinstrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
XBQ metric.Int64Counter
Throttling metric.Int64Counter
SuspectedProbing metric.Int64Counter
Connections metric.Int64Counter
DistinctClients1m, DistinctClients10m, DistinctClients1h *distinct.SlidingWindowDistinctCount
distinctClients metric.Int64ObservableGauge
)
Expand Down Expand Up @@ -75,6 +76,9 @@ func initialize() error {
if SuspectedProbing, err = meter.Int64Counter("proxy.probing.suspected"); err != nil {
return err
}
if Connections, err = meter.Int64Counter("proxy.connections", metric.WithUnit("connections")); err != nil {
return err
}

DistinctClients1m = distinct.NewSlidingWindowDistinctCount(time.Minute, time.Second)
DistinctClients10m = distinct.NewSlidingWindowDistinctCount(10*time.Minute, 10*time.Second)
Expand Down
58 changes: 58 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"reflect"
"strings"
"sync"
"time"

"github.com/getlantern/errors"
Expand Down Expand Up @@ -51,6 +52,10 @@ type Opts struct {
// Temporary network errors (errors of type net.Error for which Temporary()
// returns true) will not trigger this callback.
OnAcceptError func(err error) (fatalErr error)

// OnActive is called only once when a connection is accepted and has done
// either a first Read() or Write()
OnActive func()
}

// Server is an HTTP proxy server.
Expand All @@ -62,6 +67,7 @@ type Server struct {
listenerGenerators []ListenerGenerator
onError func(conn net.Conn, err error)
onAcceptError func(err error) (fatalErr error)
onActive func()
}

// New constructs a new HTTP proxy server using the given options
Expand Down Expand Up @@ -92,10 +98,14 @@ func New(opts *Opts) *Server {
if opts.OnAcceptError == nil {
opts.OnAcceptError = func(err error) (fatalErr error) { return err }
}
if opts.OnActive == nil {
opts.OnActive = func() {}
}
return &Server{
proxy: p,
onError: opts.OnError,
onAcceptError: opts.OnAcceptError,
onActive: opts.OnActive,
}
}

Expand Down Expand Up @@ -165,6 +175,8 @@ func (s *Server) serve(listener net.Listener, readyCb func(addr string)) error {
continue
}
tempDelay = 0
// wrap the conn so s.onActive will be called after first successful Read or Write
conn = wrapOnActiveConn(conn, s.onActive)
s.handle(conn)
}
}
Expand Down Expand Up @@ -264,3 +276,49 @@ func (l *allowinglistener) Close() error {
func (l *allowinglistener) Addr() net.Addr {
return l.wrapped.Addr()
}

type onActiveConn struct {
listeners.WrapConnEmbeddable
net.Conn

once sync.Once
onActive func()
}

// WrapOnActiveConn wraps a net.Conn and calls onActive once after first successful Read or Write
func wrapOnActiveConn(conn net.Conn, onActive func()) net.Conn {
wc, _ := conn.(listeners.WrapConnEmbeddable)
return &onActiveConn{wc, conn, sync.Once{}, onActive}
}

func (c *onActiveConn) OnState(s http.ConnState) {
if c.WrapConnEmbeddable != nil {
c.WrapConnEmbeddable.OnState(s)
}
}

func (c *onActiveConn) ControlMessage(msgType string, data interface{}) {
if c.WrapConnEmbeddable != nil {
c.WrapConnEmbeddable.ControlMessage(msgType, data)
}
}

func (c *onActiveConn) Wrapped() net.Conn {
return c.Conn
}

func (c *onActiveConn) Read(b []byte) (int, error) {
n, err := c.Conn.Read(b)
if err == nil {
c.once.Do(c.onActive)
}
return n, err
}

func (c *onActiveConn) Write(b []byte) (int, error) {
n, err := c.Conn.Write(b)
if err == nil {
c.once.Do(c.onActive)
}
return n, err
}

0 comments on commit 32fa7f2

Please sign in to comment.