Skip to content

Commit

Permalink
ML-521: Substitute EmitSource parsed time field in body. (#220)
Browse files Browse the repository at this point in the history
Co-authored-by: Gal Topper <[email protected]>
  • Loading branch information
Gal Topper and Gal Topper authored May 25, 2021
1 parent cd3106b commit 66fc7cf
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
1 change: 1 addition & 0 deletions storey/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def _build_event(self, element, key, event_time):
key.append(body[field])
if not event_time and self._time_field:
event_time = _convert_to_datetime(body[self._time_field], self._time_format)
body[self._time_field] = event_time

if element_is_event:
if key:
Expand Down
31 changes: 31 additions & 0 deletions tests/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2145,6 +2145,37 @@ def test_metadata_fields():
assert result2.body == body2


def test_time_parsed_on_emit():
controller = build_flow([
SyncEmitSource(key_field='mykey', time_field='mytime'),
Reduce([], append_and_return, full_event=True)
]).run()

timestamp_str = '2016-05-30 13:30:00.057'
timestamp_datetime = datetime(2016, 5, 30, 13, 30, 0, 57000)
body1 = {'mykey': 'k1', 'mytime': timestamp_str, 'otherfield': 'x'}
expected_body1 = {'mykey': 'k1', 'mytime': timestamp_datetime, 'otherfield': 'x'}
body2 = {'mykey': 'k2', 'otherfield': 'x'}

controller.emit(body1)
controller.emit(body2, event_time=timestamp_datetime)

controller.terminate()
result = controller.await_termination()

assert len(result) == 2

result1 = result[0]
assert result1.key == 'k1'
assert result1.time == timestamp_datetime
assert result1.body == expected_body1

result2 = result[1]
assert result2.key == 'k2'
assert result2.time == timestamp_datetime
assert result2.body == body2


async def async_test_async_metadata_fields():
controller = await build_flow([
AsyncEmitSource(key_field='mykey', time_field='mytime'),
Expand Down

0 comments on commit 66fc7cf

Please sign in to comment.