From e4c12de1a12cd7caf97ece90cd7f02a110e7cf91 Mon Sep 17 00:00:00 2001 From: Krishnakant C Date: Fri, 6 Sep 2024 18:03:06 +0530 Subject: [PATCH] #3 updated partition info with other util methods and updated unit tests --- internal/client/common/partition_info.go | 81 +++++++++++++++++ internal/client/common/partition_info_test.go | 86 ++++++++++++++++++- 2 files changed, 166 insertions(+), 1 deletion(-) diff --git a/internal/client/common/partition_info.go b/internal/client/common/partition_info.go index 5ddc3a1..36d0964 100644 --- a/internal/client/common/partition_info.go +++ b/internal/client/common/partition_info.go @@ -2,6 +2,7 @@ package common import ( "fmt" + "hash/fnv" "strings" ) @@ -17,6 +18,86 @@ type PartitionInfo struct { OfflineReplicas []*Node // The subset of replicas that are offline } +// NewPartitionInfo creates a new PartitionInfo instance. +func NewPartitionInfo(topic string, partition int, leader *Node, replicas, inSyncReplicas []*Node) *PartitionInfo { + return &PartitionInfo{ + Topic: topic, + Partition: partition, + Leader: leader, + Replicas: replicas, + InSyncReplicas: inSyncReplicas, + OfflineReplicas: []*Node{}, // Default to empty slice + } +} + +// NewPartitionInfoWithOffline creates a new PartitionInfo instance with offline replicas. +func NewPartitionInfoWithOffline(topic string, partition int, leader *Node, replicas, inSyncReplicas, offlineReplicas []*Node) *PartitionInfo { + return &PartitionInfo{ + Topic: topic, + Partition: partition, + Leader: leader, + Replicas: replicas, + InSyncReplicas: inSyncReplicas, + OfflineReplicas: offlineReplicas, + } +} + +// Topic returns the topic name. +func (p *PartitionInfo) GetTopic() string { + return p.Topic +} + +// Partition returns the partition ID. +func (p *PartitionInfo) GetPartition() int { + return p.Partition +} + +// Leader returns the node currently acting as a leader for this partition or nil if there is no leader. +func (p *PartitionInfo) GetLeader() *Node { + return p.Leader +} + +// Replicas returns the complete set of replicas for this partition regardless of whether they are alive or up-to-date. +func (p *PartitionInfo) GetReplicas() []*Node { + return p.Replicas +} + +// InSyncReplicas returns the subset of the replicas that are in sync, that is caught-up to the leader. +func (p *PartitionInfo) GetInSyncReplicas() []*Node { + return p.InSyncReplicas +} + +// OfflineReplicas returns the subset of the replicas that are offline. +func (p *PartitionInfo) GetOfflineReplicas() []*Node { + return p.OfflineReplicas +} + +// Hash generates a hash code for the PartitionInfo. +func (p *PartitionInfo) Hash() uint32 { + h := fnv.New32a() + h.Write([]byte(p.Topic)) + h.Write([]byte(fmt.Sprintf("%d", p.Partition))) + if p.Leader != nil { + h.Write([]byte(fmt.Sprintf("%d", p.Leader.ID))) + } + for _, replica := range p.Replicas { + if replica != nil { + h.Write([]byte(fmt.Sprintf("%d", replica.ID))) + } + } + for _, inSyncReplica := range p.InSyncReplicas { + if inSyncReplica != nil { + h.Write([]byte(fmt.Sprintf("%d", inSyncReplica.ID))) + } + } + for _, offlineReplica := range p.OfflineReplicas { + if offlineReplica != nil { + h.Write([]byte(fmt.Sprintf("%d", offlineReplica.ID))) + } + } + return h.Sum32() +} + // FormatNodeIDs formats the node IDs from a slice of Node pointers for display. // This method creates a string representation of node IDs, enclosed in square brackets, // and separates each ID by a comma. diff --git a/internal/client/common/partition_info_test.go b/internal/client/common/partition_info_test.go index 281d79f..bbd405a 100644 --- a/internal/client/common/partition_info_test.go +++ b/internal/client/common/partition_info_test.go @@ -1,6 +1,9 @@ package common -import "testing" +import ( + "reflect" + "testing" +) // TestFormatNodeIDs tests the FormatNodeIDs function. func TestFormatNodeIDs(t *testing.T) { @@ -122,3 +125,84 @@ func TestSlicesEqual(t *testing.T) { }) } } + +func TestNewPartitionInfo(t *testing.T) { + leaderNode := &Node{ID: 1} + replicaNodes := []*Node{{ID: 2}, {ID: 3}} + inSyncNodes := []*Node{{ID: 2}} + + partitionInfo := NewPartitionInfo("example-topic", 0, leaderNode, replicaNodes, inSyncNodes) + + if partitionInfo.GetTopic() != "example-topic" { + t.Errorf("Expected topic to be 'example-topic', got %s", partitionInfo.GetTopic()) + } + + if partitionInfo.GetPartition() != 0 { + t.Errorf("Expected partition to be 0, got %d", partitionInfo.GetPartition()) + } + + if partitionInfo.GetLeader() != leaderNode { + t.Errorf("Expected leader to be %v, got %v", leaderNode, partitionInfo.GetLeader()) + } + + if !reflect.DeepEqual(partitionInfo.GetReplicas(), replicaNodes) { + t.Errorf("Expected replicas to be %v, got %v", replicaNodes, partitionInfo.GetReplicas()) + } + + if !reflect.DeepEqual(partitionInfo.GetInSyncReplicas(), inSyncNodes) { + t.Errorf("Expected in-sync replicas to be %v, got %v", inSyncNodes, partitionInfo.GetInSyncReplicas()) + } + + if len(partitionInfo.GetOfflineReplicas()) != 0 { + t.Errorf("Expected offline replicas to be empty, got %v", partitionInfo.GetOfflineReplicas()) + } +} + +func TestNewPartitionInfoWithOffline(t *testing.T) { + leaderNode := &Node{ID: 1} + replicaNodes := []*Node{{ID: 2}, {ID: 3}} + inSyncNodes := []*Node{{ID: 2}} + offlineNodes := []*Node{{ID: 3}} + + partitionInfo := NewPartitionInfoWithOffline("example-topic", 0, leaderNode, replicaNodes, inSyncNodes, offlineNodes) + + if partitionInfo.GetTopic() != "example-topic" { + t.Errorf("Expected topic to be 'example-topic', got %s", partitionInfo.GetTopic()) + } + + if partitionInfo.GetPartition() != 0 { + t.Errorf("Expected partition to be 0, got %d", partitionInfo.GetPartition()) + } + + if partitionInfo.GetLeader() != leaderNode { + t.Errorf("Expected leader to be %v, got %v", leaderNode, partitionInfo.GetLeader()) + } + + if !reflect.DeepEqual(partitionInfo.GetReplicas(), replicaNodes) { + t.Errorf("Expected replicas to be %v, got %v", replicaNodes, partitionInfo.GetReplicas()) + } + + if !reflect.DeepEqual(partitionInfo.GetInSyncReplicas(), inSyncNodes) { + t.Errorf("Expected in-sync replicas to be %v, got %v", inSyncNodes, partitionInfo.GetInSyncReplicas()) + } + + if !reflect.DeepEqual(partitionInfo.GetOfflineReplicas(), offlineNodes) { + t.Errorf("Expected offline replicas to be %v, got %v", offlineNodes, partitionInfo.GetOfflineReplicas()) + } +} + +func TestHash(t *testing.T) { + leaderNode := &Node{ID: 1} + replicaNodes := []*Node{{ID: 2}, {ID: 3}} + inSyncNodes := []*Node{{ID: 2}} + + partitionInfo := NewPartitionInfo("example-topic", 0, leaderNode, replicaNodes, inSyncNodes) + hash1 := partitionInfo.Hash() + + partitionInfo = NewPartitionInfo("example-topic", 0, leaderNode, replicaNodes, inSyncNodes) + hash2 := partitionInfo.Hash() + + if hash1 != hash2 { + t.Errorf("Expected hashes to be equal, got %d and %d", hash1, hash2) + } +}