Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use structured logging #50

Merged
merged 1 commit into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 31 additions & 23 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
v1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/component-base/logs"
logsapi "k8s.io/component-base/logs/api/v1"
_ "k8s.io/component-base/logs/json/register"
nodeutil "k8s.io/component-helpers/node/util"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -52,17 +55,37 @@ func init() {
}
}

// This is a pattern to ensure that deferred functions execute before os.Exit
func main() {
// enable logging
klog.InitFlags(nil)
os.Exit(run())
}
func run() int {
// Enable logging in the Kubernetes core package way (support json output)
// https://github.com/kubernetes/component-base/tree/master
c := logsapi.NewLoggingConfiguration()
logsapi.AddGoFlags(c, flag.CommandLine)
flag.Parse()
logs.InitLogs()
if err := logsapi.ValidateAndApply(c, nil); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
return 1
}

// Create a context for structured logging, and catch termination signals
ctx, cancel := signal.NotifyContext(
context.Background(), os.Interrupt, unix.SIGINT)
defer cancel()

logger := klog.FromContext(ctx)
logger.Info("called", "args", flag.Args())

flag.VisitAll(func(flag *flag.Flag) {
klog.Infof("FLAG: --%s=%q", flag.Name, flag.Value)
logger.Info("flag", "name", flag.Name, "value", flag.Value)
})

if _, _, err := net.SplitHostPort(metricsBindAddress); err != nil {
klog.Fatalf("error parsing metrics bind address %s : %v", metricsBindAddress, err)
logger.Error(err, "parsing metrics bind address", "address", metricsBindAddress)
return 1
}

nodeName, err := nodeutil.GetHostname(hostnameOverride)
Expand Down Expand Up @@ -96,18 +119,6 @@ func main() {
panic(err.Error())
}

// trap Ctrl+C and call cancel on the context
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)

// Enable signal handler
signalCh := make(chan os.Signal, 2)
defer func() {
close(signalCh)
cancel()
}()
signal.Notify(signalCh, os.Interrupt, unix.SIGINT)

informersFactory := informers.NewSharedInformerFactory(clientset, 0)

var npaClient *npaclient.Clientset
Expand Down Expand Up @@ -149,7 +160,8 @@ func main() {
cfg,
)
if err != nil {
klog.Fatalf("Can not start network policy controller: %v", err)
logger.Error(err, "Can not start network policy controller")
return 1
}
go func() {
err := networkPolicyController.Run(ctx)
Expand All @@ -161,13 +173,9 @@ func main() {
npaInformerFactory.Start(ctx.Done())
}

select {
case <-signalCh:
klog.Infof("Exiting: received signal")
cancel()
case <-ctx.Done():
}
<-ctx.Done()

// grace period to cleanup resources
time.Sleep(5 * time.Second)
return 0
}
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/emicklei/go-restful/v3 v3.12.0 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/zapr v1.3.0 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.21.0 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
Expand Down Expand Up @@ -53,6 +54,8 @@ require (
github.com/spf13/cobra v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sync v0.7.0 // indirect
Expand Down
104 changes: 66 additions & 38 deletions pkg/networkpolicy/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func newController(client clientset.Interface,
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})

