Skip to content

Commit

Permalink
#3 updated partition info with other util methods and updated unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Krishnakant C authored and Krishnakant C committed Sep 6, 2024
1 parent febbe6b commit e4c12de
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 1 deletion.
81 changes: 81 additions & 0 deletions internal/client/common/partition_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package common

import (
"fmt"
"hash/fnv"
"strings"
)

Expand All @@ -17,6 +18,86 @@ type PartitionInfo struct {
OfflineReplicas []*Node // The subset of replicas that are offline
}

// NewPartitionInfo creates a new PartitionInfo instance.
func NewPartitionInfo(topic string, partition int, leader *Node, replicas, inSyncReplicas []*Node) *PartitionInfo {
return &PartitionInfo{
Topic: topic,
Partition: partition,
Leader: leader,
Replicas: replicas,
InSyncReplicas: inSyncReplicas,
OfflineReplicas: []*Node{}, // Default to empty slice
}
}

// NewPartitionInfoWithOffline creates a new PartitionInfo instance with offline replicas.
func NewPartitionInfoWithOffline(topic string, partition int, leader *Node, replicas, inSyncReplicas, offlineReplicas []*Node) *PartitionInfo {
return &PartitionInfo{
Topic: topic,
Partition: partition,
Leader: leader,
Replicas: replicas,
InSyncReplicas: inSyncReplicas,
OfflineReplicas: offlineReplicas,
}
}

// Topic returns the topic name.
func (p *PartitionInfo) GetTopic() string {
return p.Topic
}

// Partition returns the partition ID.
func (p *PartitionInfo) GetPartition() int {
return p.Partition
}

// Leader returns the node currently acting as a leader for this partition or nil if there is no leader.
func (p *PartitionInfo) GetLeader() *Node {
return p.Leader
}

// Replicas returns the complete set of replicas for this partition regardless of whether they are alive or up-to-date.
func (p *PartitionInfo) GetReplicas() []*Node {
return p.Replicas
}

// InSyncReplicas returns the subset of the replicas that are in sync, that is caught-up to the leader.
func (p *PartitionInfo) GetInSyncReplicas() []*Node {
return p.InSyncReplicas
}

// OfflineReplicas returns the subset of the replicas that are offline.
func (p *PartitionInfo) GetOfflineReplicas() []*Node {
return p.OfflineReplicas
}

// Hash generates a hash code for the PartitionInfo.
func (p *PartitionInfo) Hash() uint32 {
h := fnv.New32a()
h.Write([]byte(p.Topic))
h.Write([]byte(fmt.Sprintf("%d", p.Partition)))
if p.Leader != nil {
h.Write([]byte(fmt.Sprintf("%d", p.Leader.ID)))
}
for _, replica := range p.Replicas {
if replica != nil {
h.Write([]byte(fmt.Sprintf("%d", replica.ID)))
}
}
for _, inSyncReplica := range p.InSyncReplicas {
if inSyncReplica != nil {
h.Write([]byte(fmt.Sprintf("%d", inSyncReplica.ID)))
}
}
for _, offlineReplica := range p.OfflineReplicas {
if offlineReplica != nil {
h.Write([]byte(fmt.Sprintf("%d", offlineReplica.ID)))
}
}
return h.Sum32()
}

// FormatNodeIDs formats the node IDs from a slice of Node pointers for display.
// This method creates a string representation of node IDs, enclosed in square brackets,
// and separates each ID by a comma.
Expand Down
86 changes: 85 additions & 1 deletion internal/client/common/partition_info_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package common

import "testing"
import (
"reflect"
"testing"
)

// TestFormatNodeIDs tests the FormatNodeIDs function.
func TestFormatNodeIDs(t *testing.T) {
Expand Down Expand Up @@ -122,3 +125,84 @@ func TestSlicesEqual(t *testing.T) {
})
}
}

func TestNewPartitionInfo(t *testing.T) {
leaderNode := &Node{ID: 1}
replicaNodes := []*Node{{ID: 2}, {ID: 3}}
inSyncNodes := []*Node{{ID: 2}}

partitionInfo := NewPartitionInfo("example-topic", 0, leaderNode, replicaNodes, inSyncNodes)

if partitionInfo.GetTopic() != "example-topic" {
t.Errorf("Expected topic to be 'example-topic', got %s", partitionInfo.GetTopic())
}

if partitionInfo.GetPartition() != 0 {
t.Errorf("Expected partition to be 0, got %d", partitionInfo.GetPartition())
}

if partitionInfo.GetLeader() != leaderNode {
t.Errorf("Expected leader to be %v, got %v", leaderNode, partitionInfo.GetLeader())
}

if !reflect.DeepEqual(partitionInfo.GetReplicas(), replicaNodes) {
t.Errorf("Expected replicas to be %v, got %v", replicaNodes, partitionInfo.GetReplicas())
}

if !reflect.DeepEqual(partitionInfo.GetInSyncReplicas(), inSyncNodes) {
t.Errorf("Expected in-sync replicas to be %v, got %v", inSyncNodes, partitionInfo.GetInSyncReplicas())
}

if len(partitionInfo.GetOfflineReplicas()) != 0 {
t.Errorf("Expected offline replicas to be empty, got %v", partitionInfo.GetOfflineReplicas())
}
}

func TestNewPartitionInfoWithOffline(t *testing.T) {
leaderNode := &Node{ID: 1}
replicaNodes := []*Node{{ID: 2}, {ID: 3}}
inSyncNodes := []*Node{{ID: 2}}
offlineNodes := []*Node{{ID: 3}}

partitionInfo := NewPartitionInfoWithOffline("example-topic", 0, leaderNode, replicaNodes, inSyncNodes, offlineNodes)

if partitionInfo.GetTopic() != "example-topic" {
t.Errorf("Expected topic to be 'example-topic', got %s", partitionInfo.GetTopic())
}

if partitionInfo.GetPartition() != 0 {
t.Errorf("Expected partition to be 0, got %d", partitionInfo.GetPartition())
}

if partitionInfo.GetLeader() != leaderNode {
t.Errorf("Expected leader to be %v, got %v", leaderNode, partitionInfo.GetLeader())
}

if !reflect.DeepEqual(partitionInfo.GetReplicas(), replicaNodes) {
t.Errorf("Expected replicas to be %v, got %v", replicaNodes, partitionInfo.GetReplicas())
}

if !reflect.DeepEqual(partitionInfo.GetInSyncReplicas(), inSyncNodes) {
t.Errorf("Expected in-sync replicas to be %v, got %v", inSyncNodes, partitionInfo.GetInSyncReplicas())
}

if !reflect.DeepEqual(partitionInfo.GetOfflineReplicas(), offlineNodes) {
t.Errorf("Expected offline replicas to be %v, got %v", offlineNodes, partitionInfo.GetOfflineReplicas())
}
}

func TestHash(t *testing.T) {
leaderNode := &Node{ID: 1}
replicaNodes := []*Node{{ID: 2}, {ID: 3}}
inSyncNodes := []*Node{{ID: 2}}

partitionInfo := NewPartitionInfo("example-topic", 0, leaderNode, replicaNodes, inSyncNodes)
hash1 := partitionInfo.Hash()

partitionInfo = NewPartitionInfo("example-topic", 0, leaderNode, replicaNodes, inSyncNodes)
hash2 := partitionInfo.Hash()

if hash1 != hash2 {
t.Errorf("Expected hashes to be equal, got %d and %d", hash1, hash2)
}
}

0 comments on commit e4c12de

Please sign in to comment.