Skip to content

Commit

Permalink
TUN-8701: Add metrics and adjust logs for datagram v3
Browse files Browse the repository at this point in the history
Closes TUN-8701
  • Loading branch information
DevinCarr committed Nov 7, 2024
1 parent 952622a commit 1f3e304
Show file tree
Hide file tree
Showing 11 changed files with 189 additions and 62 deletions.
9 changes: 8 additions & 1 deletion connection/quic_datagram_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/quic-go/quic-go"
"github.com/rs/zerolog"

"github.com/cloudflare/cloudflared/management"
cfdquic "github.com/cloudflare/cloudflared/quic/v3"
"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
)
Expand All @@ -25,9 +26,15 @@ func NewDatagramV3Connection(ctx context.Context,
conn quic.Connection,
sessionManager cfdquic.SessionManager,
index uint8,
metrics cfdquic.Metrics,
logger *zerolog.Logger,
) DatagramSessionHandler {
datagramMuxer := cfdquic.NewDatagramConn(conn, sessionManager, index, logger)
log := logger.
With().
Int(management.EventTypeKey, int(management.UDP)).
Uint8(LogFieldConnIndex, index).
Logger()
datagramMuxer := cfdquic.NewDatagramConn(conn, sessionManager, index, metrics, &log)

return &datagramV3Connection{
conn,
Expand Down
12 changes: 7 additions & 5 deletions quic/v3/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import (

var (
// ErrSessionNotFound indicates that a session has not been registered yet for the request id.
ErrSessionNotFound = errors.New("session not found")
ErrSessionNotFound = errors.New("flow not found")
// ErrSessionBoundToOtherConn is returned when a registration already exists for a different connection.
ErrSessionBoundToOtherConn = errors.New("session is in use by another connection")
ErrSessionBoundToOtherConn = errors.New("flow is in use by another connection")
// ErrSessionAlreadyRegistered is returned when a registration already exists for this connection.
ErrSessionAlreadyRegistered = errors.New("session is already registered for this connection")
ErrSessionAlreadyRegistered = errors.New("flow is already registered for this connection")
)

type SessionManager interface {
Expand All @@ -39,12 +39,14 @@ type DialUDP func(dest netip.AddrPort) (*net.UDPConn, error)
type sessionManager struct {
sessions map[RequestID]Session
mutex sync.RWMutex
metrics Metrics
log *zerolog.Logger
}

func NewSessionManager(log *zerolog.Logger, originDialer DialUDP) SessionManager {
func NewSessionManager(metrics Metrics, log *zerolog.Logger, originDialer DialUDP) SessionManager {
return &sessionManager{
sessions: make(map[RequestID]Session),
metrics: metrics,
log: log,
}
}
Expand All @@ -65,7 +67,7 @@ func (s *sessionManager) RegisterSession(request *UDPSessionRegistrationDatagram
return nil, err
}
// Create and insert the new session in the map
session := NewSession(request.RequestID, request.IdleDurationHint, origin, conn, s.log)
session := NewSession(request.RequestID, request.IdleDurationHint, origin, conn, s.metrics, s.log)
s.sessions[request.RequestID] = session
return session, nil
}
Expand Down
4 changes: 2 additions & 2 deletions quic/v3/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

func TestRegisterSession(t *testing.T) {
log := zerolog.Nop()
manager := v3.NewSessionManager(&log, ingress.DialUDPAddrPort)
manager := v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort)

request := v3.UDPSessionRegistrationDatagram{
RequestID: testRequestID,
Expand Down Expand Up @@ -71,7 +71,7 @@ func TestRegisterSession(t *testing.T) {

func TestGetSession_Empty(t *testing.T) {
log := zerolog.Nop()
manager := v3.NewSessionManager(&log, ingress.DialUDPAddrPort)
manager := v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort)

_, err := manager.GetSession(testRequestID)
if !errors.Is(err, v3.ErrSessionNotFound) {
Expand Down
90 changes: 90 additions & 0 deletions quic/v3/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package v3

import (
"github.com/prometheus/client_golang/prometheus"
)

const (
namespace = "cloudflared"
subsystem = "udp"
)

type Metrics interface {
IncrementFlows()
DecrementFlows()
PayloadTooLarge()
RetryFlowResponse()
MigrateFlow()
}

type metrics struct {
activeUDPFlows prometheus.Gauge
totalUDPFlows prometheus.Counter
payloadTooLarge prometheus.Counter
retryFlowResponses prometheus.Counter
migratedFlows prometheus.Counter
}

func (m *metrics) IncrementFlows() {
m.totalUDPFlows.Inc()
m.activeUDPFlows.Inc()
}

func (m *metrics) DecrementFlows() {
m.activeUDPFlows.Dec()
}

func (m *metrics) PayloadTooLarge() {
m.payloadTooLarge.Inc()
}

func (m *metrics) RetryFlowResponse() {
m.retryFlowResponses.Inc()
}

func (m *metrics) MigrateFlow() {
m.migratedFlows.Inc()
}

func NewMetrics(registerer prometheus.Registerer) Metrics {
m := &metrics{
activeUDPFlows: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "active_flows",
Help: "Concurrent count of UDP flows that are being proxied to any origin",
}),
totalUDPFlows: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "total_flows",
Help: "Total count of UDP flows that have been proxied to any origin",
}),
payloadTooLarge: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "payload_too_large",
Help: "Total count of UDP flows that have had origin payloads that are too large to proxy",
}),
retryFlowResponses: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "retry_flow_responses",
Help: "Total count of UDP flows that have had to send their registration response more than once",
}),
migratedFlows: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "migrated_flows",
Help: "Total count of UDP flows have been migrated across local connections",
}),
}
registerer.MustRegister(
m.activeUDPFlows,
m.totalUDPFlows,
m.payloadTooLarge,
m.retryFlowResponses,
m.migratedFlows,
)
return m
}
9 changes: 9 additions & 0 deletions quic/v3/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package v3_test

