Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

produce messages partitioned by value #115

Merged
merged 4 commits into from
Sep 9, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
/kt
/quickfix
.idea/*
.vscode/*
3 changes: 0 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ module github.com/fgeller/kt
require (
github.com/Shopify/sarama v1.26.1
github.com/davecgh/go-spew v1.1.1
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/testify v1.5.1
golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72
)
Expand Down
26 changes: 21 additions & 5 deletions produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (cmd *produceCmd) read(as []string) produceArgs {
flags.BoolVar(&args.literal, "literal", false, "Interpret stdin line literally and pass it as value, key as null.")
flags.StringVar(&args.version, "version", "", "Kafka protocol version")
flags.StringVar(&args.compression, "compression", "", "Kafka message compression codec [gzip|snappy|lz4] (defaults to none)")
flags.StringVar(&args.partitioner, "partitioner", "", "Optional partitioner to use. Available: hashCode")
flags.StringVar(&args.partitioner, "partitioner", "", "Optional partitioner to use. Available: hashCode, hashCodeByValue")
flags.StringVar(&args.decodeKey, "decodekey", "string", "Decode message value as (string|hex|base64), defaults to string.")
flags.StringVar(&args.decodeValue, "decodevalue", "string", "Decode message value as (string|hex|base64), defaults to string.")
flags.IntVar(&args.bufferSize, "buffersize", 16777216, "Buffer size for scanning stdin, defaults to 16777216=16*1024*1024.")
Expand Down Expand Up @@ -320,11 +320,16 @@ func (cmd *produceCmd) deserializeLines(in chan string, out chan message, partit
}

var part int32 = 0
if msg.Key != nil && cmd.partitioner == "hashCode" {
part = hashCodePartition(*msg.Key, partitionCount)
}
if msg.Partition == nil {
if msg.Value != nil && cmd.partitioner == "hashCodeByValue" {
part = hashCodePartition(*msg.Value, partitionCount)
msg.Partition = &part
}else {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you please run gofmt or your alternative on this? 🙏

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and would it make sense to group the guards slightly differently?

if msg.Partition == nil {
	if msg.Value != nil && cmd.partitioner == "hashCodeByValue" {
		part = hashCodePartition(*msg.Value, partitionCount)
		msg.Partition = &part
	} else if msg.Key != nil && cmd.partitioner == "hashCode" {
		part = hashCodePartition(*msg.Key, partitionCount)
		msg.Partition = &part
	}
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I will do it

if msg.Key != nil && cmd.partitioner == "hashCode" {
part = hashCodePartition(*msg.Key, partitionCount)
}
if msg.Partition == nil {
msg.Partition = &part
}
}

out <- msg
Expand Down Expand Up @@ -519,6 +524,12 @@ like the following:
In case the input line cannot be interpeted as a JSON object the key and value
both default to the input line and partition to 0.

If you don't want to specify key for single message, in other words, it doesn't matter that a message goes
to a random paritition (with equal probability), you can set the flag '-partitioner' with 'hashCodeByValue'.
That will tell kt to take the value of a message to calculate a hashcode deciding which paritition it will go to.
This can be helpful when you just want there are many messages distributed in partitions of a topic, and don't
care about what the content is.

Examples:

Send a single message with a specific key:
Expand All @@ -529,6 +540,11 @@ Send a single message with a specific key:
$ kt consume -topic greetings -timeout 1s -offsets 0:3-
{"partition":0,"offset":3,"key":"id-23","message":"ola"}

Send a single message without specified key:
$ echo 'no key specified message' | kt produce -topic greetings -partitioner hashCodeByValue
Sent message to a partition decided by your case


Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

awesome! thanks for adding an example and doc in general!

Keep reading input from stdin until interrupted (via ^C).

$ kt produce -topic greetings
Expand Down