Skip to content

Commit

Permalink
Merge pull request #24 from jeroenrinzema/v0.6.0-rc
Browse files Browse the repository at this point in the history
V0.6.0
  • Loading branch information
jeroenrinzema authored Oct 26, 2019
2 parents 9cb21cd + 49fa59e commit 4b24706
Show file tree
Hide file tree
Showing 55 changed files with 854 additions and 490 deletions.
19 changes: 8 additions & 11 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
name: Go
name: Go tests
on: [push]
jobs:

build:
name: Test
runs-on: ubuntu-latest
strategy:
matrix:
go: [ '1.11.x', '1.12.x', '1.13.x' ]
name: Go ${{ matrix.go }}
steps:

- name: Set up Go 1.12
- uses: actions/checkout@v1
- name: Setup go
uses: actions/setup-go@v1
with:
go-version: 1.12
id: go

- name: Check out code into the Go module directory
uses: actions/checkout@v1

go-version: ${{ matrix.go }}
- name: Test
run: go test ./... -v -mod=vendor -race -count=1 -coverprofile=coverage.txt -covermode=atomic -timeout=120s
19 changes: 0 additions & 19 deletions .travis.yml

This file was deleted.

3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# Commander 🚀
[![GoDoc](https://godoc.org/github.com/jeroenrinzema/commander?status.svg)](https://godoc.org/github.com/jeroenrinzema/commander)
[![Build Status](https://travis-ci.org/jeroenrinzema/commander.svg?branch=master)](https://travis-ci.org/jeroenrinzema/commander)
[![Coverage](https://codecov.io/gh/jeroenrinzema/commander/branch/master/graph/badge.svg)](https://codecov.io/gh/jeroenrinzema/commander)
[![Coverage Report](https://goreportcard.com/badge/github.com/jeroenrinzema/commander)](https://goreportcard.com/report/github.com/jeroenrinzema/commander)

Expand All @@ -9,7 +8,7 @@ Commander is Go library for writing event-driven applications. Enabling event so
## Getting started

1. [🚀 Examples](https://github.com/jeroenrinzema/commander/tree/master/examples)
1. [📚 Documentation](https://godoc.org/github.com/jeroenrinzema/commander)
2. [📚 Documentation](https://godoc.org/github.com/jeroenrinzema/commander)

---

Expand Down
15 changes: 8 additions & 7 deletions dialects/README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# Dialect

A commander dialects is responsible for the consumption/production of messages.
Check out the [dialect interface](https://github.com/jeroenrinzema/commander/blob/master/dialect.go) to see which methods have to be available to a dialect.

On construction of the commander instance is a connectionstring and available groups passed which is given to the dialect.
The dialect could when nessasery setup/initialize the given groups/connectionstring on for it's targeted protocol (ex: Kafka, RabbitMQ)

Below is a example mocking dialect shown that allowes messages to be consumed and produced in-memory. This is a very simple example and is not safe for concurrent actions.
- [Kafka](https://github.com/jeroenrinzema/commander/tree/master/dialects/kafka)
- [Mock](https://github.com/jeroenrinzema/commander/tree/master/dialects/mock)
- Redis
- RabbitMQ
- SQL
- Websocket
- io.Reader/io.Writer
- NATS
2 changes: 1 addition & 1 deletion dialects/kafka/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (client *Client) Claim(consumed *sarama.ConsumerMessage) (err error) {

select {
case subscription.messages <- message:
result := message.Await()
result := message.Finally()
if result != nil {
return ErrRetry
}
Expand Down
10 changes: 2 additions & 8 deletions dialects/kafka/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
// Dialect represents the kafka dialect
type Dialect struct {
Connection Config
Topics []types.Topic
Config *sarama.Config

consumer *consumer.Client
Expand Down Expand Up @@ -53,14 +52,9 @@ func (dialect *Dialect) Producer() types.Producer {
return dialect.producer
}

// Assigned is called when a topic gets created
func (dialect *Dialect) Assigned(topic types.Topic) {
dialect.Topics = append(dialect.Topics, topic)
}

// Open opens a kafka consumer and producer
func (dialect *Dialect) Open() (err error) {
err = dialect.consumer.Connect(dialect.Connection.Brokers, dialect.Config, dialect.Connection.InitialOffset, dialect.Topics...)
func (dialect *Dialect) Open(topics []types.Topic) (err error) {
err = dialect.consumer.Connect(dialect.Connection.Brokers, dialect.Config, dialect.Connection.InitialOffset, topics...)
if err != nil {
return err
}
Expand Down
17 changes: 9 additions & 8 deletions dialects/kafka/metadata/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/Shopify/sarama"
"github.com/jeroenrinzema/commander"
"github.com/jeroenrinzema/commander/internal/metadata"
"github.com/jeroenrinzema/commander/internal/types"
)

Expand All @@ -25,9 +26,9 @@ func MessageFromMessage(consumed *sarama.ConsumerMessage) *commander.Message {
Data: consumed.Value,
Key: consumed.Key,
Timestamp: consumed.Timestamp,
Ctx: ctx,
}

message.NewCtx(ctx)
headers := map[string][]string{}

headers:
Expand Down Expand Up @@ -61,19 +62,19 @@ headers:
message.EOS = message.EOS.Parse(string(record.Value))
break
case HeaderParentID:
message.Ctx = types.NewParentIDContext(message.Ctx, types.ParentID(record.Value))
message.NewCtx(metadata.NewParentIDContext(message.Ctx(), metadata.ParentID(record.Value)))
break
case HeaderParentTimestamp:
unix, err := strconv.ParseInt(string(record.Value), 10, 64)
if err != nil {
continue headers
}

time := types.ParentTimestamp(time.Unix(0, unix))
message.Ctx = types.NewParentTimestampContext(message.Ctx, time)
time := metadata.ParentTimestamp(time.Unix(0, unix))
message.NewCtx(metadata.NewParentTimestampContext(message.Ctx(), time))
break
default:
headers[key] = strings.Split(string(record.Value), types.HeaderValueDevider)
headers[key] = strings.Split(string(record.Value), metadata.HeaderValueDevider)
break
}
}
Expand Down Expand Up @@ -106,23 +107,23 @@ func MessageToMessage(produce *commander.Message) *sarama.ProducerMessage {
},
}

parent, has := types.ParentIDFromContext(produce.Ctx)
parent, has := metadata.ParentIDFromContext(produce.Ctx())
if has {
headers = append(headers, sarama.RecordHeader{
Key: []byte(HeaderParentID),
Value: []byte(parent),
})
}

timestamp, has := types.ParentTimestampFromContext(produce.Ctx)
timestamp, has := metadata.ParentTimestampFromContext(produce.Ctx())
if has {
headers = append(headers, sarama.RecordHeader{
Key: []byte(HeaderParentTimestamp),
Value: []byte(strconv.Itoa(int(time.Time(timestamp).UnixNano()))),
})
}

kv, has := types.HeaderFromContext(produce.Ctx)
kv, has := metadata.HeaderFromContext(produce.Ctx())
if has {
for key, value := range kv {
headers = append(headers, sarama.RecordHeader{
Expand Down
2 changes: 1 addition & 1 deletion dialects/mock/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (consumer *Consumer) Emit(message *types.Message) {
for _, subscription := range collection.list {
message.Reset()
subscription.messages <- message
message.Await()
message.Finally()
}
collection.mutex.Unlock()
close(resolved)
Expand Down
5 changes: 3 additions & 2 deletions dialects/mock/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ func TestConsumerConsumption(t *testing.T) {
topic := types.NewTopic("mock", dialect, types.EventMessage, types.DefaultMode)
message := types.Message{
Topic: topic,
Ctx: context.Background(),
}

message.NewCtx(context.Background())

sink := make(chan bool)
messages, err := dialect.Consumer().Subscribe(topic)
if err != nil {
Expand All @@ -25,7 +26,7 @@ func TestConsumerConsumption(t *testing.T) {

go func() {
for message := range messages {
message.Next()
message.Ack()
close(sink)
}
}()
Expand Down
7 changes: 1 addition & 6 deletions dialects/mock/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,10 @@ type Dialect struct {

// Open notifies a dialect to open the dialect.
// No further topic assignments should be made.
func (dialect *Dialect) Open() error {
func (dialect *Dialect) Open([]types.Topic) error {
return nil
}

// Assigned notifies a dialect about the assignment of the given topic
func (dialect *Dialect) Assigned(types.Topic) {
// ignore...
}

// Consumer returns the dialect consumer
func (dialect *Dialect) Consumer() types.Consumer {
return dialect.consumer
Expand Down
2 changes: 1 addition & 1 deletion dialects/mock/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import "testing"
func TestNewDialectConstruction(t *testing.T) {
dialect := NewDialect()

dialect.Open()
dialect.Open(nil)

if dialect.Consumer() == nil {
t.Fatal("no dialect consumer")
Expand Down
2 changes: 1 addition & 1 deletion dialects/mock/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ func TestProducerProduction(t *testing.T) {
topic := types.NewTopic("mock", dialect, types.EventMessage, types.DefaultMode)
message := types.Message{
Topic: topic,
Ctx: context.Background(),
}

message.NewCtx(context.Background())
err := dialect.Producer().Publish(&message)
if err != nil {
t.Fatal(err)
Expand Down
42 changes: 42 additions & 0 deletions examples/kafka/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ github.com/Shopify/sarama v1.22.1 h1:exyEsKLGyCsDiqpV5Lr4slFi8ev2KiM3cP1KZ6vnCQ0
github.com/Shopify/sarama v1.22.1/go.mod h1:FRzlvRpMFO/639zY1SDxUxkqH97Y0ndM5CbGj6oG3As=
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/ThreeDotsLabs/watermill v1.0.0/go.mod h1:gjVFKc8aN+vmEHw3pA0kh4mmuwHe02nyfghb6IWWiKE=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand All @@ -20,18 +26,28 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/go-chi/chi v4.0.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE=
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/jeroenrinzema/commander/dialects/kafka v0.0.0-20190406211118-d65d4aaec57d/go.mod h1:p8F8ymzu3CGQA/eVQeb9StOUMmOzK0M9a2f/dBU9dck=
github.com/jeroenrinzema/commander/examples/kafka v0.0.0-20190430202912-8fe70c0e1b5a/go.mod h1:rOw7O00SkE/3oR77184Etqs7THPiokOaVrub4mSzePA=
Expand All @@ -41,20 +57,40 @@ github.com/jeroenrinzema/commander/examples/mock v0.0.0-20190613124800-6c8bc78e3
github.com/jeroenrinzema/commander/examples/mock-multiple-groups v0.0.0-20190613124800-6c8bc78e3138/go.mod h1:0DeSCXOO2GXIi4n+FUWGS6o/pCI/ZBpsKebVuW6hIj8=
github.com/jeroenrinzema/commander/examples/streaming v0.0.0-20190702094603-24861dd4d416/go.mod h1:7463oz3hsyxpJu+n9qvoXo63DISG29e8OXbSD/Jpx10=
github.com/jeroenrinzema/commander/examples/zipkin v0.0.0-20190613124800-6c8bc78e3138/go.mod h1:0kH96eT7K8FRLOQj/93XMym10lFwlLKJD9MSthr90z0=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw=
github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/renstrom/shortuuid v3.0.0+incompatible/go.mod h1:n18Ycpn8DijG+h/lLBQVnGKv1BCtTeXo8KKSbBOrQ8c=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand All @@ -64,6 +100,7 @@ github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand All @@ -75,18 +112,23 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
Expand Down
4 changes: 2 additions & 2 deletions examples/kafka/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func main() {
key, err := uuid.NewV4()
if err != nil {
// Mark the message to be retried, this will reset the offset of the message topic, parition to the original message offset
message.Retry(err)
message.Nack()
return
}

Expand Down Expand Up @@ -94,7 +94,7 @@ func main() {
return
}

event.Next()
event.Ack()

w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(event)
Expand Down
Loading

0 comments on commit 4b24706

Please sign in to comment.