klog.V(2).Infof("Creating controller: %#v", config)
klog.V(2).InfoS("Creating controller", "config", config)
c := &Controller{
client: client,
config: config,
Expand Down Expand Up @@ -357,12 +357,13 @@ type Controller struct {
func (c *Controller) Run(ctx context.Context) error {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
logger := klog.FromContext(ctx)

klog.Infof("Starting controller %s", controllerName)
defer klog.Infof("Shutting down controller %s", controllerName)
logger.Info("Starting controller", "name", controllerName)
defer logger.Info("Shutting down controller", "name", controllerName)

// Wait for the caches to be synced
klog.Info("Waiting for informer caches to sync")
logger.Info("Waiting for informer caches to sync")
caches := []cache.InformerSynced{c.networkpoliciesSynced, c.namespacesSynced, c.podsSynced}
if c.config.AdminNetworkPolicy || c.config.BaselineAdminNetworkPolicy {
caches = append(caches, c.nodesSynced)
Expand All @@ -381,14 +382,15 @@ func (c *Controller) Run(ctx context.Context) error {
registerMetrics(ctx)
// collect metrics periodically
go wait.UntilWithContext(ctx, func(ctx context.Context) {
logger := klog.FromContext(ctx)
queues, err := readNfnetlinkQueueStats()
if err != nil {
klog.Infof("error reading nfqueue stats: %v", err)
logger.Error(err, "reading nfqueue stats")
return
}
klog.V(4).Infof("Obtained metrics for %d queues", len(queues))
logger.V(4).Info("Obtained metrics for queues", "nqueues", len(queues))
for _, q := range queues {
klog.V(4).Infof("Updating metrics for queue: %d", q.id_sequence)
logger.V(4).Info("Updating metrics", "queue", q.id_sequence)
nfqueueQueueTotal.WithLabelValues(q.queue_number).Set(float64(q.queue_total))
nfqueueQueueDropped.WithLabelValues(q.queue_number).Set(float64(q.queue_dropped))
nfqueueUserDropped.WithLabelValues(q.queue_number).Set(float64(q.user_dropped))
Expand All @@ -398,9 +400,9 @@ func (c *Controller) Run(ctx context.Context) error {
}, 30*time.Second)

// Start the workers after the repair loop to avoid races
klog.Info("Syncing nftables rules")
logger.Info("Syncing nftables rules")
_ = c.syncNFTablesRules(ctx)
defer c.cleanNFTablesRules()
defer c.cleanNFTablesRules(ctx)
go wait.Until(c.runWorker, time.Second, ctx.Done())

var flags uint32
Expand Down Expand Up @@ -428,7 +430,7 @@ func (c *Controller) Run(ctx context.Context) error {

nf, err := nfqueue.Open(&config)
if err != nil {
klog.Infof("could not open nfqueue socket: %v", err)
logger.Info("could not open nfqueue socket", "error", err)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logging like this exists in several places, and I am unsure how to handle it. A general recommendation is not to log errors that you also return (let the caller decide). Normally logger.Error(err, ...) is used for logging errors, but then the message is emitted at a higher log level. As a compromise, I kept these logs at Info level.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A way to keep information about what caused an error is to wrap it:

return fmt.Errorf("could not open nfqueue socket %w", err)

(ref: recent discussion on slack)

return err
}
defer nf.Close()
Expand All @@ -444,11 +446,11 @@ func (c *Controller) Run(ctx context.Context) error {
}

startTime := time.Now()
klog.V(2).Infof("Processing sync for packet %d", *a.PacketID)
logger.V(2).Info("Processing sync for packet", "id", *a.PacketID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use the same `tlogger := logger.V(2)` / `if tlogger.Enabled()` trick here that we use at line 517 in evaluatePacket?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not as urgent in this case, since only the PacketID is passed (not an entire analyzed packet).


packet, err := parsePacket(*a.Payload)
if err != nil {
klog.Infof("Can not process packet %d applying default policy (failOpen: %v): %v", *a.PacketID, c.config.FailOpen, err)
logger.Error(err, "Can not process packet, applying default policy", "id", *a.PacketID, "failOpen", c.config.FailOpen)
c.nfq.SetVerdict(*a.PacketID, verdict) //nolint:errcheck
return 0
}
Expand All @@ -459,10 +461,10 @@ func (c *Controller) Run(ctx context.Context) error {
packetProcessingHist.WithLabelValues(string(packet.proto), string(packet.family)).Observe(processingTime)
packetProcessingSum.Observe(processingTime)
packetCounterVec.WithLabelValues(string(packet.proto), string(packet.family)).Inc()
klog.V(2).Infof("Finished syncing packet %d took: %v accepted: %v", *a.PacketID, time.Since(startTime), verdict == nfqueue.NfAccept)
logger.V(2).Info("Finished syncing packet", "id", *a.PacketID, "duration", time.Since(startTime), "accepted", verdict == nfqueue.NfAccept)
}()

if c.evaluatePacket(packet) {
if c.evaluatePacket(ctx, packet) {
verdict = nfqueue.NfAccept
} else {
verdict = nfqueue.NfDrop
Expand All @@ -478,11 +480,11 @@ func (c *Controller) Run(ctx context.Context) error {
return 0
}
}
klog.Infof("Could not receive message: %v\n", err)
logger.Info("Could not receive message", "error", err)
return 0
})
if err != nil {
klog.Infof("could not open nfqueue socket: %v", err)
logger.Info("could not open nfqueue socket", "error", err)
return err
}

Expand All @@ -498,7 +500,8 @@ func (c *Controller) Run(ctx context.Context) error {
// 4. AdminNetworkPolicies in Ingress for the destination Pod/IP
// 5. NetworkPolicies in Ingress (if needed) for the destination Pod/IP
// 6. BaselineAdminNetworkPolicies in Ingress (if needed) for the destination Pod/IP
func (c *Controller) evaluatePacket(p packet) bool {
func (c *Controller) evaluatePacket(ctx context.Context, p packet) bool {
logger := klog.FromContext(ctx)
srcIP := p.srcIP
srcPod := c.getPodAssignedToIP(srcIP.String())
srcPort := p.srcPort
Expand All @@ -507,16 +510,25 @@ func (c *Controller) evaluatePacket(p packet) bool {
dstPort := p.dstPort
protocol := p.proto

klog.V(2).Infof("Evaluating packet %s", p.String())
// evaluatePacket() should be fast unless trace logging is enabled.
// Logging optimization: we check beforehand whether V(2) is enabled,
// rather than evaluating all the parameters and making an unnecessary logger call.
tlogger := logger.V(2)
if tlogger.Enabled() {
tlogger.Info("Evaluating packet", "packet", p)
tlogger = tlogger.WithValues("id", p.id)
}

// Evaluate Egress Policies

// Admin Network Policies are evaluated first
evaluateEgressNetworkPolicy := true
if c.config.AdminNetworkPolicy {
srcPodAdminNetworkPolices := c.getAdminNetworkPoliciesForPod(srcPod)
srcPodAdminNetworkPolices := c.getAdminNetworkPoliciesForPod(ctx, srcPod)
action := c.evaluateAdminEgress(srcPodAdminNetworkPolices, dstPod, dstIP, dstPort, protocol)
klog.V(2).Infof("[Packet %d] Egress AdminNetworkPolicies: %d Action: %s", p.id, len(srcPodAdminNetworkPolices), action)
if tlogger.Enabled() {
tlogger.Info("Egress AdminNetworkPolicies", "npolicies", len(srcPodAdminNetworkPolices), "action", action)
}
switch action {
case npav1alpha1.AdminNetworkPolicyRuleActionDeny: // Deny the packet no need to check anything else
return false
Expand All @@ -531,16 +543,20 @@ func (c *Controller) evaluatePacket(p packet) bool {
if len(srcPodNetworkPolices) > 0 {
evaluateAdminEgressNetworkPolicy = false
}
allowed := c.evaluator(srcPodNetworkPolices, networkingv1.PolicyTypeEgress, srcPod, srcPort, dstPod, dstIP, dstPort, protocol)
klog.V(2).Infof("[Packet %d] Egress NetworkPolicies: %d Allowed: %v", p.id, len(srcPodNetworkPolices), allowed)
allowed := c.evaluator(ctx, srcPodNetworkPolices, networkingv1.PolicyTypeEgress, srcPod, srcPort, dstPod, dstIP, dstPort, protocol)
if tlogger.Enabled() {
tlogger.Info("Egress NetworkPolicies", "npolicies", len(srcPodNetworkPolices), "allowed", allowed)
}
if !allowed {
return false
}
}
if c.config.BaselineAdminNetworkPolicy && evaluateAdminEgressNetworkPolicy {
srcPodBaselineAdminNetworkPolices := c.getBaselineAdminNetworkPoliciesForPod(srcPod)
srcPodBaselineAdminNetworkPolices := c.getBaselineAdminNetworkPoliciesForPod(ctx, srcPod)
action := c.evaluateBaselineAdminEgress(srcPodBaselineAdminNetworkPolices, dstPod, dstIP, dstPort, protocol)
klog.V(2).Infof("[Packet %d] Egress BaselineAdminNetworkPolicies: %d Action: %s", p.id, len(srcPodBaselineAdminNetworkPolices), action)
if tlogger.Enabled() {
tlogger.Info("Egress BaselineAdminNetworkPolicies", "npolicies", len(srcPodBaselineAdminNetworkPolices), "action", action)
}
switch action {
case npav1alpha1.BaselineAdminNetworkPolicyRuleActionDeny: // Deny the packet no need to check anything else
return false
Expand All @@ -552,9 +568,11 @@ func (c *Controller) evaluatePacket(p packet) bool {

// Admin Network Policies are evaluated first
if c.config.AdminNetworkPolicy {
dstPodAdminNetworkPolices := c.getAdminNetworkPoliciesForPod(dstPod)
dstPodAdminNetworkPolices := c.getAdminNetworkPoliciesForPod(ctx, dstPod)
action := c.evaluateAdminIngress(dstPodAdminNetworkPolices, srcPod, dstPort, protocol)
klog.V(2).Infof("[Packet %d] Ingress AdminNetworkPolicies: %d Action: %s", p.id, len(dstPodAdminNetworkPolices), action)
if tlogger.Enabled() {
tlogger.Info("Ingress AdminNetworkPolicies", "npolicies", len(dstPodAdminNetworkPolices), "action", action)
}
switch action {
case npav1alpha1.AdminNetworkPolicyRuleActionDeny: // Deny the packet no need to check anything else
return false
Expand All @@ -566,14 +584,18 @@ func (c *Controller) evaluatePacket(p packet) bool {
// Network policies override Baseline Admin Network Policies
dstPodNetworkPolices := c.getNetworkPoliciesForPod(dstPod)
if len(dstPodNetworkPolices) > 0 {
allowed := c.evaluator(dstPodNetworkPolices, networkingv1.PolicyTypeIngress, dstPod, dstPort, srcPod, srcIP, srcPort, protocol)
klog.V(2).Infof("[Packet %d] Ingress NetworkPolicies: %d Allowed: %v", p.id, len(dstPodNetworkPolices), allowed)
allowed := c.evaluator(ctx, dstPodNetworkPolices, networkingv1.PolicyTypeIngress, dstPod, dstPort, srcPod, srcIP, srcPort, protocol)
if tlogger.Enabled() {
tlogger.Info("Ingress NetworkPolicies", "npolicies", len(dstPodNetworkPolices), "allowed", allowed)
}
return allowed
}
if c.config.BaselineAdminNetworkPolicy {
dstPodBaselineAdminNetworkPolices := c.getBaselineAdminNetworkPoliciesForPod(dstPod)
dstPodBaselineAdminNetworkPolices := c.getBaselineAdminNetworkPoliciesForPod(ctx, dstPod)
action := c.evaluateBaselineAdminIngress(dstPodBaselineAdminNetworkPolices, srcPod, dstPort, protocol)
klog.V(2).Infof("[Packet %d] Ingress BaselineAdminNetworkPolicies: %d Action: %s", p.id, len(dstPodBaselineAdminNetworkPolices), action)
if tlogger.Enabled() {
tlogger.Info("Ingress BaselineAdminNetworkPolicies", "npolicies", len(dstPodBaselineAdminNetworkPolices), "action", action)
}
switch action {
case npav1alpha1.BaselineAdminNetworkPolicyRuleActionDeny: // Deny the packet no need to check anything else
return false
Expand Down Expand Up @@ -619,7 +641,7 @@ func (c *Controller) handleErr(err error, key string) {

// This controller retries 5 times if something goes wrong. After that, it stops trying.
if c.queue.NumRequeues(key) < 5 {
klog.Infof("Error syncing %v: %v", key, err)
klog.ErrorS(err, "syncing", "key", key)

// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
Expand All @@ -630,7 +652,7 @@ func (c *Controller) handleErr(err error, key string) {
c.queue.Forget(key)
// Report to an external entity that, even after several retries, we could not successfully process this key
utilruntime.HandleError(err)
klog.Infof("Dropping %q out of the queue: %v", key, err)
klog.InfoS("Dropping out of the queue", "error", err, "key", key)
}

// syncNFTablesRules adds the necessary rules to process the first connection packets in userspace
Expand Down Expand Up @@ -795,11 +817,11 @@ func (c *Controller) syncNFTablesRules(ctx context.Context) error {
}

if c.config.NetfilterBug1766Fix {
c.addDNSRacersWorkaroundRules(tx)
c.addDNSRacersWorkaroundRules(ctx, tx)
}

if err := c.nft.Run(ctx, tx); err != nil {
klog.Infof("error syncing nftables rules %v", err)
klog.FromContext(ctx).Info("syncing nftables rules", "error", err)
return err
}
return nil
Expand All @@ -813,7 +835,7 @@ func (c *Controller) syncNFTablesRules(ctx context.Context) error {
// This can be removed once all kernels contain the fix in
// https://github.com/torvalds/linux/commit/8af79d3edb5fd2dce35ea0a71595b6d4f9962350
// TODO: remove once kernel fix is on most distros
func (c *Controller) addDNSRacersWorkaroundRules(tx *knftables.Transaction) {
func (c *Controller) addDNSRacersWorkaroundRules(ctx context.Context, tx *knftables.Transaction) {
hook := knftables.PreroutingHook
chainName := string(hook)
tx.Add(&knftables.Chain{
Expand Down Expand Up @@ -873,14 +895,20 @@ func (c *Controller) addDNSRacersWorkaroundRules(tx *knftables.Transaction) {
}
}

func (c *Controller) cleanNFTablesRules() {
func (c *Controller) cleanNFTablesRules(ctx context.Context) {
tx := c.nft.NewTransaction()
// Add+Delete is idempotent and won't return an error if the table doesn't already
// exist.
tx.Add(&knftables.Table{})
tx.Delete(&knftables.Table{})

if err := c.nft.Run(context.TODO(), tx); err != nil {
klog.Infof("error deleting nftables rules %v", err)
// When this function is called, the ctx is likely cancelled. So
// we only use it for logging, and create a context with timeout
// for nft.Run. There is a grace period of 5s in main, so we keep
// this timeout shorter
nctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
defer cancel()
if err := c.nft.Run(nctx, tx); err != nil {
klog.FromContext(ctx).Error(err, "deleting nftables rules")
}
}
Loading
Loading