Skip to content

Commit

Permalink
Fix issues (#30)
Browse files Browse the repository at this point in the history
* Use the correct error handing function (Fixes #28)
* Add kafka test to stop coverage flip flop
  • Loading branch information
nrwiersma authored Jan 8, 2019
1 parent d34e2f8 commit 3c8ced8
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
36 changes: 36 additions & 0 deletions kafka/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,42 @@ func TestSource_Consume(t *testing.T) {
assert.Equal(t, []byte("foo"), msg.Value)
}

func TestSource_ConsumeError(t *testing.T) {
broker0 := sarama.NewMockBroker(t, 0)
defer broker0.Close()
broker0.SetHandlerByMap(map[string]sarama.MockResponse{
"MetadataRequest": sarama.NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("test_topic", 0, broker0.BrokerID()),
"FindCoordinatorRequest": sarama.NewMockFindCoordinatorResponse(t).
SetCoordinator(sarama.CoordinatorGroup, "test_group", broker0),
"JoinGroupRequest": sarama.NewMockWrapper(&sarama.JoinGroupResponse{
Version: 1,
Err: sarama.ErrNoError,
GroupProtocol: "protocol",
}),
"SyncGroupRequest": sarama.NewMockWrapper(&sarama.SyncGroupResponse{
Err: sarama.ErrBrokerNotAvailable,
MemberAssignment: []byte{},
}),
"LeaveGroupRequest": sarama.NewMockWrapper(&sarama.LeaveGroupResponse{
Err: sarama.ErrNoError,
}),
})
c := kafka.NewSourceConfig()
c.Brokers = []string{broker0.Addr()}
c.Topic = "test_topic"
c.GroupID = "test_group"
s, _ := kafka.NewSource(c)
defer s.Close()

time.Sleep(500 * time.Millisecond)

_, err := s.Consume()

assert.Error(t, err)
}

func TestSource_Commit(t *testing.T) {
broker0 := sarama.NewMockBroker(t, 0)
defer broker0.Close()
Expand Down
3 changes: 2 additions & 1 deletion task.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func NewTask(topology *Topology, opts ...TaskOptFunc) Task {
t := &streamTask{
topology: topology,
store: store,
errorFn: func(_ error) {},
supervisorOpts: supervisorOpts{
Strategy: Lossless,
Interval: 0,
Expand All @@ -74,7 +75,7 @@ func NewTask(topology *Topology, opts ...TaskOptFunc) Task {

t.supervisor = NewSupervisor(t.store, t.supervisorOpts.Strategy)
if t.supervisorOpts.Interval > 0 {
t.supervisor = NewTimedSupervisor(t.supervisor, t.supervisorOpts.Interval, t.errorFn)
t.supervisor = NewTimedSupervisor(t.supervisor, t.supervisorOpts.Interval, t.handleError)
}

return t
Expand Down

0 comments on commit 3c8ced8

Please sign in to comment.