diff --git a/storey/sources.py b/storey/sources.py index 116ca6fe..95aba149 100644 --- a/storey/sources.py +++ b/storey/sources.py @@ -88,6 +88,7 @@ def _build_event(self, element, key, event_time): key.append(body[field]) if not event_time and self._time_field: event_time = _convert_to_datetime(body[self._time_field], self._time_format) + body[self._time_field] = event_time if element_is_event: if key: diff --git a/tests/test_flow.py b/tests/test_flow.py index 3ca18b14..19f84f35 100644 --- a/tests/test_flow.py +++ b/tests/test_flow.py @@ -2145,6 +2145,37 @@ def test_metadata_fields(): assert result2.body == body2 +def test_time_parsed_on_emit(): + controller = build_flow([ + SyncEmitSource(key_field='mykey', time_field='mytime'), + Reduce([], append_and_return, full_event=True) + ]).run() + + timestamp_str = '2016-05-30 13:30:00.057' + timestamp_datetime = datetime(2016, 5, 30, 13, 30, 0, 57000) + body1 = {'mykey': 'k1', 'mytime': timestamp_str, 'otherfield': 'x'} + expected_body1 = {'mykey': 'k1', 'mytime': timestamp_datetime, 'otherfield': 'x'} + body2 = {'mykey': 'k2', 'otherfield': 'x'} + + controller.emit(body1) + controller.emit(body2, event_time=timestamp_datetime) + + controller.terminate() + result = controller.await_termination() + + assert len(result) == 2 + + result1 = result[0] + assert result1.key == 'k1' + assert result1.time == timestamp_datetime + assert result1.body == expected_body1 + + result2 = result[1] + assert result2.key == 'k2' + assert result2.time == timestamp_datetime + assert result2.body == body2 + + async def async_test_async_metadata_fields(): controller = await build_flow([ AsyncEmitSource(key_field='mykey', time_field='mytime'),