From 2fefab16b432116991999061d554647e763d2482 Mon Sep 17 00:00:00 2001 From: d-rk Date: Thu, 18 Jan 2024 11:07:17 +0000 Subject: [PATCH] =?UTF-8?q?Deploying=20to=20gh-pages=20from=20=20@=2071c54?= =?UTF-8?q?82845fcbed99a87405fad6af0399eb9ad3e=20=F0=9F=9A=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG.md | 14 +- README.md => README.adoc | 586 +++++++++++++++----------- cmd/consume/consume.go | 1 + cmd/create/create-topic.go | 2 +- cmd/deletion/delete-records.go | 36 ++ cmd/deletion/delete-records_test.go | 77 ++++ cmd/deletion/delete.go | 3 +- cmd/describe/describe-topic.go | 2 +- cmd/describe/describe-topic_test.go | 68 +++ docs/kafkactl_docs.md | 51 ++- go.mod | 41 +- go.sum | 48 ++- internal/broker/broker-operation.go | 4 +- internal/common-operation.go | 19 +- internal/common-operation_test.go | 83 ++++ internal/consume/GroupConsumer.go | 9 +- internal/consume/PartitionConsumer.go | 30 +- internal/consume/consume-operation.go | 44 +- internal/topic/topic-operation.go | 66 ++- util/parse_offsets.go | 65 +++ util/parse_offsets_test.go | 66 +++ 21 files changed, 977 insertions(+), 338 deletions(-) rename README.md => README.adoc (73%) create mode 100644 cmd/deletion/delete-records.go create mode 100644 cmd/deletion/delete-records_test.go create mode 100644 internal/common-operation_test.go create mode 100644 util/parse_offsets.go create mode 100644 util/parse_offsets_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index e739acb..70bc80e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## 4.0.0 - 2024-01-18 + +### Added +- [#182](https://github.com/deviceinsight/kafkactl/issues/182) Make isolationLevel configurable and change default to 'readCommitted' +- [#184](https://github.com/deviceinsight/kafkactl/pull/184) Added option to show default configs when describing topics +- [#183](https://github.com/deviceinsight/kafkactl/issues/183) Add command `delete records` to delete records from topic + +### Changed +- [#180](https://github.com/deviceinsight/kafkactl/issues/180) Change default for replication-factor when creating topics + ## 3.5.1 - 2023-11-10 ## 3.5.0 - 2023-11-10 @@ -258,7 +268,7 @@ of `plaintext`. ## 1.11.0 - 2020-08-07 ### Added -- direct support for kafka clusters [running in kubernetes](https://github.com/deviceinsight/kafkactl/blob/main/README.md#running-in-kubernetes) +- direct support for kafka clusters [running in kubernetes](https://github.com/deviceinsight/kafkactl/blob/main/README.adoc#running-in-kubernetes) - `attach` command to get a bash into kafkactl pod when running in kubernetes ## 1.10.0 - 2020-08-03 @@ -348,7 +358,7 @@ of `plaintext`. ## 1.2.0 - 2019-05-24 ### Added -- Additional config file locations added. See README.md for details +- Additional config file locations added. 
See README.adoc for details - Added `offset` parameter to `consume` - Support for basic auto completion in fish shell - Add 'config view` command, to view current config diff --git a/README.md b/README.adoc similarity index 73% rename from README.md rename to README.adoc index bc94437..c46f065 100644 --- a/README.md +++ b/README.adoc @@ -1,77 +1,85 @@ -# kafkactl +:toc: +:toclevels: 2 + += kafkactl A command-line interface for interaction with Apache Kafka -[![Build Status](https://github.com/deviceinsight/kafkactl/workflows/Lint%20%2F%20Test%20%2F%20IT/badge.svg?branch=main)](https://github.com/deviceinsight/kafkactl/actions) -| [![command docs](https://img.shields.io/badge/command-docs-blue.svg)](https://deviceinsight.github.io/kafkactl/) +image:https://github.com/deviceinsight/kafkactl/workflows/Lint%20%2F%20Test%20%2F%20IT/badge.svg?branch=main[Build Status,link=https://github.com/deviceinsight/kafkactl/actions] +| image:https://img.shields.io/badge/command-docs-blue.svg[command docs,link=https://deviceinsight.github.io/kafkactl/] -## Features +== Features -- command auto-completion for bash, zsh, fish shell including dynamic completion for e.g. topics or consumer groups. -- support for avro schemas -- Configuration of different contexts -- directly access kafka clusters inside your kubernetes cluster -- support for consuming and producing protobuf-encoded messages +* command auto-completion for bash, zsh, fish shell including dynamic completion for e.g. topics or consumer groups. +* support for avro schemas +* Configuration of different contexts +* directly access kafka clusters inside your kubernetes cluster +* support for consuming and producing protobuf-encoded messages -[![asciicast](https://asciinema.org/a/vmxrTA0h8CAXPnJnSFk5uHKzr.svg)](https://asciinema.org/a/vmxrTA0h8CAXPnJnSFk5uHKzr) +image::https://asciinema.org/a/vmxrTA0h8CAXPnJnSFk5uHKzr.svg[asciicast,link=https://asciinema.org/a/vmxrTA0h8CAXPnJnSFk5uHKzr] -## Installation +== Installation You can install the pre-compiled binary or compile from source. -### Install the pre-compiled binary +=== Install the pre-compiled binary -**snap**: +*snap*: -```bash +[,bash] +---- snap install kafkactl -``` +---- -**homebrew**: +*homebrew*: -```bash +[,bash] +---- # install tap repostory once brew tap deviceinsight/packages # install kafkactl brew install deviceinsight/packages/kafkactl # upgrade kafkactl brew upgrade deviceinsight/packages/kafkactl -``` +---- -**deb/rpm**: +*deb/rpm*: -Download the .deb or .rpm from the [releases page](https://github.com/deviceinsight/kafkactl/releases) and install with dpkg -i and rpm -i respectively. +Download the .deb or .rpm from the https://github.com/deviceinsight/kafkactl/releases[releases page] and install with dpkg -i and rpm -i respectively. -**yay (AUR)** +*yay (AUR)* -There's a kafkactl [AUR package](https://aur.archlinux.org/packages/kafkactl/) available for Arch. Install it with your AUR helper of choice (e.g. [yay](https://github.com/Jguer/yay)): +There's a kafkactl https://aur.archlinux.org/packages/kafkactl/[AUR package] available for Arch. Install it with your AUR helper of choice (e.g. https://github.com/Jguer/yay[yay]): -```bash +[,bash] +---- yay -S kafkactl -``` +---- -**manually**: +*manually*: -Download the pre-compiled binaries from the [releases page](https://github.com/deviceinsight/kafkactl/releases) and copy to the desired location. 
+Download the pre-compiled binaries from the https://github.com/deviceinsight/kafkactl/releases[releases page] and copy to the desired location. -### Compiling from source +=== Compiling from source -```bash +[,bash] +---- go get -u github.com/deviceinsight/kafkactl -``` +---- -**NOTE:** make sure that `kafkactl` is on PATH otherwise auto-completion won't work. +*NOTE:* make sure that `kafkactl` is on PATH otherwise auto-completion won't work. -## Configuration +== Configuration If no config file is found, a default config is generated in `$HOME/.config/kafkactl/config.yml`. This configuration is suitable to get started with a single node cluster on a local machine. -### Create a config file +=== Create a config file Create `$HOME/.config/kafkactl/config.yml` with a definition of contexts that should be available -```yaml +[,yaml] +---- contexts: default: brokers: @@ -151,113 +159,110 @@ contexts: # optional: maximum permitted size of a message (defaults to 1000000) maxMessageBytes: 1000000 + consumer: + # optional: isolationLevel (defaults to ReadCommitted) + isolationLevel: ReadUncommitted + current-context: default -``` +---- The config file location is resolved by -1. checking for a provided commandline argument: `--config-file=$PATH_TO_CONFIG` -2. evaluating the environment variable: `export KAFKA_CTL_CONFIG=$PATH_TO_CONFIG` -3. checking for a config file in the working directory i.e. `$PWD/kafkactl.yml` -4. as default the config file is looked up from one of the following locations: - - `$HOME/.config/kafkactl/config.yml` - - `$HOME/.kafkactl/config.yml` - - `$SNAP_REAL_HOME/.kafkactl/config.yml` - - `$SNAP_DATA/kafkactl/config.yml` - - `/etc/kafkactl/config.yml` +. checking for a provided commandline argument: `--config-file=$PATH_TO_CONFIG` +. evaluating the environment variable: `export KAFKA_CTL_CONFIG=$PATH_TO_CONFIG` +. checking for a config file in the working directory i.e. `$PWD/kafkactl.yml` +. as default the config file is looked up from one of the following locations: + ** `$HOME/.config/kafkactl/config.yml` + ** `$HOME/.kafkactl/config.yml` + ** `$SNAP_REAL_HOME/.kafkactl/config.yml` + ** `$SNAP_DATA/kafkactl/config.yml` + ** `/etc/kafkactl/config.yml` -### Auto completion +=== Auto completion -#### bash +==== bash -**NOTE:** if you installed via snap, bash completion should work automatically. +*NOTE:* if you installed via snap, bash completion should work automatically. -``` +---- source <(kafkactl completion bash) -``` +---- To load completions for each session, execute once: Linux: -``` +---- kafkactl completion bash > /etc/bash_completion.d/kafkactl -``` +---- MacOS: -``` +---- kafkactl completion bash > /usr/local/etc/bash_completion.d/kafkactl -``` +---- -#### zsh +==== zsh If shell completion is not already enabled in your environment, you will need to enable it. You can execute the following once: -``` +---- echo "autoload -U compinit; compinit" >> ~/.zshrc -``` +---- To load completions for each session, execute once: -``` +---- kafkactl completion zsh > "${fpath[1]}/_kafkactl" -``` +---- You will need to start a new shell for this setup to take effect. 
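+
+Alternatively, completions can be loaded for the current shell session only (this mirrors the bash example above and uses the same generated completion script):
+
+----
+source <(kafkactl completion zsh)
+----
+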
-#### Fish +==== Fish -``` +---- kafkactl completion fish | source -``` +---- To load completions for each session, execute once: -``` +---- kafkactl completion fish > ~/.config/fish/completions/kafkactl.fish -``` +---- + +== Documentation + +The documentation for all available commands can be found here: + +image::https://img.shields.io/badge/command-docs-blue.svg[command docs,link=https://deviceinsight.github.io/kafkactl/] -## Running in docker +== Running in docker Assuming your Kafka brokers are accessible under `kafka1:9092` and `kafka2:9092`, you can list topics by running: -```bash +[,bash] +---- docker run --env BROKERS="kafka1:9092 kafka2:9092" deviceinsight/kafkactl:latest get topics -``` +---- If a more elaborate config is needed, you can mount it as a volume: -```bash +[,bash] +---- docker run -v /absolute/path/to/config.yml:/etc/kafkactl/config.yml deviceinsight/kafkactl get topics -``` - -## Configuration via environment variables - -Every key in the `config.yml` can be overwritten via environment variables. The corresponding environment variable -for a key can be found by applying the following rules: - -1. replace `.` by `_` -1. replace `-` by `_` -1. write the key name in ALL CAPS - -e.g. the key `contexts.default.tls.certKey` has the corresponding environment variable `CONTEXTS_DEFAULT_TLS_CERTKEY`. - -**NOTE:** an array variable can be written using whitespace as delimiter. For example `BROKERS` can be provided as -`BROKERS="broker1:9092 broker2:9092 broker3:9092"`. - -If environment variables for the `default` context should be set, the prefix `CONTEXTS_DEFAULT_` can be omitted. -So, instead of `CONTEXTS_DEFAULT_TLS_CERTKEY` one can also set `TLS_CERTKEY`. -See **root_test.go** for more examples. +---- -## Running in Kubernetes +== Running in Kubernetes -> :construction: This feature is still experimental. +____ +:construction: This feature is still experimental. +____ If your kafka cluster is not directly accessible from your machine, but it is accessible from a kubernetes cluster which in turn is accessible via `kubectl` from your machine you can configure kubernetes support: -```$yaml +[,$yaml] +---- contexts: kafka-cluster: brokers: @@ -268,7 +273,7 @@ contexts: binary: kubectl #optional kubeContext: k8s-cluster namespace: k8s-namespace -``` +---- Instead of directly talking to kafka brokers a kafkactl docker image is deployed as a pod into the kubernetes cluster, and the defined namespace. Standard-Input and Standard-Output are then wired between the pod and your shell @@ -276,81 +281,105 @@ running kafkactl. There are two options: -1. You can run `kafkactl attach` with your kubernetes cluster configured. This will use `kubectl run` to create a pod - in the configured kubeContext/namespace which runs an image of kafkactl and gives you a `bash` into the container. - Standard-in is piped to the pod and standard-out, standard-err directly to your shell. You even get auto-completion. - -2. You can run any other kafkactl command with your kubernetes cluster configured. Instead of directly - querying the cluster a pod is deployed, and input/output are wired between pod and your shell. +. You can run `kafkactl attach` with your kubernetes cluster configured. This will use `kubectl run` to create a pod +in the configured kubeContext/namespace which runs an image of kafkactl and gives you a `bash` into the container. +Standard-in is piped to the pod and standard-out, standard-err directly to your shell. You even get auto-completion. +. 
You can run any other kafkactl command with your kubernetes cluster configured. Instead of directly
+querying the cluster a pod is deployed, and input/output are wired between pod and your shell.

The names of the brokers have to match the service names used to access kafka in your cluster. A command like this
should give you this information:

-```bash
+[,bash]
+----
kubectl get svc | grep kafka
-```
+----

-> :bulb: The first option takes a bit longer to start up since an Ubuntu based docker image is used in order to have
-> a bash available. The second option uses a docker image build from scratch and should therefore be quicker.
-> Which option is more suitable, will depend on your use-case.
+____
+:bulb: The first option takes a bit longer to start up since an Ubuntu based docker image is used in order to have
+a bash available. The second option uses a docker image built from scratch and should therefore be quicker.
+Which option is more suitable will depend on your use-case.
+____

-> :warning: currently _kafkactl_ must **NOT** be installed via _snap_ in order for the kubernetes feature to work. The snap runs in a sandbox and is therefore unable to access the `kubectl` binary.
+____
+:warning: currently _kafkactl_ must *NOT* be installed via _snap_ in order for the kubernetes feature to work. The snap runs in a sandbox and is therefore unable to access the `kubectl` binary.
+____

-## Command documentation
+== Configuration via environment variables

-The documentation for all available commands can be found here:
+Every key in the `config.yml` can be overwritten via environment variables. The corresponding environment variable
+for a key can be found by applying the following rules:
+
+. replace `.` by `_`
+. replace `-` by `_`
+. write the key name in ALL CAPS
+
+e.g. the key `contexts.default.tls.certKey` has the corresponding environment variable `CONTEXTS_DEFAULT_TLS_CERTKEY`.
+
+*NOTE:* an array variable can be written using whitespace as delimiter. For example `BROKERS` can be provided as
+`BROKERS="broker1:9092 broker2:9092 broker3:9092"`.

-[![command docs](https://img.shields.io/badge/command-docs-blue.svg)](https://deviceinsight.github.io/kafkactl/)
+If environment variables for the `default` context should be set, the prefix `CONTEXTS_DEFAULT_` can be omitted.
+So, instead of `CONTEXTS_DEFAULT_TLS_CERTKEY` one can also set `TLS_CERTKEY`.
+See *root_test.go* for more examples.
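+
+For example, the brokers of the `default` context can be overridden for a single invocation (the broker addresses below are placeholders for your own):
+
+[,bash]
+----
+# array values use whitespace as delimiter; the CONTEXTS_DEFAULT_ prefix may be omitted for the default context
+BROKERS="kafka1:9092 kafka2:9092" kafkactl get topics
+----
+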
-## Examples +== Examples -### Consuming messages +=== Consuming messages Consuming messages from a topic can be done with: -```bash +[,bash] +---- kafkactl consume my-topic -``` +---- In order to consume starting from the oldest offset use: -```bash +[,bash] +---- kafkactl consume my-topic --from-beginning -``` +---- The following example prints message `key` and `timestamp` as well as `partition` and `offset` in `yaml` format: -```bash +[,bash] +---- kafkactl consume my-topic --print-keys --print-timestamps -o yaml -``` +---- To print partition in default output format use: -```bash +[,bash] +---- kafkactl consume my-topic --print-partitions -``` +---- Headers of kafka messages can be printed with the parameter `--print-headers` e.g.: -```bash +[,bash] +---- kafkactl consume my-topic --print-headers -o yaml -``` +---- If one is only interested in the last `n` messages this can be achieved by `--tail` e.g.: -```bash +[,bash] +---- kafkactl consume my-topic --tail=5 -``` +---- The consumer can be stopped when the latest offset is reached using `--exit` parameter e.g.: -```bash +[,bash] +---- kafkactl consume my-topic --from-beginning --exit -``` +---- The consumer can compute the offset it starts from using a timestamp: -```bash +[,bash] +---- kafkactl consume my-topic --from-timestamp 1384216367189 kafkactl consume my-topic --from-timestamp 2014-04-26T17:24:37.123Z kafkactl consume my-topic --from-timestamp 2014-04-26T17:24:37.123 @@ -358,104 +387,117 @@ kafkactl consume my-topic --from-timestamp 2009-08-12T22:15:09Z kafkactl consume my-topic --from-timestamp 2017-07-19T03:21:51 kafkactl consume my-topic --from-timestamp 2013-04-01T22:43 kafkactl consume my-topic --from-timestamp 2014-04-26 -``` +---- The `from-timestamp` parameter supports different timestamp formats. It can either be a number representing the epoch milliseconds -or a string with a timestamp in one of the [supported date formats](https://github.com/deviceinsight/kafkactl/blob/main/util/util.go#L10). +or a string with a timestamp in one of the https://github.com/deviceinsight/kafkactl/blob/main/util/util.go#L10[supported date formats]. -**NOTE:** `--from-timestamp` is not designed to schedule the beginning of consumer's consumption. The offset corresponding to the timestamp is computed at the beginning of the process. So if you set it to a date in the future, the consumer will start from the latest offset. +*NOTE:* `--from-timestamp` is not designed to schedule the beginning of consumer's consumption. The offset corresponding to the timestamp is computed at the beginning of the process. So if you set it to a date in the future, the consumer will start from the latest offset. The consumer can be stopped when the offset corresponding to a particular timestamp is reached: -```bash +[,bash] +---- kafkactl consume my-topic --from-timestamp 2017-07-19T03:30:00 --to-timestamp 2017-07-19T04:30:00 -``` +---- The `to-timestamp` parameter supports the same formats as `from-timestamp`. -**NOTE:** `--to-timestamp` is not designed to schedule the end of consumer's consumption. The offset corresponding to the timestamp is computed at the begininng of the process. So if you set it to a date in the future, the consumer will stop at the current latest offset. +*NOTE:* `--to-timestamp` is not designed to schedule the end of consumer's consumption. The offset corresponding to the timestamp is computed at the begininng of the process. So if you set it to a date in the future, the consumer will stop at the current latest offset. 
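+
+The isolation level used when consuming can also be set per invocation with the `--isolation-level` flag (added in 4.0.0; if omitted, the `consumer.isolationLevel` from the context config applies, which defaults to `ReadCommitted`):
+
+[,bash]
+----
+kafkactl consume my-topic --isolation-level ReadUncommitted
+----
+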
The following example prints keys in hex and values in base64: -```bash +[,bash] +---- kafkactl consume my-topic --print-keys --key-encoding=hex --value-encoding=base64 -``` +---- The consumer can convert protobuf messages to JSON in keys (optional) and values: -```bash +[,bash] +---- kafkactl consume my-topic --value-proto-type MyTopicValue --key-proto-type MyTopicKey --proto-file kafkamsg.proto -``` +---- To join a consumer group and consume messages as a member of the group: -```bash +[,bash] +---- kafkactl consume my-topic --group my-consumer-group -``` +---- If you want to limit the number of messages that will be read, specify `--max-messages`: -```bash +[,bash] +---- kafkactl consume my-topic --max-messages 2 -``` +---- -### Producing messages +=== Producing messages Producing messages can be done in multiple ways. If we want to produce a message with `key='my-key'`, `value='my-value'` to the topic `my-topic` this can be achieved with one of the following commands: -```bash +[,bash] +---- echo "my-key#my-value" | kafkactl produce my-topic --separator=# echo "my-value" | kafkactl produce my-topic --key=my-key kafkactl produce my-topic --key=my-key --value=my-value -``` +---- If we have a file containing messages where each line contains `key` and `value` separated by `#`, the file can be used as input to produce messages to topic `my-topic`: -```bash +[,bash] +---- cat myfile | kafkactl produce my-topic --separator=# -``` +---- The same can be accomplished without piping the file to stdin with the `--file` parameter: -```bash +[,bash] +---- kafkactl produce my-topic --separator=# --file=myfile -``` +---- If the messages in the input file need to be split by a different delimiter than `\n` a custom line separator can be provided: -```bash +[,bash] +---- kafkactl produce my-topic --separator=# --lineSeparator=|| --file=myfile -``` +---- -**NOTE:** if the file was generated with `kafkactl consume --print-keys --print-timestamps my-topic` the produce +*NOTE:* if the file was generated with `kafkactl consume --print-keys --print-timestamps my-topic` the produce command is able to detect the message timestamp in the input and will ignore it. It is also possible to produce messages in json format: -```bash +[,bash] +---- # each line in myfile.json is expected to contain a json object with fields key, value kafkactl produce my-topic --file=myfile.json --input-format=json cat myfile.json | kafkactl produce my-topic --input-format=json -``` +---- the number of messages produced per second can be controlled with the `--rate` parameter: -```bash +[,bash] +---- cat myfile | kafkactl produce my-topic --separator=# --rate=200 -``` +---- It is also possible to specify the partition to insert the message: -```bash +[,bash] +---- kafkactl produce my-topic --key=my-key --value=my-value --partition=2 -``` +---- Additionally, a different partitioning scheme can be used. When a `key` is provided the default partitioner uses the `hash` of the `key` to assign a partition. 
So the same `key` will end up in the same partition: -```bash +[,bash] +---- # the following 3 messages will all be inserted to the same partition kafkactl produce my-topic --key=my-key --value=my-value kafkactl produce my-topic --key=my-key --value=my-value @@ -465,50 +507,56 @@ kafkactl produce my-topic --key=my-key --value=my-value kafkactl produce my-topic --key=my-key --value=my-value --partitioner=random kafkactl produce my-topic --key=my-key --value=my-value --partitioner=random kafkactl produce my-topic --key=my-key --value=my-value --partitioner=random -``` +---- Message headers can also be written: -```bash +[,bash] +---- kafkactl produce my-topic --key=my-key --value=my-value --header key1:value1 --header key2:value\:2 -``` +---- The following example writes the key from base64 and value from hex: -```bash +[,bash] +---- kafkactl produce my-topic --key=dGVzdC1rZXk= --key-encoding=base64 --value=0000000000000000 --value-encoding=hex -``` +---- You can control how many replica acknowledgements are needed for a response: -```bash +[,bash] +---- kafkactl produce my-topic --key=my-key --value=my-value --required-acks=WaitForAll -``` +---- Producing null values (tombstone record) is also possible: -```bash +[,bash] +---- kafkactl produce my-topic --null-value -``` +---- Producing protobuf message converted from JSON: -```bash +[,bash] +---- kafkactl produce my-topic --key='{"keyField":123}' --key-proto-type MyKeyMessage --value='{"valueField":"value"}' --value-proto-type MyValueMessage --proto-file kafkamsg.proto -``` +---- -### Avro support +=== Avro support In order to enable avro support you just have to add the schema registry to your configuration: -```$yaml +[,$yaml] +---- contexts: localhost: avro: schemaRegistry: localhost:8081 -``` +---- -#### Producing to an avro topic +==== Producing to an avro topic `kafkactl` will lookup the topic in the schema registry in order to determine if key or value needs to be avro encoded. If producing with the latest `schemaVersion` is sufficient, no additional configuration is needed an `kafkactl` handles @@ -516,9 +564,10 @@ this automatically. If however one needs to produce an older `schemaVersion` this can be achieved by providing the parameters `keySchemaVersion`, `valueSchemaVersion`. -##### Example +===== Example -```bash +[,bash] +---- # create a topic kafkactl create topic avro_topic # add a schema for the topic value @@ -529,9 +578,9 @@ http://localhost:8081/subjects/avro_topic-value/versions kafkactl produce avro_topic --value {\"next\":{\"LongList\":{}}} # consume the message kafkactl consume avro_topic --from-beginning --print-schema -o yaml -``` +---- -#### Consuming from an avro topic +==== Consuming from an avro topic As for producing `kafkactl` will also lookup the topic in the schema registry to determine if key or value needs to be decoded with an avro schema. @@ -540,18 +589,18 @@ The `consume` command handles this automatically and no configuration is needed. An additional parameter `print-schema` can be provided to display the schema used for decoding. -### Protobuf support +=== Protobuf support `kafkactl` can consume and produce protobuf-encoded messages. In order to enable protobuf serialization/deserialization you should add flag `--value-proto-type` and optionally `--key-proto-type` (if keys encoded in protobuf format) -with type name. Protobuf-encoded messages are mapped with [pbjson](https://developers.google.com/protocol-buffers/docs/proto3#json). +with type name. 
Protobuf-encoded messages are mapped with https://developers.google.com/protocol-buffers/docs/proto3#json[pbjson]. `kafkactl` will search messages in following order: -1. Protoset files specified in `--protoset-file` flag -2. Protoset files specified in `context.protobuf.protosetFiles` config value -3. Proto files specified in `--proto-file` flag -4. Proto files specified in `context.protobuf.protoFiles` config value +. Protoset files specified in `--protoset-file` flag +. Protoset files specified in `context.protobuf.protosetFiles` config value +. Proto files specified in `--proto-file` flag +. Proto files specified in `context.protobuf.protoFiles` config value Proto files may require some dependencies in `import` sections. To specify additional lookup paths use `--proto-import-path` flag or `context.protobuf.importPaths` config value. @@ -562,15 +611,17 @@ Note that if you want to use raw proto files `protoc` installation don't need to Also note that protoset files must be compiled with included imports: -```bash +[,bash] +---- protoc -o kafkamsg.protoset --include_imports kafkamsg.proto -``` +---- -#### Example +==== Example Assume you have following proto schema in `kafkamsg.proto`: -```protobuf +[,protobuf] +---- syntax = "proto3"; import "google/protobuf/timestamp.proto"; @@ -583,99 +634,114 @@ message TopicMessage { message TopicKey { float fvalue = 1; } -``` +---- "well-known" `google/protobuf` types are included so no additional proto files needed. To produce message run -```bash +[,bash] +---- kafkactl produce --key '{"fvalue":1.2}' --key-proto-type TopicKey --value '{"producedAt":"2021-12-01T14:10:12Z","num":"1"}' --value-proto-type TopicValue --proto-file kafkamsg.proto -``` +---- or with protoset -```bash +[,bash] +---- kafkactl produce --key '{"fvalue":1.2}' --key-proto-type TopicKey --value '{"producedAt":"2021-12-01T14:10:12Z","num":"1"}' --value-proto-type TopicValue --protoset-file kafkamsg.protoset -``` +---- To consume messages run -```bash +[,bash] +---- kafkactl consume --key-proto-type TopicKey --value-proto-type TopicValue --proto-file kafkamsg.proto -``` +---- or with protoset -```bash +[,bash] +---- kafkactl consume --key-proto-type TopicKey --value-proto-type TopicValue --protoset-file kafkamsg.protoset -``` +---- -### Altering topics +=== Altering topics Using the `alter topic` command allows you to change the partition count, replication factor and topic-level configurations of an existing topic. The partition count can be increased with: -```bash +[,bash] +---- kafkactl alter topic my-topic --partitions 32 -``` +---- The replication factor can be altered with: -```bash +[,bash] +---- kafkactl alter topic my-topic --replication-factor 2 -``` +---- -> :information_source: when altering replication factor, kafkactl tries to keep the number of replicas assigned to each -> broker balanced. If you need more control over the assigned replicas use `alter partition` directly. +____ +:information_source: when altering replication factor, kafkactl tries to keep the number of replicas assigned to each +broker balanced. If you need more control over the assigned replicas use `alter partition` directly. 
+____

The topic configs can be edited by supplying key value pairs as follows:

-```bash
+[,bash]
+----
kafkactl alter topic my-topic --config retention.ms=3600000 --config cleanup.policy=compact
-```
+----

-> :bulb: use the flag `--validate-only` to perform a dry-run without actually modifying the topic
+____
+:bulb: use the flag `--validate-only` to perform a dry-run without actually modifying the topic
+____

-### Altering partitions
+=== Altering partitions

The assigned replicas of a partition can directly be altered with:

-```bash
+[,bash]
+----
# set brokers 102,103 as replicas for partition 3 of topic my-topic
kafkactl alter partition my-topic 3 -r 102,103
-```
+----

-### Clone topic
+=== Clone topic

A new topic may be created from an existing topic as follows:

-```bash
+[,bash]
+----
kafkactl clone topic source-topic target-topic
-```
+----

Source topic must exist, target topic must not exist.

`kafkactl` clones partitions count, replication factor and config entries.

-### Consumer groups
+=== Consumer groups

In order to get a list of consumer groups the `get consumer-groups` command can be used:

-```bash
+[,bash]
+----
# all available consumer groups
kafkactl get consumer-groups
# only consumer groups for a single topic
kafkactl get consumer-groups --topic my-topic
# using command alias
kafkactl get cg
-```
+----

To get detailed information about the consumer group use `describe consumer-group`. If the parameter `--partitions`
is provided, details will be printed for each partition; otherwise the partitions are aggregated to the clients.

-```bash
+[,bash]
+----
# describe a consumer group
kafkactl describe consumer-group my-group
# show partition details only for partitions with lag
@@ -684,13 +750,24 @@ kafkactl describe consumer-group my-group --only-with-lag
# show details only for a single topic
kafkactl describe consumer-group my-group --topic my-topic
# using command alias
kafkactl describe cg my-group
-```
+----
+
+=== Delete records from a topic
+
+This command deletes all records from a partition that have an offset smaller than the provided offset.
+
+[,bash]
+----
+# delete records with offset < 123 from partition 0 and offset < 456 from partition 1
+kafkactl delete records my-topic --offset 0=123 --offset 1=456
+----

-### Create consumer groups
+=== Create consumer groups

A consumer-group can be created as follows:

-```bash
+[,bash]
+----
# create group with offset for all partitions set to oldest
kafkactl create consumer-group my-group --topic my-topic --oldest
# create group with offset for all partitions set to newest
kafkactl create consumer-group my-group --topic my-topic --newest
# create group with offset for a single partition set to a specific offset
kafkactl create consumer-group my-group --topic my-topic --partition 5 --offset 100
# create group for multiple topics with offset for all partitions set to oldest
kafkactl create consumer-group my-group --topic my-topic-a --topic my-topic-b --oldest
-```
+----

-### Clone consumer group
+=== Clone consumer group

A consumer group may be created as a clone of another consumer group as follows:

-```bash
+[,bash]
+----
kafkactl clone consumer-group source-group target-group
-```
+----

Source group must exist and have committed offsets. Target group must either not exist or have no committed offsets.

`kafkactl` clones topic assignment and partition offsets.

-### Reset consumer group offsets
+=== Reset consumer group offsets

To ensure the reset does what is expected, by default only the results are printed
without actually executing the reset. Use the additional parameter `--execute` to perform the reset.
-```bash +[,bash] +---- # reset offset of for all partitions to oldest offset kafkactl reset offset my-group --topic my-topic --oldest # reset offset of for all partitions to newest offset @@ -732,35 +811,38 @@ kafkactl reset offset my-group --topic my-topic-a --topic my-topic-b --oldest kafkactl reset offset my-group --topic my-topic-a --to-datetime 2014-04-26T17:24:37.123Z # reset offset to offset at a given timestamp(epoch)/datetime kafkactl reset offset my-group --topic my-topic-a --to-datetime 1697726906352 -``` +---- -### Delete consumer group offsets +=== Delete consumer group offsets In order to delete a consumer group offset use `delete offset` -```bash +[,bash] +---- # delete offset for all partitions of topic my-topic kafkactl delete offset my-group --topic my-topic # delete offset for partition 1 of topic my-topic kafkactl delete offset my-group --topic my-topic --partition 1 -``` +---- -### Delete consumer groups +=== Delete consumer groups In order to delete a consumer group or a list of consumer groups use `delete consumer-group` -```bash +[,bash] +---- # delete consumer group my-group kafkactl delete consumer-group my-group -``` +---- -### ACL Management +=== ACL Management -Available ACL operations are documented [here](https://docs.confluent.io/platform/current/kafka/authorization.html#operations). +Available ACL operations are documented https://docs.confluent.io/platform/current/kafka/authorization.html#operations[here]. -#### Create a new ACL +==== Create a new ACL -```bash +[,bash] +---- # create an acl that allows topic read for a user 'consumer' kafkactl create acl --topic my-topic --operation read --principal User:consumer --allow # create an acl that denies topic write for a user 'consumer' coming from a specific host @@ -769,11 +851,12 @@ kafkactl create acl --topic my-topic --operation write --host 1.2.3.4 --principa kafkactl create acl --topic my-topic --operation read --operation describe --principal User:consumer --allow # allow on all topics with prefix common prefix kafkactl create acl --topic my-prefix --pattern prefixed --operation read --principal User:consumer --allow -``` +---- -#### List ACLs +==== List ACLs -```bash +[,bash] +---- # list all acl kafkactl get acl # list all acl (alias command) @@ -782,11 +865,12 @@ kafkactl get access-control-list kafkactl get acl --topics # filter only consumer group resources with operation read kafkactl get acl --groups --operation read -``` +---- -#### Delete ACLs +==== Delete ACLs -```bash +[,bash] +---- # delete all topic read acls kafkactl delete acl --topics --operation read --pattern any # delete all topic acls for any operation @@ -795,38 +879,42 @@ kafkactl delete acl --topics --operation any --pattern any kafkactl delete acl --cluster --operation any --pattern any # delete all consumer-group acls with operation describe, patternType prefixed and permissionType allow kafkactl delete acl --groups --operation describe --pattern prefixed --allow -``` +---- -### Getting Brokers +=== Getting Brokers To get the list of brokers of a kafka cluster use `get brokers` -```bash +[,bash] +---- # get the list of brokers kafkactl get brokers -``` +---- -### Describe Broker +=== Describe Broker To view configs for a single broker use `describe broker` -```bash +[,bash] +---- # describe broker kafkactl describe broker 1 -``` +---- -## Development +== Development In order to see linter errors before commit, add the following pre-commit hook: -```bash +[,bash] +---- pip install --user pre-commit pre-commit install -``` +---- 
-### Pull requests +=== Pull requests -```shell +[,shell] +---- # checkout locally PULL_REQUEST_ID=123 LOCAL_BRANCH_NAME=feature/abc @@ -838,4 +926,4 @@ NAME=username REMOTE_BRANCH_NAME=abc git remote add $NAME git@github.com:$NAME/kafkactl.git git push $NAME ${LOCAL_BRANCH_NAME}:${REMOTE_BRANCH_NAME} -``` +---- diff --git a/cmd/consume/consume.go b/cmd/consume/consume.go index 846a79e..c2bd9a1 100644 --- a/cmd/consume/consume.go +++ b/cmd/consume/consume.go @@ -50,6 +50,7 @@ func NewConsumeCmd() *cobra.Command { cmdConsume.Flags().StringSliceVarP(&flags.ProtosetFiles, "protoset-file", "", flags.ProtosetFiles, "additional compiled protobuf description file for searching message description") cmdConsume.Flags().StringVarP(&flags.KeyProtoType, "key-proto-type", "", flags.KeyProtoType, "key protobuf message type") cmdConsume.Flags().StringVarP(&flags.ValueProtoType, "value-proto-type", "", flags.ValueProtoType, "value protobuf message type") + cmdConsume.Flags().StringVarP(&flags.IsolationLevel, "isolation-level", "i", "", "isolationLevel to use. One of: ReadUncommitted|ReadCommitted") if err := cmdConsume.RegisterFlagCompletionFunc("group", func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { return consumergroups.CompleteConsumerGroups(cmd, args, toComplete) diff --git a/cmd/create/create-topic.go b/cmd/create/create-topic.go index 384aa4a..35c34f4 100644 --- a/cmd/create/create-topic.go +++ b/cmd/create/create-topic.go @@ -25,7 +25,7 @@ func newCreateTopicCmd() *cobra.Command { } cmdCreateTopic.Flags().Int32VarP(&flags.Partitions, "partitions", "p", 1, "number of partitions") - cmdCreateTopic.Flags().Int16VarP(&flags.ReplicationFactor, "replication-factor", "r", 1, "replication factor") + cmdCreateTopic.Flags().Int16VarP(&flags.ReplicationFactor, "replication-factor", "r", -1, "replication factor") cmdCreateTopic.Flags().BoolVarP(&flags.ValidateOnly, "validate-only", "v", false, "validate only") cmdCreateTopic.Flags().StringArrayVarP(&flags.Configs, "config", "c", flags.Configs, "configs in format `key=value`") diff --git a/cmd/deletion/delete-records.go b/cmd/deletion/delete-records.go new file mode 100644 index 0000000..9bbdbf7 --- /dev/null +++ b/cmd/deletion/delete-records.go @@ -0,0 +1,36 @@ +package deletion + +import ( + "github.com/deviceinsight/kafkactl/cmd/validation" + "github.com/deviceinsight/kafkactl/internal/k8s" + "github.com/deviceinsight/kafkactl/internal/topic" + "github.com/deviceinsight/kafkactl/output" + "github.com/spf13/cobra" +) + +func newDeleteRecordsCmd() *cobra.Command { + + var flags topic.DeleteRecordsFlags + + var cmdDeleteRecords = &cobra.Command{ + Use: "records TOPIC", + Short: "delete a records from a topic", + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + if !k8s.NewOperation().TryRun(cmd, args) { + if err := (&topic.Operation{}).DeleteRecords(args[0], flags); err != nil { + output.Fail(err) + } + } + }, + ValidArgsFunction: topic.CompleteTopicNames, + } + + cmdDeleteRecords.Flags().StringArrayVarP(&flags.Offsets, "offset", "", flags.Offsets, "offsets in format `partition=offset`. 
records with smaller offset will be deleted.") + + if err := validation.MarkFlagAtLeastOneRequired(cmdDeleteRecords.Flags(), "offset"); err != nil { + panic(err) + } + + return cmdDeleteRecords +} diff --git a/cmd/deletion/delete-records_test.go b/cmd/deletion/delete-records_test.go new file mode 100644 index 0000000..52132af --- /dev/null +++ b/cmd/deletion/delete-records_test.go @@ -0,0 +1,77 @@ +package deletion_test + +import ( + "strings" + "testing" + + "github.com/deviceinsight/kafkactl/testutil" +) + +func TestDeleteRecordsIntegration(t *testing.T) { + + testutil.StartIntegrationTest(t) + + topicName := testutil.CreateTopic(t, "delete-records-", "--partitions", "2") + + testutil.ProduceMessageOnPartition(t, topicName, "key-1", "a", 0, 0) + testutil.ProduceMessageOnPartition(t, topicName, "key-1", "b", 0, 1) + testutil.ProduceMessageOnPartition(t, topicName, "key-2", "c", 1, 0) + testutil.ProduceMessageOnPartition(t, topicName, "key-2", "d", 1, 1) + testutil.ProduceMessageOnPartition(t, topicName, "key-2", "e", 1, 2) + + kafkaCtl := testutil.CreateKafkaCtlCommand() + + // check initial messages + if _, err := kafkaCtl.Execute("consume", topicName, "--from-beginning", "--print-keys", "--exit"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + messages := strings.Split(strings.TrimSpace(kafkaCtl.GetStdOut()), "\n") + + if len(messages) != 5 { + t.Fatalf("expected 5 messages, got %d", len(messages)) + } + + // delete records + if _, err := kafkaCtl.Execute("delete", "records", topicName, "--offset", "0=1", "--offset", "1=2"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + // check messages + if _, err := kafkaCtl.Execute("consume", topicName, "--from-beginning", "--print-keys", "--exit"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + messages = strings.Split(strings.TrimSpace(kafkaCtl.GetStdOut()), "\n") + + if len(messages) != 2 { + t.Fatalf("expected 2 messages, got %d", len(messages)) + } + + testutil.AssertContains(t, "key-1#b", messages) + testutil.AssertContains(t, "key-2#e", messages) +} + +func TestDeleteRecordsAutoCompletionIntegration(t *testing.T) { + + testutil.StartIntegrationTest(t) + + prefix := "delete-complete-" + + topicName1 := testutil.CreateTopic(t, prefix+"a") + topicName2 := testutil.CreateTopic(t, prefix+"b") + topicName3 := testutil.CreateTopic(t, prefix+"c") + + kafkaCtl := testutil.CreateKafkaCtlCommand() + kafkaCtl.Verbose = false + + if _, err := kafkaCtl.Execute("__complete", "delete", "records", ""); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + outputLines := strings.Split(strings.TrimSpace(kafkaCtl.GetStdOut()), "\n") + + testutil.AssertContains(t, topicName1, outputLines) + testutil.AssertContains(t, topicName2, outputLines) + testutil.AssertContains(t, topicName3, outputLines) +} diff --git a/cmd/deletion/delete.go b/cmd/deletion/delete.go index ead6d31..c825d58 100644 --- a/cmd/deletion/delete.go +++ b/cmd/deletion/delete.go @@ -8,12 +8,13 @@ func NewDeleteCmd() *cobra.Command { var cmdDelete = &cobra.Command{ Use: "delete", - Short: "delete topics, consumerGroups, consumer-group-offset, acls", + Short: "delete topics, consumerGroups, consumer-group-offset, acls, records", } cmdDelete.AddCommand(newDeleteTopicCmd()) cmdDelete.AddCommand(newDeleteConsumerGroupCmd()) cmdDelete.AddCommand(newDeleteConsumerGroupOffsetCmd()) cmdDelete.AddCommand(newDeleteACLCmd()) + cmdDelete.AddCommand(newDeleteRecordsCmd()) return cmdDelete } diff --git 
a/cmd/describe/describe-topic.go b/cmd/describe/describe-topic.go index fa6f1db..c95007e 100644 --- a/cmd/describe/describe-topic.go +++ b/cmd/describe/describe-topic.go @@ -26,7 +26,7 @@ func newDescribeTopicCmd() *cobra.Command { } cmdDescribeTopic.Flags().StringVarP(&flags.OutputFormat, "output", "o", flags.OutputFormat, "output format. One of: json|yaml|wide") - cmdDescribeTopic.Flags().BoolVarP(&flags.PrintConfigs, "print-configs", "c", true, "print configs") + cmdDescribeTopic.Flags().StringVarP((*string)(&flags.PrintConfigs), "print-configs", "c", "no_defaults", "print configs. One of none|no_defaults|all") cmdDescribeTopic.Flags().BoolVarP(&flags.SkipEmptyPartitions, "skip-empty", "s", false, "show only partitions that have a messages") return cmdDescribeTopic diff --git a/cmd/describe/describe-topic_test.go b/cmd/describe/describe-topic_test.go index 285e085..bcba19b 100644 --- a/cmd/describe/describe-topic_test.go +++ b/cmd/describe/describe-topic_test.go @@ -4,9 +4,69 @@ import ( "strings" "testing" + "github.com/deviceinsight/kafkactl/internal" + "github.com/deviceinsight/kafkactl/internal/topic" + "github.com/deviceinsight/kafkactl/testutil" ) +func TestDescribeTopicConfigsIntegration(t *testing.T) { + + testutil.StartIntegrationTest(t) + + prefix := "describe-t-configs-" + + topicName1 := testutil.CreateTopic(t, prefix, "--config", "retention.ms=3600000") + + kafkaCtl := testutil.CreateKafkaCtlCommand() + kafkaCtl.Verbose = false + + // default --print-configs=no_defaults + if _, err := kafkaCtl.Execute("describe", "topic", topicName1, "-o", "yaml"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + describedTopic, err := topic.FromYaml(kafkaCtl.GetStdOut()) + if err != nil { + t.Fatalf("failed to read yaml: %v", err) + } + + configKeys := getConfigKeys(describedTopic.Configs) + + testutil.AssertArraysEquals(t, []string{"retention.ms"}, configKeys) + testutil.AssertEquals(t, "3600000", describedTopic.Configs[0].Value) + + // explicitly without defaults + if _, err := kafkaCtl.Execute("describe", "topic", topicName1, "-c", "no_defaults", "-o", "yaml"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + describedTopic, err = topic.FromYaml(kafkaCtl.GetStdOut()) + if err != nil { + t.Fatalf("failed to read yaml: %v", err) + } + + configKeys = getConfigKeys(describedTopic.Configs) + + testutil.AssertArraysEquals(t, []string{"retention.ms"}, configKeys) + testutil.AssertEquals(t, "3600000", describedTopic.Configs[0].Value) + + // all configs + if _, err := kafkaCtl.Execute("describe", "topic", topicName1, "-c", "all", "-o", "yaml"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + describedTopic, err = topic.FromYaml(kafkaCtl.GetStdOut()) + if err != nil { + t.Fatalf("failed to read yaml: %v", err) + } + + configKeys = getConfigKeys(describedTopic.Configs) + + testutil.AssertContains(t, "retention.ms", configKeys) + testutil.AssertContains(t, "cleanup.policy", configKeys) +} + func TestDescribeTopicAutoCompletionIntegration(t *testing.T) { testutil.StartIntegrationTest(t) @@ -30,3 +90,11 @@ func TestDescribeTopicAutoCompletionIntegration(t *testing.T) { testutil.AssertContains(t, topicName2, outputLines) testutil.AssertContains(t, topicName3, outputLines) } + +func getConfigKeys(configs []internal.Config) []string { + keys := make([]string, len(configs)) + for i, config := range configs { + keys[i] = config.Name + } + return keys +} diff --git a/docs/kafkactl_docs.md b/docs/kafkactl_docs.md index bf33c82..c050b66 100644 
--- a/docs/kafkactl_docs.md +++ b/docs/kafkactl_docs.md @@ -23,7 +23,7 @@ A command-line interface the simplifies interaction with Kafka. * [kafkactl config](#kafkactl-config) - show and edit configurations * [kafkactl consume](#kafkactl-consume) - consume messages from a topic * [kafkactl create](#kafkactl-create) - create topics, consumerGroups, acls -* [kafkactl delete](#kafkactl-delete) - delete topics, consumerGroups, consumer-group-offset, acls +* [kafkactl delete](#kafkactl-delete) - delete topics, consumerGroups, consumer-group-offset, acls, records * [kafkactl describe](#kafkactl-describe) - describe topics, consumerGroups, brokers * [kafkactl get](#kafkactl-get) - get info about topics, consumerGroups, acls, brokers * [kafkactl produce](#kafkactl-produce) - produce messages to a topic @@ -433,6 +433,7 @@ kafkactl consume TOPIC [flags] --from-timestamp string consume data from offset of given timestamp -g, --group string consumer group to join -h, --help help for consume + -i, --isolation-level string isolationLevel to use. One of: ReadUncommitted|ReadCommitted --key-encoding string key encoding (auto-detected by default). One of: none|hex|base64 --key-proto-type string key protobuf message type --max-messages int stop consuming after n messages have been read (default -1) @@ -572,7 +573,7 @@ kafkactl create topic TOPIC [flags] -c, --config key=value configs in format key=value -h, --help help for topic -p, --partitions int32 number of partitions (default 1) - -r, --replication-factor int16 replication factor (default 1) + -r, --replication-factor int16 replication factor (default -1) -v, --validate-only validate only ``` @@ -590,7 +591,7 @@ kafkactl create topic TOPIC [flags] ### kafkactl delete -delete topics, consumerGroups, consumer-group-offset, acls +delete topics, consumerGroups, consumer-group-offset, acls, records #### Options @@ -611,6 +612,7 @@ delete topics, consumerGroups, consumer-group-offset, acls * [kafkactl delete access-control-list](#kafkactl-delete-access-control-list) - delete an acl * [kafkactl delete consumer-group](#kafkactl-delete-consumer-group) - delete a consumer-group * [kafkactl delete consumer-group-offset](#kafkactl-delete-consumer-group-offset) - delete a consumer-group-offset +* [kafkactl delete records](#kafkactl-delete-records) - delete a records from a topic * [kafkactl delete topic](#kafkactl-delete-topic) - delete a topic @@ -645,7 +647,7 @@ kafkactl delete access-control-list [flags] ##### SEE ALSO -* [kafkactl delete](#kafkactl-delete) - delete topics, consumerGroups, consumer-group-offset, acls +* [kafkactl delete](#kafkactl-delete) - delete topics, consumerGroups, consumer-group-offset, acls, records #### kafkactl delete consumer-group-offset @@ -673,7 +675,7 @@ kafkactl delete consumer-group-offset CONSUMER-GROUP --topic=TOPIC --partition=P ##### SEE ALSO -* [kafkactl delete](#kafkactl-delete) - delete topics, consumerGroups, consumer-group-offset, acls +* [kafkactl delete](#kafkactl-delete) - delete topics, consumerGroups, consumer-group-offset, acls, records #### kafkactl delete consumer-group @@ -699,7 +701,34 @@ kafkactl delete consumer-group CONSUMER-GROUP [flags] ##### SEE ALSO -* [kafkactl delete](#kafkactl-delete) - delete topics, consumerGroups, consumer-group-offset, acls +* [kafkactl delete](#kafkactl-delete) - delete topics, consumerGroups, consumer-group-offset, acls, records + + +#### kafkactl delete records + +delete a records from a topic + +``` +kafkactl delete records TOPIC [flags] +``` + +##### Options + +``` + -h, 
--help help for records + --offset partition=offset offsets in format partition=offset. records with smaller offset will be deleted. +``` + +##### Options inherited from parent commands + +``` + -C, --config-file string config file. one of: [$HOME/.config/kafkactl $HOME/.kafkactl $SNAP_REAL_HOME/.config/kafkactl $SNAP_DATA/kafkactl /etc/kafkactl] + -V, --verbose verbose output +``` + +##### SEE ALSO + +* [kafkactl delete](#kafkactl-delete) - delete topics, consumerGroups, consumer-group-offset, acls, records #### kafkactl delete topic @@ -725,7 +754,7 @@ kafkactl delete topic TOPIC [flags] ##### SEE ALSO -* [kafkactl delete](#kafkactl-delete) - delete topics, consumerGroups, consumer-group-offset, acls +* [kafkactl delete](#kafkactl-delete) - delete topics, consumerGroups, consumer-group-offset, acls, records ### kafkactl describe @@ -822,10 +851,10 @@ kafkactl describe topic TOPIC [flags] ##### Options ``` - -h, --help help for topic - -o, --output string output format. One of: json|yaml|wide - -c, --print-configs print configs (default true) - -s, --skip-empty show only partitions that have a messages + -h, --help help for topic + -o, --output string output format. One of: json|yaml|wide + -c, --print-configs string print configs. One of none|no_defaults|all (default "no_defaults") + -s, --skip-empty show only partitions that have a messages ``` ##### Options inherited from parent commands diff --git a/go.mod b/go.mod index ea06ad9..b222bb1 100644 --- a/go.mod +++ b/go.mod @@ -1,32 +1,32 @@ module github.com/deviceinsight/kafkactl -go 1.20 +go 1.21.6 require ( github.com/IBM/sarama v1.42.1 github.com/Rican7/retry v0.3.1 github.com/golang/protobuf v1.5.3 - github.com/jhump/protoreflect v1.15.3 + github.com/jhump/protoreflect v1.15.4 github.com/landoop/schema-registry v0.0.0-20190327143759-50a5701c1891 github.com/linkedin/goavro/v2 v2.12.0 github.com/pkg/errors v0.9.1 github.com/spf13/cobra v1.8.0 github.com/spf13/pflag v1.0.5 - github.com/spf13/viper v1.17.0 + github.com/spf13/viper v1.18.2 github.com/xdg-go/scram v1.1.2 go.uber.org/ratelimit v0.3.0 - golang.org/x/sync v0.5.0 - google.golang.org/protobuf v1.31.0 + golang.org/x/sync v0.6.0 + google.golang.org/protobuf v1.32.0 gopkg.in/errgo.v2 v2.1.0 gopkg.in/yaml.v2 v2.4.0 ) require ( github.com/benbjohnson/clock v1.3.5 // indirect - github.com/bufbuild/protocompile v0.6.0 // indirect + github.com/bufbuild/protocompile v0.7.1 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/eapache/go-resiliency v1.4.0 // indirect + github.com/eapache/go-resiliency v1.5.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect @@ -41,31 +41,32 @@ require ( github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/klauspost/compress v1.17.2 // indirect + github.com/klauspost/compress v1.17.4 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect - github.com/pelletier/go-toml/v2 v2.1.0 // indirect - github.com/pierrec/lz4/v4 v4.1.18 // indirect + github.com/pelletier/go-toml/v2 v2.1.1 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect - github.com/rogpeppe/go-internal v1.11.0 // 
indirect + github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect - github.com/sagikazarmark/locafero v0.3.0 // indirect + github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect - github.com/spf13/afero v1.10.0 // indirect - github.com/spf13/cast v1.5.1 // indirect + github.com/spf13/afero v1.11.0 // indirect + github.com/spf13/cast v1.6.0 // indirect github.com/subosito/gotenv v1.6.0 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.15.0 // indirect - golang.org/x/exp v0.0.0-20231108232855-2478ac86f678 // indirect - golang.org/x/net v0.18.0 // indirect - golang.org/x/sys v0.14.0 // indirect + golang.org/x/crypto v0.18.0 // indirect + golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 // indirect + golang.org/x/net v0.20.0 // indirect + golang.org/x/sys v0.16.0 // indirect golang.org/x/text v0.14.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 // indirect - google.golang.org/grpc v1.59.0 // indirect + google.golang.org/genproto v0.0.0-20240102182953-50ed04b92917 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac // indirect + google.golang.org/grpc v1.60.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 863070e..2f76db7 100644 --- a/go.sum +++ b/go.sum @@ -46,6 +46,8 @@ github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/bufbuild/protocompile v0.6.0 h1:Uu7WiSQ6Yj9DbkdnOe7U4mNKp58y9WDMKDn28/ZlunY= github.com/bufbuild/protocompile v0.6.0/go.mod h1:YNP35qEYoYGme7QMtz5SBCoN4kL4g12jTtjuzRNdjpE= +github.com/bufbuild/protocompile v0.7.1 h1:Kd8fb6EshOHXNNRtYAmLAwy/PotlyFoN0iMbuwGNh0M= +github.com/bufbuild/protocompile v0.7.1/go.mod h1:+Etjg4guZoAqzVk2czwEQP12yaxLJ8DxuqCJ9qHdH94= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= @@ -62,6 +64,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0= github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-resiliency v1.5.0 h1:dRsaR00whmQD+SgVKlq/vCRFNgtEb5yppyeVos3Yce0= +github.com/eapache/go-resiliency v1.5.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= @@ -173,11 +177,15 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ 
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jhump/protoreflect v1.15.3 h1:6SFRuqU45u9hIZPJAoZ8c28T3nK64BNdp9w6jFonzls= github.com/jhump/protoreflect v1.15.3/go.mod h1:4ORHmSBmlCW8fh3xHmJMGyul1zNqZK4Elxc8qKP+p1k= +github.com/jhump/protoreflect v1.15.4 h1:mrwJhfQGGljwvR/jPEocli8KA6G9afbQpH8NY2wORcI= +github.com/jhump/protoreflect v1.15.4/go.mod h1:2B+zwrnMY3TTIqEK01OG/d3pyUycQBfDf+bx8fE2DNg= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= +github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -194,8 +202,12 @@ github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyua github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= +github.com/pelletier/go-toml/v2 v2.1.1 h1:LWAJwfNvjQZCFIDKWYQaM62NcYeYViCmWIwmOStowAI= +github.com/pelletier/go-toml/v2 v2.1.1/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg= @@ -207,24 +219,34 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqn github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sagikazarmark/locafero v0.3.0 h1:zT7VEGWC2DTflmccN/5T1etyKvxSxpHsjb9cJvm4SvQ= github.com/sagikazarmark/locafero v0.3.0/go.mod h1:w+v7UsPNFwzF1cHuOajOOzoq4U7v/ig1mpRjqV+Bu1U= +github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6keLGt6kNQ= 
+github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4= github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/spf13/afero v1.10.0 h1:EaGW2JJh15aKOejeuJ+wpFSHnbd7GE6Wvp3TsNhb6LY= github.com/spf13/afero v1.10.0/go.mod h1:UBogFpq8E9Hx+xc5CNTTEpTnuHVmXDwZcZcE1eb/UhQ= +github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= +github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY= github.com/spf13/cast v1.5.1 h1:R+kOtfhWQE6TVQzY+4D7wJLBgkdVasCEFxSUBYBYIlA= github.com/spf13/cast v1.5.1/go.mod h1:b9PdjNptOpzXr7Rq1q9gJML/2cdGQAo69NKzQ10KN48= +github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0= +github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.17.0 h1:I5txKw7MJasPL/BrfkbA0Jyo/oELqVmux4pR/UxOMfI= github.com/spf13/viper v1.17.0/go.mod h1:BmMMMLQXSbcHK6KAOiFLz0l5JHrU89OdIRHvsk0+yVI= +github.com/spf13/viper v1.18.2 h1:LUXCnvUvSM6FXAsj6nnfc8Q2tp1dIgUfY9Kc8GsSOiQ= +github.com/spf13/viper v1.18.2/go.mod h1:EKmWIqdnk5lOcmR72yw6hS+8OPYcwD0jteitLMVB+yk= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= @@ -271,8 +293,10 @@ golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= -golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA= -golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g= +golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -285,6 +309,8 @@ golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EH golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= golang.org/x/exp v0.0.0-20231108232855-2478ac86f678 h1:mchzmB1XO2pMaKFRqk/+MV3mgGG96aqaPXaMifQU47w= 
golang.org/x/exp v0.0.0-20231108232855-2478ac86f678/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE= +golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 h1:hNQpMuAJe5CtcUqCXaWga3FHu+kQvCqcsoVaQgSV60o= +golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -346,6 +372,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= +golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= +golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -368,6 +396,8 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= +golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -406,8 +436,10 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= -golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -540,8 +572,12 @@ google.golang.org/genproto v0.0.0-20201210142538-e3217bee35cc/go.mod h1:FWY/as6D 
google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/genproto v0.0.0-20240102182953-50ed04b92917 h1:nz5NESFLZbJGPFxDT/HCn+V1mZ8JGNoY4nUpmW/Y2eg= +google.golang.org/genproto v0.0.0-20240102182953-50ed04b92917/go.mod h1:pZqR+glSb11aJ+JQcczCvgf47+duRuzNSKqE8YAQnV0= google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 h1:AB/lmRny7e2pLhFEYIbl5qkDAUt2h0ZRO4wGPhZf+ik= google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405/go.mod h1:67X1fPuzjcrkymZzZV1vvkFeTn2Rvc6lYF9MYFGCcwE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac h1:nUQEQmH/csSvFECKYRv6HWEyypysidKl2I6Qpsglq/0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac/go.mod h1:daQN87bsDqDoe316QbbvX60nMoJQa4r6Ds0ZuoAe5yA= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -560,6 +596,8 @@ google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA5 google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= +google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU= +google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -574,6 +612,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= +google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= diff --git a/internal/broker/broker-operation.go b/internal/broker/broker-operation.go index 3444696..711a1c8 100644 --- a/internal/broker/broker-operation.go +++ b/internal/broker/broker-operation.go @@ -77,7 +77,7 @@ func (operation *Operation) GetBrokers(flags GetBrokersFlags) error { Name: fmt.Sprint(broker.ID()), } - if configs, err = internal.ListConfigs(&admin, brokerConfig); err != nil { + if configs, err = internal.ListConfigs(&admin, brokerConfig, false); err 
!= nil { return err } @@ -153,7 +153,7 @@ func (operation *Operation) DescribeBroker(id int32, flags DescribeBrokerFlags) Name: fmt.Sprint(broker.ID()), } - if configs, err = internal.ListConfigs(&admin, brokerConfig); err != nil { + if configs, err = internal.ListConfigs(&admin, brokerConfig, false); err != nil { return err } diff --git a/internal/common-operation.go b/internal/common-operation.go index 53044ec..8d8c929 100644 --- a/internal/common-operation.go +++ b/internal/common-operation.go @@ -50,6 +50,10 @@ type K8sConfig struct { ImagePullSecret string } +type ConsumerConfig struct { + IsolationLevel string +} + type ProducerConfig struct { Partitioner string RequiredAcks string @@ -69,6 +73,7 @@ type ClientContext struct { AvroJSONCodec avro.JSONCodec Protobuf protobuf.SearchContext Producer ProducerConfig + Consumer ConsumerConfig } type Config struct { @@ -115,6 +120,7 @@ func CreateClientContext() (ClientContext, error) { context.Producer.Partitioner = viper.GetString("contexts." + context.Name + ".producer.partitioner") context.Producer.RequiredAcks = viper.GetString("contexts." + context.Name + ".producer.requiredAcks") context.Producer.MaxMessageBytes = viper.GetInt("contexts." + context.Name + ".producer.maxMessageBytes") + context.Consumer.IsolationLevel = viper.GetString("contexts." + context.Name + ".consumer.isolationLevel") context.Sasl.Enabled = viper.GetBool("contexts." + context.Name + ".sasl.enabled") context.Sasl.Username = viper.GetString("contexts." + context.Name + ".sasl.username") context.Sasl.Password = viper.GetString("contexts." + context.Name + ".sasl.password") @@ -309,10 +315,9 @@ func TopicExists(client *sarama.Client, name string) (bool, error) { return false, nil } -func ListConfigs(admin *sarama.ClusterAdmin, resource sarama.ConfigResource) ([]Config, error) { +func ListConfigs(admin *sarama.ClusterAdmin, resource sarama.ConfigResource, includeDefaults bool) ([]Config, error) { var ( - configs = make([]Config, 0) configEntries []sarama.ConfigEntry err error ) @@ -321,15 +326,21 @@ func ListConfigs(admin *sarama.ClusterAdmin, resource sarama.ConfigResource) ([] return nil, errors.Wrap(err, fmt.Sprintf("failed to describe %v config", getResourceTypeName(resource.Type))) } + return listConfigsFromEntries(configEntries, includeDefaults), nil +} + +func listConfigsFromEntries(configEntries []sarama.ConfigEntry, includeDefaults bool) []Config { + var configs = make([]Config, 0) + for _, configEntry := range configEntries { - if !configEntry.Default && configEntry.Source != sarama.SourceDefault { + if includeDefaults || (!configEntry.Default && configEntry.Source != sarama.SourceDefault) { entry := Config{Name: configEntry.Name, Value: configEntry.Value} configs = append(configs, entry) } } - return configs, nil + return configs } func getResourceTypeName(resourceType sarama.ConfigResourceType) string { diff --git a/internal/common-operation_test.go b/internal/common-operation_test.go new file mode 100644 index 0000000..a4fc3ec --- /dev/null +++ b/internal/common-operation_test.go @@ -0,0 +1,83 @@ +package internal + +import ( + "reflect" + "testing" + + "github.com/IBM/sarama" +) + +func TestListConfigsFromEntries(t *testing.T) { + testCases := []struct { + name string + entries []sarama.ConfigEntry + includeDefaults bool + configs []Config + }{ + { + name: "not include defaults, empty entries", + entries: []sarama.ConfigEntry{}, + configs: []Config{}, + }, + { + name: "not include defaults", + entries: []sarama.ConfigEntry{ + { + Name: "non_default", + 
Value: "ND", + Default: false, + Source: sarama.SourceUnknown, + }, + { + Name: "default", + Value: "D", + Default: true, + Source: sarama.SourceDefault, + }, + }, + configs: []Config{ + {Name: "non_default", Value: "ND"}, + }, + }, + { + name: "include defaults, empty entries", + entries: []sarama.ConfigEntry{}, + configs: []Config{}, + includeDefaults: true, + }, + { + name: "include defaults", + entries: []sarama.ConfigEntry{ + { + Name: "non_default", + Value: "ND", + Default: false, + Source: sarama.SourceUnknown, + }, + { + Name: "default", + Value: "D", + Default: true, + Source: sarama.SourceDefault, + }, + }, + configs: []Config{ + {Name: "non_default", Value: "ND"}, + {Name: "default", Value: "D"}, + }, + includeDefaults: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + configs := listConfigsFromEntries(tc.entries, tc.includeDefaults) + + if len(configs) > 0 && + len(tc.configs) > 0 && + !reflect.DeepEqual(configs, tc.configs) { + t.Fatalf("expect: %v, got %v", tc.configs, configs) + } + }) + } +} diff --git a/internal/consume/GroupConsumer.go b/internal/consume/GroupConsumer.go index c23becd..b2b39a5 100644 --- a/internal/consume/GroupConsumer.go +++ b/internal/consume/GroupConsumer.go @@ -96,10 +96,13 @@ func (handler *groupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, c for { select { - case message := <-messageChannel: - if message != nil { - handler.messages <- message + case message, ok := <-messageChannel: + if !ok { + output.Debugf("consume claim via channel interrupted") + handler.cancel() + return nil } + handler.messages <- message session.MarkMessage(message, "") case <-handler.stopConsumers: output.Debugf("stop consume claim via channel") diff --git a/internal/consume/PartitionConsumer.go b/internal/consume/PartitionConsumer.go index ae17ae0..f111b70 100644 --- a/internal/consume/PartitionConsumer.go +++ b/internal/consume/PartitionConsumer.go @@ -3,8 +3,6 @@ package consume import ( "context" "math" - "strconv" - "strings" "time" "github.com/IBM/sarama" @@ -190,7 +188,7 @@ func getStartOffset(client *sarama.Client, topic string, flags Flags, currentPar } else if flags.FromBeginning { return (*client).GetOffset(topic, currentPartition, sarama.OffsetOldest) } else if len(flags.Offsets) > 0 { - return extractOffsetForPartition(flags, currentPartition) + return util.ExtractOffsetForPartition(flags.Offsets, currentPartition) } return sarama.OffsetNewest, nil } @@ -213,32 +211,6 @@ func getEndOffset(client *sarama.Client, topic string, flags Flags, currentParti return sarama.OffsetNewest, nil } -func extractOffsetForPartition(flags Flags, currentPartition int32) (int64, error) { - for _, offsetFlag := range flags.Offsets { - offsetParts := strings.Split(offsetFlag, "=") - - if len(offsetParts) == 2 { - - partition, err := strconv.Atoi(offsetParts[0]) - if err != nil { - return ErrOffset, errors.Errorf("unable to parse offset parameter: %s (%v)", offsetFlag, err) - } - - if int32(partition) != currentPartition { - continue - } - - offset, err := strconv.ParseInt(offsetParts[1], 10, 64) - if err != nil { - return ErrOffset, errors.Errorf("unable to parse offset parameter: %s (%v)", offsetFlag, err) - } - - return offset, nil - } - } - return ErrOffset, errors.Errorf("unable to find offset parameter for partition %d: %s", currentPartition, flags.Offsets) -} - func hasExclusiveConditions(flags ...bool) bool { value := 0 for _, flag := range flags { diff --git a/internal/consume/consume-operation.go 
b/internal/consume/consume-operation.go index 5b282f9..239a1f6 100644 --- a/internal/consume/consume-operation.go +++ b/internal/consume/consume-operation.go @@ -3,6 +3,7 @@ package consume import ( "context" "sort" + "strings" "time" "github.com/deviceinsight/kafkactl/internal/helpers" @@ -39,6 +40,7 @@ type Flags struct { ProtosetFiles []string KeyProtoType string ValueProtoType string + IsolationLevel string } type ConsumedMessage struct { @@ -65,7 +67,16 @@ func (operation *Operation) Consume(topic string, flags Flags) error { return err } - if client, err = internal.CreateClient(&clientContext); err != nil { + config, err := internal.CreateClientConfig(&clientContext) + if err != nil { + return err + } + + if err = applyConsumerConfigs(config, clientContext, flags); err != nil { + return err + } + + if client, err = sarama.NewClient(clientContext.Brokers, config); err != nil { return errors.Wrap(err, "failed to create client") } @@ -166,6 +177,37 @@ func (operation *Operation) Consume(topic string, flags Flags) error { return nil } +func applyConsumerConfigs(config *sarama.Config, clientContext internal.ClientContext, flags Flags) error { + + var err error + + isolationLevel := clientContext.Consumer.IsolationLevel + if flags.IsolationLevel != "" { + isolationLevel = flags.IsolationLevel + } + + if config.Consumer.IsolationLevel, err = parseIsolationLevel(isolationLevel); err != nil { + return err + } + + output.Debugf("using isolationLevel=%v", config.Consumer.IsolationLevel) + + return nil +} + +func parseIsolationLevel(isolationLevel string) (sarama.IsolationLevel, error) { + switch strings.ToLower(isolationLevel) { + case "": + return sarama.ReadCommitted, nil + case "readcommitted": + return sarama.ReadCommitted, nil + case "readuncommitted": + return sarama.ReadUncommitted, nil + default: + return sarama.ReadCommitted, errors.Errorf("isolationLevel=%s not supported", isolationLevel) + } +} + func deserializeMessages(ctx context.Context, flags Flags, messages <-chan *sarama.ConsumerMessage, stopConsumers chan<- bool, deserializers MessageDeserializerChain) *errgroup.Group { errorGroup, _ := errgroup.WithContext(ctx) diff --git a/internal/topic/topic-operation.go b/internal/topic/topic-operation.go index 3f01999..ca0c37f 100644 --- a/internal/topic/topic-operation.go +++ b/internal/topic/topic-operation.go @@ -37,10 +37,13 @@ type requestedTopicFields struct { partitionLeader bool partitionReplicas bool partitionISRs bool - config bool + config PrintConfigsParam } -var allFields = requestedTopicFields{partitionID: true, partitionOffset: true, partitionLeader: true, partitionReplicas: true, partitionISRs: true, config: true} +var allFields = requestedTopicFields{ + partitionID: true, partitionOffset: true, partitionLeader: true, + partitionReplicas: true, partitionISRs: true, config: NonDefaultConfigs, +} type GetTopicsFlags struct { OutputFormat string @@ -60,8 +63,20 @@ type AlterTopicFlags struct { Configs []string } +type DeleteRecordsFlags struct { + Offsets []string +} + +type PrintConfigsParam string + +const ( + NoConfigs PrintConfigsParam = "none" + AllConfigs PrintConfigsParam = "all" + NonDefaultConfigs PrintConfigsParam = "no_defaults" +) + type DescribeTopicFlags struct { - PrintConfigs bool + PrintConfigs PrintConfigsParam SkipEmptyPartitions bool OutputFormat string } @@ -161,7 +176,10 @@ func (operation *Operation) DescribeTopic(topic string, flags DescribeTopicFlags return errors.Wrap(err, "failed to create cluster admin") } - if t, err = readTopic(&client, &admin, 
topic, allFields); err != nil { + fields := allFields + fields.config = flags.PrintConfigs + + if t, err = readTopic(&client, &admin, topic, fields); err != nil { return errors.Wrap(err, "failed to read topic") } @@ -170,7 +188,7 @@ func (operation *Operation) DescribeTopic(topic string, flags DescribeTopicFlags func (operation *Operation) printTopic(topic Topic, flags DescribeTopicFlags) error { - if !flags.PrintConfigs { + if flags.PrintConfigs == NoConfigs { topic.Configs = nil } @@ -401,7 +419,11 @@ func (operation *Operation) AlterTopic(topic string, flags AlterTopicFlags) erro } if flags.ValidateOnly { - describeFlags := DescribeTopicFlags{PrintConfigs: len(flags.Configs) > 0} + printConfigs := NoConfigs + if len(flags.Configs) > 0 { + printConfigs = NonDefaultConfigs + } + describeFlags := DescribeTopicFlags{PrintConfigs: printConfigs} return operation.printTopic(t, describeFlags) } return nil @@ -472,7 +494,7 @@ func (operation *Operation) CloneTopic(sourceTopic, targetTopic string) error { requestedFields := requestedTopicFields{ partitionID: true, partitionReplicas: true, - config: true, + config: NonDefaultConfigs, } if t, err = readTopic(&client, &admin, sourceTopic, requestedFields); err != nil { @@ -581,7 +603,7 @@ func (operation *Operation) GetTopics(flags GetTopicsFlags) error { } else if flags.OutputFormat == "compact" { tableWriter.Initialize() } else if flags.OutputFormat == "wide" { - requestedFields = requestedTopicFields{partitionID: true, partitionReplicas: true, config: true} + requestedFields = requestedTopicFields{partitionID: true, partitionReplicas: true, config: NonDefaultConfigs} if err := tableWriter.WriteHeader("TOPIC", "PARTITIONS", "REPLICATION FACTOR", "CONFIGS"); err != nil { return err } @@ -652,6 +674,30 @@ func (operation *Operation) GetTopics(flags GetTopicsFlags) error { return nil } +func (operation *Operation) DeleteRecords(topic string, flags DeleteRecordsFlags) error { + + var ( + err error + context internal.ClientContext + admin sarama.ClusterAdmin + ) + + if context, err = internal.CreateClientContext(); err != nil { + return err + } + + if admin, err = internal.CreateClusterAdmin(&context); err != nil { + return errors.Wrap(err, "failed to create cluster admin") + } + + offsets, parseErr := util.ParseOffsets(flags.Offsets) + if parseErr != nil { + return parseErr + } + + return admin.DeleteRecords(topic, offsets) +} + func readTopic(client *sarama.Client, admin *sarama.ClusterAdmin, name string, requestedFields requestedTopicFields) (Topic, error) { var ( err error @@ -725,14 +771,14 @@ func readTopic(client *sarama.Client, admin *sarama.ClusterAdmin, name string, r return top.Partitions[i].ID < top.Partitions[j].ID }) - if requestedFields.config { + if requestedFields.config != NoConfigs { topicConfig := sarama.ConfigResource{ Type: sarama.TopicResource, Name: name, } - if top.Configs, err = internal.ListConfigs(admin, topicConfig); err != nil { + if top.Configs, err = internal.ListConfigs(admin, topicConfig, requestedFields.config == AllConfigs); err != nil { return top, err } } diff --git a/util/parse_offsets.go b/util/parse_offsets.go new file mode 100644 index 0000000..bdebdc3 --- /dev/null +++ b/util/parse_offsets.go @@ -0,0 +1,65 @@ +package util + +import ( + "math" + "strconv" + "strings" + + "github.com/pkg/errors" +) + +const ErrOffset = math.MinInt64 +const offsetSeparator = "=" + +func ParseOffsets(rawOffsets []string) (map[int32]int64, error) { + + offsets := make(map[int32]int64) + + for _, offsetFlag := range rawOffsets { + 
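+		// each element of rawOffsets is expected in the form "<partition>=<offset>", e.g. "1=222"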
offsetParts := strings.Split(offsetFlag, offsetSeparator) + + if len(offsetParts) != 2 { + return nil, errors.Errorf("offset parameter has wrong format: %s %v", offsetFlag, rawOffsets) + } + + partition, err := strconv.Atoi(offsetParts[0]) + if err != nil { + return nil, errors.Errorf("unable to parse offset parameter: %s (%v)", offsetFlag, err) + } + + offset, err := strconv.ParseInt(offsetParts[1], 10, 64) + if err != nil { + return nil, errors.Errorf("unable to parse offset parameter: %s (%v)", offsetFlag, err) + } + + offsets[int32(partition)] = offset + } + + return offsets, nil +} + +func ExtractOffsetForPartition(rawOffsets []string, currentPartition int32) (int64, error) { + for _, offsetFlag := range rawOffsets { + offsetParts := strings.Split(offsetFlag, offsetSeparator) + + if len(offsetParts) == 2 { + + partition, err := strconv.Atoi(offsetParts[0]) + if err != nil { + return ErrOffset, errors.Errorf("unable to parse offset parameter: %s (%v)", offsetFlag, err) + } + + if int32(partition) != currentPartition { + continue + } + + offset, err := strconv.ParseInt(offsetParts[1], 10, 64) + if err != nil { + return ErrOffset, errors.Errorf("unable to parse offset parameter: %s (%v)", offsetFlag, err) + } + + return offset, nil + } + } + return ErrOffset, errors.Errorf("unable to find offset parameter for partition %d: %v", currentPartition, rawOffsets) +} diff --git a/util/parse_offsets_test.go b/util/parse_offsets_test.go new file mode 100644 index 0000000..9eb6b9b --- /dev/null +++ b/util/parse_offsets_test.go @@ -0,0 +1,66 @@ +package util_test + +import ( + "reflect" + "strings" + "testing" + + "github.com/deviceinsight/kafkactl/util" +) + +func TestParseOffsets(t *testing.T) { + + type testCases struct { + description string + input []string + wantOffsets map[int32]int64 + wantErr string + } + + for _, test := range []testCases{ + { + description: "successful_parsing", + input: []string{"1=222", "2=333", "5=444"}, + wantOffsets: map[int32]int64{1: 222, 2: 333, 5: 444}, + }, + { + description: "wrong_separator_fails", + input: []string{"1:222"}, + wantErr: "offset parameter has wrong format: 1:222 [1:222]", + }, + { + description: "partition_not_an_int_fails", + input: []string{"abc=222"}, + wantErr: "parsing \"abc\": invalid syntax", + }, + { + description: "offset_not_an_int_fails", + input: []string{"1=nope"}, + wantErr: "parsing \"nope\": invalid syntax", + }, + } { + t.Run(test.description, func(t *testing.T) { + + offsets, err := util.ParseOffsets(test.input) + + if test.wantErr != "" { + if err == nil { + t.Errorf("want error %q but got nil", test.wantErr) + } + + if !strings.Contains(err.Error(), test.wantErr) { + t.Errorf("want error %q got %q", test.wantErr, err) + } + + return + } + if err != nil { + t.Errorf("doesn't want error but got %s", err) + } + + if eq := reflect.DeepEqual(test.wantOffsets, offsets); !eq { + t.Errorf("want %q got %q", test.wantOffsets, offsets) + } + }) + } +}
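
For orientation, the new pieces above combine as follows: offsets supplied as `partition=offset` strings are parsed by `util.ParseOffsets` into a `map[int32]int64` and handed to sarama's `ClusterAdmin.DeleteRecords` (as done in `topic-operation.go`). A minimal sketch of that flow; the function name and its arguments are illustrative placeholders, not part of this patch:

[,go]
----
// Sketch only: wiring ParseOffsets into sarama's DeleteRecords.
package example

import (
	"github.com/IBM/sarama"
	"github.com/deviceinsight/kafkactl/util"
	"github.com/pkg/errors"
)

func deleteRecordsExample(admin sarama.ClusterAdmin, topic string, rawOffsets []string) error {
	// rawOffsets e.g. []string{"0=100", "1=250"}: delete records before offset 100
	// on partition 0 and before offset 250 on partition 1.
	offsets, err := util.ParseOffsets(rawOffsets)
	if err != nil {
		return err
	}

	if err := admin.DeleteRecords(topic, offsets); err != nil {
		return errors.Wrap(err, "failed to delete records")
	}
	return nil
}
----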