Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#581-refactor-too-long-function: improve function readability and decrease cognitive complexity #582

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 27 additions & 112 deletions tinydb/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,131 +383,46 @@ def update(
:returns: a list containing the updated document's ID
"""

# Define the function that will perform the update
if callable(fields):
def perform_update(table, doc_id):
# Update documents by calling the update function provided by
# the user
fields(table[doc_id])
else:
def perform_update(table, doc_id):
# Update documents by setting all fields from the provided data
table[doc_id].update(fields)

if doc_ids is not None:
# Perform the update operation for documents specified by a list
# of document IDs

updated_ids = list(doc_ids)

def updater(table: dict):
# Call the processing callback with all document IDs
for doc_id in updated_ids:
perform_update(table, doc_id)

# Perform the update operation (see _update_table for details)
self._update_table(updater)

return updated_ids

elif cond is not None:
# Perform the update operation for documents specified by a query
# Define the function to perform the update based on `fields`
def perform_update(table, doc_id):
if callable(fields):
fields(table[doc_id]) # Call user-defined function
else:
table[doc_id].update(fields) # Update with provided fields

# Collect affected doc_ids
# Helper to process document IDs
def process_doc_ids(table, ids):
updated_ids = []

def updater(table: dict):
_cond = cast(QueryLike, cond)

# We need to convert the keys iterator to a list because
# we may remove entries from the ``table`` dict during
# iteration and doing this without the list conversion would
# result in an exception (RuntimeError: dictionary changed size
# during iteration)
for doc_id in list(table.keys()):
# Pass through all documents to find documents matching the
# query. Call the processing callback with the document ID
if _cond(table[doc_id]):
# Add ID to list of updated documents
updated_ids.append(doc_id)

# Perform the update (see above)
perform_update(table, doc_id)

# Perform the update operation (see _update_table for details)
self._update_table(updater)

for doc_id in ids:
updated_ids.append(doc_id)
perform_update(table, doc_id)
return updated_ids

else:
# Update all documents unconditionally

# Update based on conditions
def process_condition(table, condition):
updated_ids = []

def updater(table: dict):
# Process all documents
for doc_id in list(table.keys()):
# Add ID to list of updated documents
for doc_id, doc in list(table.items()):
if condition(doc):
updated_ids.append(doc_id)

# Perform the update (see above)
perform_update(table, doc_id)

# Perform the update operation (see _update_table for details)
self._update_table(updater)

return updated_ids

def update_multiple(
self,
updates: Iterable[
Tuple[Union[Mapping, Callable[[Mapping], None]], QueryLike]
],
) -> List[int]:
"""
Update all matching documents to have a given set of fields.

:returns: a list containing the updated document's ID
"""

# Define the function that will perform the update
def perform_update(fields, table, doc_id):
if callable(fields):
# Update documents by calling the update function provided
# by the user
fields(table[doc_id])
else:
# Update documents by setting all fields from the provided
# data
table[doc_id].update(fields)
# Select the appropriate update logic
def update_logic(table):
if doc_ids is not None:
return process_doc_ids(table, doc_ids)
if cond is not None:
return process_condition(table, cond)
# Update all documents if no condition or IDs provided
return process_doc_ids(table, list(table.keys()))

# Perform the update operation for documents specified by a query

# Collect affected doc_ids
# Perform the update operation (see `_update_table` for details)
updated_ids = []
def updater(table):
nonlocal updated_ids
updated_ids = update_logic(table)

def updater(table: dict):
# We need to convert the keys iterator to a list because
# we may remove entries from the ``table`` dict during
# iteration and doing this without the list conversion would
# result in an exception (RuntimeError: dictionary changed size
# during iteration)
for doc_id in list(table.keys()):
for fields, cond in updates:
_cond = cast(QueryLike, cond)

# Pass through all documents to find documents matching the
# query. Call the processing callback with the document ID
if _cond(table[doc_id]):
# Add ID to list of updated documents
updated_ids.append(doc_id)

# Perform the update (see above)
perform_update(fields, table, doc_id)

# Perform the update operation (see _update_table for details)
self._update_table(updater)

return updated_ids

def upsert(self, document: Mapping, cond: Optional[QueryLike] = None) -> List[int]:
Expand Down
Loading