type noopMetrics struct{}

func (noopMetrics) IncrementFlows() {}
func (noopMetrics) DecrementFlows() {}
func (noopMetrics) PayloadTooLarge() {}
func (noopMetrics) RetryFlowResponse() {}
func (noopMetrics) MigrateFlow() {}
49 changes: 28 additions & 21 deletions quic/v3/muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,20 @@ type datagramConn struct {
conn QuicConnection
index uint8
sessionManager SessionManager
metrics Metrics
logger *zerolog.Logger

datagrams chan []byte
readErrors chan error
}

func NewDatagramConn(conn QuicConnection, sessionManager SessionManager, index uint8, logger *zerolog.Logger) DatagramConn {
func NewDatagramConn(conn QuicConnection, sessionManager SessionManager, index uint8, metrics Metrics, logger *zerolog.Logger) DatagramConn {
log := logger.With().Uint8("datagramVersion", 3).Logger()
return &datagramConn{
conn: conn,
index: index,
sessionManager: sessionManager,
metrics: metrics,
logger: &log,
datagrams: make(chan []byte, demuxChanCapacity),
readErrors: make(chan error, 2),
Expand Down Expand Up @@ -143,19 +145,21 @@ func (c *datagramConn) Serve(ctx context.Context) error {
c.logger.Err(err).Msgf("unable to unmarshal session registration datagram")
return
}
logger := c.logger.With().Str(logFlowID, reg.RequestID.String()).Logger()
// We bind the new session to the quic connection context instead of cloudflared context to allow for the
// quic connection to close and close only the sessions bound to it. Closing of cloudflared will also
// initiate the close of the quic connection, so we don't have to worry about the application context
// in the scope of a session.
c.handleSessionRegistrationDatagram(connCtx, reg)
c.handleSessionRegistrationDatagram(connCtx, reg, &logger)
case UDPSessionPayloadType:
payload := &UDPSessionPayloadDatagram{}
err := payload.UnmarshalBinary(datagram)
if err != nil {
c.logger.Err(err).Msgf("unable to unmarshal session payload datagram")
return
}
c.handleSessionPayloadDatagram(payload)
logger := c.logger.With().Str(logFlowID, payload.RequestID.String()).Logger()
c.handleSessionPayloadDatagram(payload, &logger)
case UDPSessionRegistrationResponseType:
// cloudflared should never expect to receive UDP session responses as it will not initiate new
// sessions towards the edge.
Expand All @@ -169,31 +173,33 @@ func (c *datagramConn) Serve(ctx context.Context) error {
}

// This method handles new registrations of a session and the serve loop for the session.
func (c *datagramConn) handleSessionRegistrationDatagram(ctx context.Context, datagram *UDPSessionRegistrationDatagram) {
func (c *datagramConn) handleSessionRegistrationDatagram(ctx context.Context, datagram *UDPSessionRegistrationDatagram, logger *zerolog.Logger) {
session, err := c.sessionManager.RegisterSession(datagram, c)
switch err {
case nil:
// Continue as normal
case ErrSessionAlreadyRegistered:
// Session is already registered and likely the response got lost
c.handleSessionAlreadyRegistered(datagram.RequestID)
c.handleSessionAlreadyRegistered(datagram.RequestID, logger)
return
case ErrSessionBoundToOtherConn:
// Session is already registered but to a different connection
c.handleSessionMigration(datagram.RequestID)
c.handleSessionMigration(datagram.RequestID, logger)
return
default:
c.logger.Err(err).Msgf("session registration failure")
c.handleSessionRegistrationFailure(datagram.RequestID)
logger.Err(err).Msgf("flow registration failure")
c.handleSessionRegistrationFailure(datagram.RequestID, logger)
return
}
c.metrics.IncrementFlows()
// Make sure to eventually remove the session from the session manager when the session is closed
defer c.sessionManager.UnregisterSession(session.ID())
defer c.metrics.DecrementFlows()

// Respond that we are able to process the new session
err = c.SendUDPSessionResponse(datagram.RequestID, ResponseOk)
if err != nil {
c.logger.Err(err).Msgf("session registration failure: unable to send session registration response")
logger.Err(err).Msgf("flow registration failure: unable to send session registration response")
return
}

Expand All @@ -203,24 +209,24 @@ func (c *datagramConn) handleSessionRegistrationDatagram(ctx context.Context, da
if err == nil {
// We typically don't expect a session to close without some error response. [SessionIdleErr] is the typical
// expected error response.
c.logger.Warn().Msg("session was closed without explicit close or timeout")
logger.Warn().Msg("flow was closed without explicit close or timeout")
return
}
// SessionIdleErr and SessionCloseErr are valid and successful error responses to end a session.
if errors.Is(err, SessionIdleErr{}) || errors.Is(err, SessionCloseErr) {
c.logger.Debug().Msg(err.Error())
logger.Debug().Msg(err.Error())
return
}

// All other errors should be reported as errors
c.logger.Err(err).Msgf("session was closed with an error")
logger.Err(err).Msgf("flow was closed with an error")
}

func (c *datagramConn) handleSessionAlreadyRegistered(requestID RequestID) {
func (c *datagramConn) handleSessionAlreadyRegistered(requestID RequestID, logger *zerolog.Logger) {
// Send another registration response since the session is already active
err := c.SendUDPSessionResponse(requestID, ResponseOk)
if err != nil {
c.logger.Err(err).Msgf("session registration failure: unable to send an additional session registration response")
logger.Err(err).Msgf("flow registration failure: unable to send an additional flow registration response")
return
}

Expand All @@ -233,9 +239,10 @@ func (c *datagramConn) handleSessionAlreadyRegistered(requestID RequestID) {
// The session is already running in another routine so we want to restart the idle timeout since no proxied
// packets have come down yet.
session.ResetIdleTimer()
c.metrics.RetryFlowResponse()
}

func (c *datagramConn) handleSessionMigration(requestID RequestID) {
func (c *datagramConn) handleSessionMigration(requestID RequestID, logger *zerolog.Logger) {
// We need to migrate the currently running session to this edge connection.
session, err := c.sessionManager.GetSession(requestID)
if err != nil {
Expand All @@ -250,29 +257,29 @@ func (c *datagramConn) handleSessionMigration(requestID RequestID) {
// Send another registration response since the session is already active
err = c.SendUDPSessionResponse(requestID, ResponseOk)
if err != nil {
c.logger.Err(err).Msgf("session registration failure: unable to send an additional session registration response")
logger.Err(err).Msgf("flow registration failure: unable to send an additional flow registration response")
return
}
}

func (c *datagramConn) handleSessionRegistrationFailure(requestID RequestID) {
func (c *datagramConn) handleSessionRegistrationFailure(requestID RequestID, logger *zerolog.Logger) {
err := c.SendUDPSessionResponse(requestID, ResponseUnableToBindSocket)
if err != nil {
c.logger.Err(err).Msgf("unable to send session registration error response (%d)", ResponseUnableToBindSocket)
logger.Err(err).Msgf("unable to send flow registration error response (%d)", ResponseUnableToBindSocket)
}
}

// Handles incoming datagrams that need to be sent to a registered session.
func (c *datagramConn) handleSessionPayloadDatagram(datagram *UDPSessionPayloadDatagram) {
func (c *datagramConn) handleSessionPayloadDatagram(datagram *UDPSessionPayloadDatagram, logger *zerolog.Logger) {
s, err := c.sessionManager.GetSession(datagram.RequestID)
if err != nil {
c.logger.Err(err).Msgf("unable to find session")
logger.Err(err).Msgf("unable to find flow")
return
}
// We ignore the bytes written to the socket because any partial write must return an error.
_, err = s.Write(datagram.Payload)
if err != nil {
c.logger.Err(err).Msgf("unable to write payload for unavailable session")
logger.Err(err).Msgf("unable to write payload for the flow")
return
}
}
Loading

0 comments on commit 1f3e304

Please sign in to comment.