Skip to content

Commit

Permalink
MINOR: runtime: update whole backend at once
Browse files Browse the repository at this point in the history
this will reduce number of socket calls to HAProxy to library
  • Loading branch information
oktalz committed Mar 22, 2024
1 parent d723590 commit 73258ce
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 20 deletions.
3 changes: 1 addition & 2 deletions pkg/haproxy/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ type HAProxyClient interface { //nolint:interfacebloat
PeerEntryEdit(peerSection string, peer models.PeerEntry) error
RefreshBackends() (deleted []string, err error)
SetMapContent(mapFile string, payload []string) error
SetServerAddr(backendName string, serverName string, ip string, port int) error
SetServerState(backendName string, serverName string, state string) error
SetServerAddrAndState([]RuntimeServerData) error
ServerGet(serverName, backendNa string) (models.Server, error)
SetAuxCfgFile(auxCfgFile string)
SyncBackendSrvs(backend *store.RuntimeBackend, portUpdated bool) error
Expand Down
122 changes: 104 additions & 18 deletions pkg/haproxy/api/runtime.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,32 @@
package api

import (
"errors"
"fmt"
"strconv"
"strings"

"github.com/haproxytech/client-native/v5/models"
"github.com/haproxytech/client-native/v5/runtime"

"github.com/haproxytech/kubernetes-ingress/pkg/store"
"github.com/haproxytech/kubernetes-ingress/pkg/utils"
)

var ErrMapNotFound = fmt.Errorf("map not found")

// bufsize is the default value of HAproxy tune.bufsize. Not recommended to change it
// Map payload cannot be bigger than tune.bufsize
const bufSize = 16000

type RuntimeServerData struct {
BackendName string
ServerName string
IP string
Port int
State string
}

func (c *clientNative) ExecuteRaw(command string) (result []string, err error) {
runtime, err := c.nativeAPI.Runtime()
if err != nil {
Expand All @@ -20,20 +35,83 @@ func (c *clientNative) ExecuteRaw(command string) (result []string, err error) {
return runtime.ExecuteRaw(command)
}

func (c *clientNative) SetServerAddr(backendName string, serverName string, ip string, port int) error {
func (c *clientNative) SetServerAddrAndState(servers []RuntimeServerData) error {
runtime, err := c.nativeAPI.Runtime()
if err != nil {
return err
}
return runtime.SetServerAddr(backendName, serverName, ip, port)
if len(servers) == 0 {
return nil
}
backendNameSize := len(servers[0].BackendName)
oneServerCommandSize := 65 + 2*backendNameSize
size := oneServerCommandSize * len(servers)
if size > bufSize {
size = bufSize
}

var sb strings.Builder
sb.Grow(size)
var cmdBuilder strings.Builder
cmdBuilder.Grow(oneServerCommandSize)
for _, server := range servers {
// if new commands are added recalculate oneServerCommandSize
cmdBuilder.WriteString("set server ")
cmdBuilder.WriteString(server.BackendName)
cmdBuilder.WriteString("/")
cmdBuilder.WriteString(server.ServerName)
cmdBuilder.WriteString(" addr ")
cmdBuilder.WriteString(server.IP)
if server.Port > 0 {
cmdBuilder.WriteString(" port ")
cmdBuilder.WriteString(strconv.Itoa(server.Port))
}
cmdBuilder.WriteString(";set server ")
cmdBuilder.WriteString(server.BackendName)
cmdBuilder.WriteString("/")
cmdBuilder.WriteString(server.ServerName)
cmdBuilder.WriteString(" state ")
cmdBuilder.WriteString(server.State)
cmdBuilder.WriteString(";")
// if new commands are added recalculate oneServerCommandSize

if sb.Len()+cmdBuilder.Len() >= size {
err = c.runRaw(runtime, sb, server.BackendName)
if err != nil {
return err
}
sb.Reset()
sb.Grow(size)
}
sb.WriteString(cmdBuilder.String())
cmdBuilder.Reset()
cmdBuilder.Grow(oneServerCommandSize)
}
if sb.Len() > 0 {
err = c.runRaw(runtime, sb, servers[0].BackendName)
if err != nil {
return err
}
}
return nil
}

func (c *clientNative) SetServerState(backendName string, serverName string, state string) error {
runtime, err := c.nativeAPI.Runtime()
func (c *clientNative) runRaw(runtime runtime.Runtime, sb strings.Builder, backendName string) error {
logger := utils.GetLogger()
result, err := runtime.ExecuteRaw(sb.String())
if err != nil {
return err
}
return runtime.SetServerState(backendName, serverName, state)
for i := 0; i < len(result); i++ {
if len(result[i]) > 5 {
switch result[i][1:5] {
case "[3]:", "[2]:", "[1]:", "[0]:":
logger.Errorf("[RUNTIME] [BACKEND] [SOCKET] backend %s: Error: '%s', server slots adjustment ?", backendName, result[i])
return errors.New("runtime update failed for " + backendName)
}
}
}
return nil
}

func (c *clientNative) SetMapContent(mapFile string, payload []string) error {
Expand Down Expand Up @@ -91,7 +169,6 @@ func (c *clientNative) SyncBackendSrvs(backend *store.RuntimeBackend, portUpdate
// Disable stale entries from HAProxySrvs
// and provide list of Disabled Srvs
var disabled []*store.HAProxySrv
var errors utils.Errors
for i, srv := range haproxySrvs {
srv.Modified = srv.Modified || portUpdated
if _, ok := addresses[srv.Address]; ok {
Expand All @@ -118,26 +195,35 @@ func (c *clientNative) SyncBackendSrvs(backend *store.RuntimeBackend, portUpdate
logger.Tracef("[RUNTIME] [BACKEND] [SERVER] backend %s: list of endpoints addresses after treatment %+v", backend.Name, addresses)

// Dynamically updates HAProxy backend servers with HAProxySrvs content
var addrErr, stateErr error
runtimeServerData := make([]RuntimeServerData, 0, len(haproxySrvs))
for _, srv := range haproxySrvs {
if !srv.Modified {
continue
}
if srv.Address == "" {
logger.Tracef("[RUNTIME] [BACKEND] [SERVER] [SOCKET] backend %s: server '%s' changed status to %v", backend.Name, srv.Name, "maint")
addrErr = c.SetServerAddr(backend.Name, srv.Name, "127.0.0.1", 0)
stateErr = c.SetServerState(backend.Name, srv.Name, "maint")
runtimeServerData = append(runtimeServerData, RuntimeServerData{
BackendName: backend.Name,
ServerName: srv.Name,
IP: "127.0.0.1",
Port: 0,
State: "maint",
})
} else {
logger.Tracef("[RUNTIME] [BACKEND] [SERVER] [SOCKET] backend %s: server '%s': addr '%s' changed status to %v", backend.Name, srv.Name, srv.Address, "ready")
addrErr = c.SetServerAddr(backend.Name, srv.Name, srv.Address, int(backend.Endpoints.Port))
stateErr = c.SetServerState(backend.Name, srv.Name, "ready")
}
if addrErr != nil || stateErr != nil {
backend.DynUpdateFailed = true
errors.Add(addrErr)
errors.Add(stateErr)
logger.Errorf("[RUNTIME] [BACKEND] [SERVER] [SOCKET] backend %s: server '%s': addr '%s': addrError '%v': stateError: '%v'", backend.Name, srv.Name, srv.Address, addrErr, stateErr)
runtimeServerData = append(runtimeServerData, RuntimeServerData{
BackendName: backend.Name,
ServerName: srv.Name,
IP: srv.Address,
Port: int(backend.Endpoints.Port),
State: "ready",
})
}
}
return errors.Result()
err := c.SetServerAddrAndState(runtimeServerData)
if err != nil {
backend.DynUpdateFailed = true
return err
}
return nil
}

0 comments on commit 73258ce

Please sign in to comment.