Skip to content

Commit

Permalink
iterate over a copy of changed keys (#198)
Browse files Browse the repository at this point in the history
* iterate over a copy of changed keys

* don't add key to persist job if it's already pending

* fix several bugs

Co-authored-by: Dina Nimrodi <[email protected]>
  • Loading branch information
dinal and Dina Nimrodi authored Apr 14, 2021
1 parent 6aefc07 commit 851e5df
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 39 deletions.
2 changes: 1 addition & 1 deletion storey/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ async def _call(self, event):
else:
key_data = self._state[event.key]
res, new_state = self._fn(element, key_data)
self._state._set_static_attrs(event.key, new_state)
self._state._update_static_attrs(event.key, new_state)
self._state._init_flush_task()
else:
res, self._state = self._fn(element, self._state)
Expand Down
61 changes: 23 additions & 38 deletions storey/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,11 @@ async def _flush_worker(self):
try:
while not self._terminated:
await asyncio.sleep(self._flush_interval_secs)
for key in self._changed_keys:
await self._persist(_PersistJob(key, None, None))
for key in self._changed_keys.copy():
if key not in self._pending_by_key:
await self._persist(_PersistJob(key, None, None))
self._changed_keys.discard(key)

except BaseException as ex:
if not isinstance(ex, asyncio.CancelledError):
self._flush_exception = ex
Expand All @@ -302,41 +305,32 @@ async def _persist_worker(self):
if pending_event.pending and not pending_event.in_flight:
for job in pending_event.pending:
resp = await self._internal_persist_key(key, job.data)
self._changed_keys.discard(key)
if job.callback:
await job.callback(job.extra_data, resp)
break

job = task[0]
completed = await task[1]

if isinstance(job.key, list):
job_key = str(job.key)
else:
job_key = job.key

for done_job in self._pending_by_key[job_key].in_flight:
if len(self._pending_by_key[job_key].pending) == 0:
self._changed_keys.discard(job_key)
for done_job in self._pending_by_key[job.key].in_flight:
if done_job.callback:
await done_job.callback(done_job.extra_data, completed)
self._pending_by_key[job_key].in_flight = []
self._pending_by_key[job.key].in_flight = []

# If we got more pending events for the same key process them
if self._pending_by_key[job_key].pending:
self._pending_by_key[job_key].in_flight = self._pending_by_key[job_key].pending
self._pending_by_key[job_key].pending = []
if self._pending_by_key[job.key].pending:
self._pending_by_key[job.key].in_flight = self._pending_by_key[job.key].pending
self._pending_by_key[job.key].pending = []

future_task = self._safe_process_events(self._pending_by_key[job_key].in_flight)
future_task = self._safe_process_events(self._pending_by_key[job.key].in_flight)
tail_position = received_job_count + self._q.qsize()
jobs_at_tail = self_sent_jobs.get(tail_position, [])
jobs_at_tail.append((job, asyncio.get_running_loop().create_task(future_task)))
self_sent_jobs[tail_position] = jobs_at_tail
else:
del self._pending_by_key[job_key]
del self._pending_by_key[job.key]
except BaseException as ex:
if task and task is not _termination_obj:
self._changed_keys.discard(task[0].key)
if task[0].extra_data and task[0].extra_data._awaitable_result:
task[0].extra_data._awaitable_result._set_error(ex)
if not self._q.empty():
Expand All @@ -350,8 +344,9 @@ async def _terminate(self):
raise self._flush_exception
if not self._terminated:
self._terminated = True
for key in self._changed_keys:
await self._persist(_PersistJob(key, None, None), from_terminate=True)
for key in self._changed_keys.copy():
if key not in self._pending_by_key:
await self._persist(_PersistJob(key, None, None), from_terminate=True)
await self._q.put(_termination_obj)
await self._worker_awaitable
self._q = None
Expand All @@ -373,20 +368,15 @@ async def _persist(self, job, from_terminate=False):
raise FlowError("Persist worker has already terminated")
else:
# Initializing the key with 2 lists. One for pending requests and one for requests that an update request has been issued for.
if isinstance(job.key, list):
# list can't be key in a dictionary
job_key = str(job.key)
else:
job_key = job.key
if job_key not in self._pending_by_key:
self._pending_by_key[job_key] = _PendingEvent()
if job.key not in self._pending_by_key:
self._pending_by_key[job.key] = _PendingEvent()

# If there is a current update in flight for the key, add the event to the pending list. Otherwise update the key.
self._pending_by_key[job_key].pending.append(job)
if len(self._pending_by_key[job_key].in_flight) == 0:
self._pending_by_key[job_key].in_flight = self._pending_by_key[job_key].pending
self._pending_by_key[job_key].pending = []
task = self._safe_process_events(self._pending_by_key[job_key].in_flight)
self._pending_by_key[job.key].pending.append(job)
if len(self._pending_by_key[job.key].in_flight) == 0:
self._pending_by_key[job.key].in_flight = self._pending_by_key[job.key].pending
self._pending_by_key[job.key].pending = []
task = self._safe_process_events(self._pending_by_key[job.key].in_flight)
await self._q.put((job, asyncio.get_running_loop().create_task(task)))
if self._worker_awaitable.done():
await self._worker_awaitable
Expand All @@ -396,12 +386,7 @@ async def _safe_process_events(self, jobs):
# TODO using only last event might not work correctly if
# there are different non aggregation attrs in each event
job = jobs[-1]
if isinstance(job.key, list):
# list can't be key in a dictionary
job_key = str(job.key)
else:
job_key = job.key
return await self._internal_persist_key(job_key, job.data)
return await self._internal_persist_key(job.key, job.data)
except BaseException as ex:
for job in jobs:
if job.extra_data and job.extra_data._awaitable_result:
Expand Down

0 comments on commit 851e5df

Please sign in to comment.