Skip to content

Commit

Permalink
feat: privval service creation logic + conflict resolutions (#464)
Browse files Browse the repository at this point in the history
* feature:service creation logic + conflict resolutions

* update to fix test

* update

* Update comment

* update tests

* replace anonymous func with simple if clause as reviewed

* update
  • Loading branch information
vimystic authored Feb 6, 2025
1 parent 71e1397 commit e17bee9
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 61 deletions.
79 changes: 66 additions & 13 deletions internal/fullnode/service_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,35 @@ const maxP2PServiceDefault = int32(1)

// BuildServices returns a list of services given the crd.
//
// Creates a single RPC service, likely for use with an Ingress.
// Creates services based on the node type:
// - For regular nodes: Creates 1 RPC service and 1 p2p service per pod (replicas + 1 total)
// - For sentry nodes: Creates 1 RPC service and 2 p2p services per pod (replicas * 2 + 1 total)
//
// Creates 1 p2p service per pod. P2P diverges from traditional web and kubernetes architecture which calls for a single
// p2p service backed by multiple pods.
// Pods may be in various states even with proper readiness probes.
// Therefore, we do not want to confuse or disrupt peer exchange (PEX) within CometBFT.
// If using a single p2p service, an outside peer that discovers an out-of-sync pod could
// interpret it as byzantine behavior if the peer previously connected to a pod that was in sync through the same
// external address.
// P2P services diverge from traditional web and kubernetes architecture which typically uses a single
// service backed by multiple pods. This is necessary because:
// 1. Pods may be in various states even with proper readiness probes
// 2. We need to prevent confusion or disruption in peer exchange (PEX) within CometBFT
// 3. Using a single p2p service could lead to misinterpreted byzantine behavior if an outside peer
// discovers a pod out of sync after previously connecting to a synced pod through the same address
//
// Services are created with either LoadBalancer (for external access) or ClusterIP type, controlled
// by MaxP2PExternalAddresses setting.
func BuildServices(crd *cosmosv1.CosmosFullNode) []diff.Resource[*corev1.Service] {
max := maxP2PServiceDefault
if v := crd.Spec.Service.MaxP2PExternalAddresses; v != nil {
max = *v
}
maxExternal := lo.Clamp(max, 0, crd.Spec.Replicas)
p2ps := make([]diff.Resource[*corev1.Service], crd.Spec.Replicas)
totalServices := crd.Spec.Replicas + 1
if crd.Spec.Type == cosmosv1.Sentry {
totalServices = (crd.Spec.Replicas * 2) + 1
}

svcs := make([]diff.Resource[*corev1.Service], 0, totalServices)

startOrdinal := crd.Spec.Ordinals.Start
for i := int32(0); i < crd.Spec.Replicas; i++ {
ordinal := crd.Spec.Ordinals.Start + i
ordinal := startOrdinal + i
var svc corev1.Service
svc.Name = p2pServiceName(crd, ordinal)
svc.Namespace = crd.Namespace
Expand Down Expand Up @@ -61,10 +72,48 @@ func BuildServices(crd *cosmosv1.CosmosFullNode) []diff.Resource[*corev1.Service
svc.Spec.Type = corev1.ServiceTypeClusterIP
svc.Spec.ClusterIP = *valOrDefault(crd.Spec.Service.P2PTemplate.ClusterIP, ptr(""))
}
p2ps[i] = diff.Adapt(&svc, int(i))
svcs = append(svcs, diff.Adapt(&svc, len(svcs)))
}

// Add sentry services if needed
if crd.Spec.Type == cosmosv1.Sentry {
for i := int32(0); i < crd.Spec.Replicas; i++ {
ordinal := startOrdinal + i
var svc corev1.Service
svc.Name = sentryServiceName(crd, ordinal)
svc.Namespace = crd.Namespace
svc.Kind = "Service"
svc.APIVersion = "v1"

svc.Labels = defaultLabels(crd,
kube.InstanceLabel, instanceName(crd, ordinal),
kube.ComponentLabel, "cosmos-sentry",
)
svc.Annotations = map[string]string{}

svc.Spec.Ports = []corev1.ServicePort{
{
Name: "sentry-privval",
Protocol: corev1.ProtocolTCP,
Port: privvalPort,
TargetPort: intstr.FromString("privval"),
},
}
svc.Spec.Selector = map[string]string{kube.InstanceLabel: instanceName(crd, ordinal)}

preserveMergeInto(svc.Labels, crd.Spec.Service.P2PTemplate.Metadata.Labels)
preserveMergeInto(svc.Annotations, crd.Spec.Service.P2PTemplate.Metadata.Annotations)
svc.Spec.Type = corev1.ServiceTypeClusterIP
svc.Spec.PublishNotReadyAddresses = true

svcs = append(svcs, diff.Adapt(&svc, len(svcs)))
}
}
rpc := rpcService(crd)
return append(p2ps, diff.Adapt(rpc, len(p2ps)))

// Add RPC service
svcs = append(svcs, diff.Adapt(rpcService(crd), len(svcs)))

return svcs
}

func rpcService(crd *cosmosv1.CosmosFullNode) *corev1.Service {
Expand Down Expand Up @@ -131,6 +180,10 @@ func p2pServiceName(crd *cosmosv1.CosmosFullNode, ordinal int32) string {
return fmt.Sprintf("%s-p2p-%d", appName(crd), ordinal)
}

// sentryServiceName returns the name of the privval service for the pod at the
// given ordinal, formatted as "<app name>-privval-<ordinal>".
func sentryServiceName(crd *cosmosv1.CosmosFullNode, ordinal int32) string {
	base := appName(crd)
	name := fmt.Sprintf("%s-privval-%d", base, ordinal)
	return name
}

// rpcServiceName returns the name of the RPC service for the crd, formatted
// as "<app name>-rpc".
func rpcServiceName(crd *cosmosv1.CosmosFullNode) string {
	base := appName(crd)
	return fmt.Sprintf("%s-rpc", base)
}
139 changes: 91 additions & 48 deletions internal/fullnode/service_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
func TestBuildServices(t *testing.T) {
t.Parallel()

t.Run("p2p services", func(t *testing.T) {
t.Run("regular node services", func(t *testing.T) {
crd := defaultCRD()
crd.Spec.Replicas = 3
crd.Name = "terra"
Expand Down Expand Up @@ -68,7 +68,96 @@ func TestBuildServices(t *testing.T) {
}
})

t.Run("p2p services", func(t *testing.T) {
t.Run("sentry node services", func(t *testing.T) {
	// A two-replica sentry deployment: the assertions below expect p2p services first,
	// then privval services, then the single rpc service.
	crd := defaultCRD()
	crd.Name = "terra"
	crd.Namespace = "test"
	crd.Spec.Type = cosmosv1.Sentry // Set sentry type
	crd.Spec.Replicas = 2
	crd.Spec.ChainSpec.Network = "testnet"
	crd.Spec.PodTemplate.Image = "terra:v6.0.0"

	got := BuildServices(&crd)

	require.Equal(t, 5, len(got)) // 2 p2p + 2 privval + 1 rpc service

	// P2P services occupy the first two slots.
	for idx, res := range got[:2] {
		obj := res.Object()
		require.Equal(t, fmt.Sprintf("terra-p2p-%d", idx), obj.Name)
		require.Equal(t, "p2p", obj.Labels["app.kubernetes.io/component"])

		p2pPorts := []corev1.ServicePort{
			{
				Name:       "p2p",
				Protocol:   corev1.ProtocolTCP,
				Port:       26656,
				TargetPort: intstr.FromString("p2p"),
			},
		}
		want := corev1.ServiceSpec{
			Ports:    p2pPorts,
			Selector: map[string]string{"app.kubernetes.io/instance": fmt.Sprintf("terra-%d", idx)},
			Type:     corev1.ServiceTypeClusterIP,
		}
		// Only the first p2p service is expected to be exposed externally.
		if idx == 0 {
			want.Type = corev1.ServiceTypeLoadBalancer
			want.ExternalTrafficPolicy = corev1.ServiceExternalTrafficPolicyTypeLocal
		}
		require.Equal(t, want, obj.Spec)
	}

	// Privval services occupy the next two slots.
	for idx, res := range got[2:4] {
		obj := res.Object()
		require.Equal(t, fmt.Sprintf("terra-privval-%d", idx), obj.Name)
		require.Equal(t, "cosmos-sentry", obj.Labels["app.kubernetes.io/component"])

		privvalPorts := []corev1.ServicePort{
			{
				Name:       "sentry-privval",
				Protocol:   corev1.ProtocolTCP,
				Port:       privvalPort,
				TargetPort: intstr.FromString("privval"),
			},
		}
		want := corev1.ServiceSpec{
			Ports:                    privvalPorts,
			Selector:                 map[string]string{"app.kubernetes.io/instance": fmt.Sprintf("terra-%d", idx)},
			Type:                     corev1.ServiceTypeClusterIP,
			PublishNotReadyAddresses: true,
		}
		require.Equal(t, want, obj.Spec)
	}

	// The RPC service is last.
	rpcSvc := got[4].Object()
	require.Equal(t, "terra-rpc", rpcSvc.Name)
	require.Equal(t, "rpc", rpcSvc.Labels["app.kubernetes.io/component"])
})

t.Run("sentry service with overrides", func(t *testing.T) {
	// Metadata overrides supplied through the P2P template.
	overrides := cosmosv1.ServiceOverridesSpec{
		Metadata: cosmosv1.Metadata{
			Labels:      map[string]string{"test": "value1"},
			Annotations: map[string]string{"test": "value2"},
		},
	}

	crd := defaultCRD()
	crd.Name = "terra"
	crd.Spec.Replicas = 1
	crd.Spec.Type = cosmosv1.Sentry
	crd.Spec.Service.P2PTemplate = overrides

	got := BuildServices(&crd)
	require.Equal(t, 3, len(got)) // 1 p2p + 1 privval + 1 rpc

	// The privval service sits right after the single p2p service and must carry the overrides.
	obj := got[1].Object()
	require.Equal(t, "terra-privval-0", obj.Name)
	require.Equal(t, "value1", obj.Labels["test"])
	require.Equal(t, "value2", obj.Annotations["test"])
	require.True(t, obj.Spec.PublishNotReadyAddresses)
})

t.Run("p2p services with custom start ordinal", func(t *testing.T) {
crd := defaultCRD()
crd.Spec.Replicas = 3
crd.Name = "terra"
Expand Down Expand Up @@ -171,51 +260,6 @@ func TestBuildServices(t *testing.T) {
}
})

t.Run("p2p services with overrides", func(t *testing.T) {
	// Verifies that P2PTemplate overrides (metadata, service type, traffic policy) are
	// applied to every p2p service, and that the override values supplied for the
	// "app.kubernetes.io/..." keys do not show up on the built services.
	crd := defaultCRD()
	crd.Spec.Replicas = 2
	crd.Name = "terra"
	crd.Spec.Service.MaxP2PExternalAddresses = ptr(int32(2))
	crd.Spec.Service.P2PTemplate = cosmosv1.ServiceOverridesSpec{
		Metadata: cosmosv1.Metadata{
			Labels:      map[string]string{"test": "value1", "app.kubernetes.io/name": "should not see me"},
			Annotations: map[string]string{"test": "value2", "app.kubernetes.io/ordinal": "should not see me"},
		},
		Type:                  ptr(corev1.ServiceTypeNodePort),
		ExternalTrafficPolicy: ptr(corev1.ServiceExternalTrafficPolicyTypeLocal),
	}
	svcs := BuildServices(&crd)

	require.Equal(t, 3, len(svcs)) // 2 p2p services + 1 rpc service

	for i, svc := range svcs[:2] {
		p2p := svc.Object()
		require.Equal(t, fmt.Sprintf("terra-p2p-%d", i), p2p.Name)

		require.Equal(t, "value1", p2p.Labels["test"])
		require.NotEqual(t, "should not see me", p2p.Labels["app.kubernetes.io/name"])

		require.Equal(t, "value2", p2p.Annotations["test"])
		// The forbidden key was supplied as an annotation, so it must be checked on
		// Annotations; checking Labels here would pass vacuously.
		require.NotEqual(t, "should not see me", p2p.Annotations["app.kubernetes.io/ordinal"])

		wantSpec := corev1.ServiceSpec{
			Ports: []corev1.ServicePort{
				{
					Name:       "p2p",
					Protocol:   corev1.ProtocolTCP,
					Port:       26656,
					TargetPort: intstr.FromString("p2p"),
				},
			},
			Selector:              map[string]string{"app.kubernetes.io/instance": fmt.Sprintf("terra-%d", i)},
			Type:                  corev1.ServiceTypeNodePort,
			ExternalTrafficPolicy: corev1.ServiceExternalTrafficPolicyTypeLocal,
		}

		require.Equal(t, wantSpec, p2p.Spec)
	}
})

t.Run("rpc service", func(t *testing.T) {
crd := defaultCRD()
crd.Spec.Replicas = 1
Expand Down Expand Up @@ -244,7 +288,6 @@ func TestBuildServices(t *testing.T) {
require.Equal(t, wantLabels, rpc.Labels)

require.Equal(t, 5, len(rpc.Spec.Ports))
// All ports minus prometheus and p2p.
want := []corev1.ServicePort{
{
Name: "api",
Expand Down

0 comments on commit e17bee9

Please sign in to comment.