diff --git a/channel/sink.go b/channel/sink.go new file mode 100644 index 0000000..f21dc98 --- /dev/null +++ b/channel/sink.go @@ -0,0 +1,34 @@ +package channel + +import "github.com/msales/streams/v2" + +// Sink represents a channel sink. +type Sink struct { + pipe streams.Pipe + + ch chan *streams.Message +} + +// NewSink creates a new channel Sink. +func NewSink(ch chan *streams.Message) *Sink { + return &Sink{ch: ch} +} + +// WithPipe sets the pipe on the Processor. +func (s *Sink) WithPipe(pipe streams.Pipe) { + s.pipe = pipe +} + +// Process processes the stream Message. +func (s *Sink) Process(msg *streams.Message) error { + s.ch <- msg + + return s.pipe.Mark(msg) +} + +// Close closes the processor. +func (s *Sink) Close() error { + close(s.ch) + + return nil +} diff --git a/channel/sink_test.go b/channel/sink_test.go new file mode 100644 index 0000000..987bed5 --- /dev/null +++ b/channel/sink_test.go @@ -0,0 +1,45 @@ +package channel_test + +import ( + "testing" + + "github.com/msales/streams/v2" + "github.com/msales/streams/v2/channel" + "github.com/msales/streams/v2/mocks" + "github.com/stretchr/testify/assert" +) + +func TestNewSink(t *testing.T) { + sink := channel.NewSink(nil) + + assert.Equal(t, &channel.Sink{}, sink) +} + +func TestSink_Close(t *testing.T) { + ch := make(chan *streams.Message) + sink := channel.NewSink(ch) + + err := sink.Close() + _, open := <-ch + + assert.NoError(t, err) + assert.False(t, open) +} + +func TestSink_Process(t *testing.T) { + ch := make(chan *streams.Message, 1) + sink := channel.NewSink(ch) + + pipe := mocks.NewPipe(t) + pipe.ExpectMark(nil, "test") + + sink.WithPipe(pipe) + + msg := &streams.Message{Value: "test"} + + err := sink.Process(msg) + + assert.NoError(t, err) + assert.Equal(t, msg, <-ch) + pipe.AssertExpectations() +} diff --git a/channel/source.go b/channel/source.go new file mode 100644 index 0000000..a5fafca --- /dev/null +++ b/channel/source.go @@ -0,0 +1,42 @@ +package channel + +import ( + "time" + + "github.com/msales/streams/v2" +) + +// Compile-time interface check. +var _ streams.Source = (*Source)(nil) + +// Source represents a source that consumes messages from a channel. +type Source struct { + ch chan *streams.Message +} + +// NewSource creates a new channel Source. +func NewSource(ch chan *streams.Message) *Source { + return &Source{ch: ch} +} + +// Consume gets the next record from the Source. +func (s *Source) Consume() (*streams.Message, error) { + select { + + case msg := <-s.ch: + return msg.WithMetadata(nil, nil), nil + + case <-time.After(100 * time.Millisecond): + return streams.NewMessage(nil, nil), nil + } +} + +// Commit marks the consumed records as processed. +func (s *Source) Commit(interface{}) error { + return nil +} + +// Close closes the Source. +func (s *Source) Close() error { + return nil +} diff --git a/channel/source_test.go b/channel/source_test.go new file mode 100644 index 0000000..0a9b50b --- /dev/null +++ b/channel/source_test.go @@ -0,0 +1,91 @@ +package channel_test + +import ( + "testing" + + "github.com/msales/streams/v2" + "github.com/msales/streams/v2/channel" + "github.com/stretchr/testify/assert" +) + +func TestNewSource(t *testing.T) { + src := channel.NewSource(nil) + + assert.Equal(t, &channel.Source{}, src) +} + +func TestSource_Consume(t *testing.T) { + msgs := make([]*streams.Message, 3) + for i := 0; i < len(msgs); i++ { + msgs[i] = streams.NewMessage(i, i).WithMetadata(mockSource{}, mockMetadata{}) + } + + ch := make(chan *streams.Message, len(msgs)) + + for _, msg := range msgs { + ch <- msg + } + + src := channel.NewSource(ch) + + for i := 0; i < len(msgs); i++ { + msg, err := src.Consume() + src, meta := msg.Metadata() + + assert.NoError(t, err) + assert.Equal(t, msgs[i].Key, msg.Key) + assert.Equal(t, msgs[i].Value, msg.Value) + assert.Nil(t, src) + assert.Nil(t, meta) + } +} + +func TestSource_Consume_WithEmptyMessage(t *testing.T) { + src := channel.NewSource(nil) + + msg, err := src.Consume() + assert.NoError(t, err) + assert.True(t, msg.Empty()) +} + +func TestSource_Commit(t *testing.T) { + src := channel.NewSource(nil) + + err := src.Commit(nil) + + assert.NoError(t, err) +} + +func TestSource_Close(t *testing.T) { + ch := make(chan *streams.Message) + src := channel.NewSource(ch) + + err := src.Close() + + assert.NoError(t, err) + assert.NotPanics(t, func() { // Assert that the ch is not closed. + close(ch) + }) +} + +type mockSource struct{} + +func (mockSource) Consume() (*streams.Message, error) { + return nil, nil +} + +func (mockSource) Commit(interface{}) error { + return nil +} + +func (mockSource) Close() error { + return nil +} + +type mockMetadata struct{} + +func (mockMetadata) WithOrigin(streams.MetadataOrigin) {} + +func (m mockMetadata) Merge(streams.Metadata, streams.MetadataStrategy) streams.Metadata { + return m +} diff --git a/go.mod b/go.mod index 5f3f073..df918aa 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect + github.com/magiconair/properties v1.8.0 github.com/msales/pkg/v3 v3.1.0 github.com/pierrec/lz4 v0.0.0-20181005164709-635575b42742 // indirect github.com/pkg/errors v0.8.0