diff --git a/produce.go b/produce.go index dcaf4e7..46c0ec8 100644 --- a/produce.go +++ b/produce.go @@ -448,3 +448,22 @@ func hashCode(s string) (hc int32) { } return } + +func kafkaAbs(i int32) int32 { + switch { + case i == -2147483648: // Integer.MIN_VALUE + return 0 + case i < 0: + return i * -1 + default: + return i + } +} + +func hashCodePartition(key string, partitions int32) int32 { + if partitions <= 0 { + return -1 + } + + return kafkaAbs(hashCode(key)) % partitions +} diff --git a/produce_test.go b/produce_test.go index c30303c..43b1ebc 100644 --- a/produce_test.go +++ b/produce_test.go @@ -54,6 +54,68 @@ func TestHashCode(t *testing.T) { } } +func TestHashCodePartition(t *testing.T) { + + data := []struct { + key string + partitions int32 + expected int32 + }{ + { + key: "", + partitions: 0, + expected: -1, + }, + { + key: "", + partitions: 1, + expected: 0, + }, + { + key: "super-duper-key", + partitions: 1, + expected: 0, + }, + { + key: "", + partitions: 1, + expected: 0, + }, + { + key: "", + partitions: 2, + expected: 0, + }, + { + key: "a", + partitions: 2, + expected: 1, + }, + { + key: "b", + partitions: 2, + expected: 0, + }, + { + key: "random", + partitions: 2, + expected: 1, + }, + { + key: "random", + partitions: 5, + expected: 0, + }, + } + + for _, d := range data { + actual := hashCodePartition(d.key, d.partitions) + if actual != d.expected { + t.Errorf("expected %v but found %v for key %#v and %v partitions\n", d.expected, actual, d.key, d.partitions) + } + } +} + func TestProduceParseArgs(t *testing.T) { configBefore := config defer func() {