From 851e5dfb2739c1e41dc846d1ea733300c3ba3710 Mon Sep 17 00:00:00 2001 From: Dina Nimrodi Date: Wed, 14 Apr 2021 11:15:36 +0300 Subject: [PATCH] iterate over a copy of changed keys (#198) * iterate over a copy of changed keys * don't add key to persist job if it's already pending * fix several bugs Co-authored-by: Dina Nimrodi --- storey/flow.py | 2 +- storey/table.py | 61 +++++++++++++++++++------------------------------ 2 files changed, 24 insertions(+), 39 deletions(-) diff --git a/storey/flow.py b/storey/flow.py index 8b4d6bda..32e4755e 100644 --- a/storey/flow.py +++ b/storey/flow.py @@ -400,7 +400,7 @@ async def _call(self, event): else: key_data = self._state[event.key] res, new_state = self._fn(element, key_data) - self._state._set_static_attrs(event.key, new_state) + self._state._update_static_attrs(event.key, new_state) self._state._init_flush_task() else: res, self._state = self._fn(element, self._state) diff --git a/storey/table.py b/storey/table.py index 1b137a94..0296ad1e 100644 --- a/storey/table.py +++ b/storey/table.py @@ -274,8 +274,11 @@ async def _flush_worker(self): try: while not self._terminated: await asyncio.sleep(self._flush_interval_secs) - for key in self._changed_keys: - await self._persist(_PersistJob(key, None, None)) + for key in self._changed_keys.copy(): + if key not in self._pending_by_key: + await self._persist(_PersistJob(key, None, None)) + self._changed_keys.discard(key) + except BaseException as ex: if not isinstance(ex, asyncio.CancelledError): self._flush_exception = ex @@ -302,7 +305,6 @@ async def _persist_worker(self): if pending_event.pending and not pending_event.in_flight: for job in pending_event.pending: resp = await self._internal_persist_key(key, job.data) - self._changed_keys.discard(key) if job.callback: await job.callback(job.extra_data, resp) break @@ -310,33 +312,25 @@ async def _persist_worker(self): job = task[0] completed = await task[1] - if isinstance(job.key, list): - job_key = str(job.key) - else: - job_key = job.key - - for done_job in self._pending_by_key[job_key].in_flight: - if len(self._pending_by_key[job_key].pending) == 0: - self._changed_keys.discard(job_key) + for done_job in self._pending_by_key[job.key].in_flight: if done_job.callback: await done_job.callback(done_job.extra_data, completed) - self._pending_by_key[job_key].in_flight = [] + self._pending_by_key[job.key].in_flight = [] # If we got more pending events for the same key process them - if self._pending_by_key[job_key].pending: - self._pending_by_key[job_key].in_flight = self._pending_by_key[job_key].pending - self._pending_by_key[job_key].pending = [] + if self._pending_by_key[job.key].pending: + self._pending_by_key[job.key].in_flight = self._pending_by_key[job.key].pending + self._pending_by_key[job.key].pending = [] - future_task = self._safe_process_events(self._pending_by_key[job_key].in_flight) + future_task = self._safe_process_events(self._pending_by_key[job.key].in_flight) tail_position = received_job_count + self._q.qsize() jobs_at_tail = self_sent_jobs.get(tail_position, []) jobs_at_tail.append((job, asyncio.get_running_loop().create_task(future_task))) self_sent_jobs[tail_position] = jobs_at_tail else: - del self._pending_by_key[job_key] + del self._pending_by_key[job.key] except BaseException as ex: if task and task is not _termination_obj: - self._changed_keys.discard(task[0].key) if task[0].extra_data and task[0].extra_data._awaitable_result: task[0].extra_data._awaitable_result._set_error(ex) if not self._q.empty(): @@ -350,8 +344,9 @@ async def _terminate(self): raise self._flush_exception if not self._terminated: self._terminated = True - for key in self._changed_keys: - await self._persist(_PersistJob(key, None, None), from_terminate=True) + for key in self._changed_keys.copy(): + if key not in self._pending_by_key: + await self._persist(_PersistJob(key, None, None), from_terminate=True) await self._q.put(_termination_obj) await self._worker_awaitable self._q = None @@ -373,20 +368,15 @@ async def _persist(self, job, from_terminate=False): raise FlowError("Persist worker has already terminated") else: # Initializing the key with 2 lists. One for pending requests and one for requests that an update request has been issued for. - if isinstance(job.key, list): - # list can't be key in a dictionary - job_key = str(job.key) - else: - job_key = job.key - if job_key not in self._pending_by_key: - self._pending_by_key[job_key] = _PendingEvent() + if job.key not in self._pending_by_key: + self._pending_by_key[job.key] = _PendingEvent() # If there is a current update in flight for the key, add the event to the pending list. Otherwise update the key. - self._pending_by_key[job_key].pending.append(job) - if len(self._pending_by_key[job_key].in_flight) == 0: - self._pending_by_key[job_key].in_flight = self._pending_by_key[job_key].pending - self._pending_by_key[job_key].pending = [] - task = self._safe_process_events(self._pending_by_key[job_key].in_flight) + self._pending_by_key[job.key].pending.append(job) + if len(self._pending_by_key[job.key].in_flight) == 0: + self._pending_by_key[job.key].in_flight = self._pending_by_key[job.key].pending + self._pending_by_key[job.key].pending = [] + task = self._safe_process_events(self._pending_by_key[job.key].in_flight) await self._q.put((job, asyncio.get_running_loop().create_task(task))) if self._worker_awaitable.done(): await self._worker_awaitable @@ -396,12 +386,7 @@ async def _safe_process_events(self, jobs): # TODO using only last event might not work correctly if # there are different non aggregation attrs in each event job = jobs[-1] - if isinstance(job.key, list): - # list can't be key in a dictionary - job_key = str(job.key) - else: - job_key = job.key - return await self._internal_persist_key(job_key, job.data) + return await self._internal_persist_key(job.key, job.data) except BaseException as ex: for job in jobs: if job.extra_data and job.extra_data._awaitable_result: