From 3c8ced8ea8507868612a37bac417b7831f20b2de Mon Sep 17 00:00:00 2001 From: Nicholas Wiersma Date: Tue, 8 Jan 2019 12:09:47 +0200 Subject: [PATCH] Fix issues (#30) * Use the correct error handing function (Fixes #28) * Add kafka test to stop coverage flip flop --- kafka/source_test.go | 36 ++++++++++++++++++++++++++++++++++++ task.go | 3 ++- 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/kafka/source_test.go b/kafka/source_test.go index 5d43f67..711e6b3 100644 --- a/kafka/source_test.go +++ b/kafka/source_test.go @@ -292,6 +292,42 @@ func TestSource_Consume(t *testing.T) { assert.Equal(t, []byte("foo"), msg.Value) } +func TestSource_ConsumeError(t *testing.T) { + broker0 := sarama.NewMockBroker(t, 0) + defer broker0.Close() + broker0.SetHandlerByMap(map[string]sarama.MockResponse{ + "MetadataRequest": sarama.NewMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("test_topic", 0, broker0.BrokerID()), + "FindCoordinatorRequest": sarama.NewMockFindCoordinatorResponse(t). + SetCoordinator(sarama.CoordinatorGroup, "test_group", broker0), + "JoinGroupRequest": sarama.NewMockWrapper(&sarama.JoinGroupResponse{ + Version: 1, + Err: sarama.ErrNoError, + GroupProtocol: "protocol", + }), + "SyncGroupRequest": sarama.NewMockWrapper(&sarama.SyncGroupResponse{ + Err: sarama.ErrBrokerNotAvailable, + MemberAssignment: []byte{}, + }), + "LeaveGroupRequest": sarama.NewMockWrapper(&sarama.LeaveGroupResponse{ + Err: sarama.ErrNoError, + }), + }) + c := kafka.NewSourceConfig() + c.Brokers = []string{broker0.Addr()} + c.Topic = "test_topic" + c.GroupID = "test_group" + s, _ := kafka.NewSource(c) + defer s.Close() + + time.Sleep(500 * time.Millisecond) + + _, err := s.Consume() + + assert.Error(t, err) +} + func TestSource_Commit(t *testing.T) { broker0 := sarama.NewMockBroker(t, 0) defer broker0.Close() diff --git a/task.go b/task.go index d360073..f74b95d 100644 --- a/task.go +++ b/task.go @@ -60,6 +60,7 @@ func NewTask(topology *Topology, opts ...TaskOptFunc) Task { t := &streamTask{ topology: topology, store: store, + errorFn: func(_ error) {}, supervisorOpts: supervisorOpts{ Strategy: Lossless, Interval: 0, @@ -74,7 +75,7 @@ func NewTask(topology *Topology, opts ...TaskOptFunc) Task { t.supervisor = NewSupervisor(t.store, t.supervisorOpts.Strategy) if t.supervisorOpts.Interval > 0 { - t.supervisor = NewTimedSupervisor(t.supervisor, t.supervisorOpts.Interval, t.errorFn) + t.supervisor = NewTimedSupervisor(t.supervisor, t.supervisorOpts.Interval, t.handleError) } return t