Skip to content

Commit

Permalink
#581-refactor-too-long-function: improve function readbility
Browse files Browse the repository at this point in the history
  • Loading branch information
higa-daniel committed Nov 18, 2024
1 parent 10644a0 commit 1b348cb
Showing 1 changed file with 27 additions and 112 deletions.
139 changes: 27 additions & 112 deletions tinydb/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,131 +383,46 @@ def update(
:returns: a list containing the updated document's ID
"""

# Define the function that will perform the update
if callable(fields):
def perform_update(table, doc_id):
# Update documents by calling the update function provided by
# the user
fields(table[doc_id])
else:
def perform_update(table, doc_id):
# Update documents by setting all fields from the provided data
table[doc_id].update(fields)

if doc_ids is not None:
# Perform the update operation for documents specified by a list
# of document IDs

updated_ids = list(doc_ids)

def updater(table: dict):
# Call the processing callback with all document IDs
for doc_id in updated_ids:
perform_update(table, doc_id)

# Perform the update operation (see _update_table for details)
self._update_table(updater)

return updated_ids

elif cond is not None:
# Perform the update operation for documents specified by a query
# Define the function to perform the update based on `fields`
def perform_update(table, doc_id):
if callable(fields):
fields(table[doc_id]) # Call user-defined function
else:
table[doc_id].update(fields) # Update with provided fields

# Collect affected doc_ids
# Helper to process document IDs
def process_doc_ids(table, ids):
updated_ids = []

def updater(table: dict):
_cond = cast(QueryLike, cond)

# We need to convert the keys iterator to a list because
# we may remove entries from the ``table`` dict during
# iteration and doing this without the list conversion would
# result in an exception (RuntimeError: dictionary changed size
# during iteration)
for doc_id in list(table.keys()):
# Pass through all documents to find documents matching the
# query. Call the processing callback with the document ID
if _cond(table[doc_id]):
# Add ID to list of updated documents
updated_ids.append(doc_id)

# Perform the update (see above)
perform_update(table, doc_id)

# Perform the update operation (see _update_table for details)
self._update_table(updater)

for doc_id in ids:
updated_ids.append(doc_id)
perform_update(table, doc_id)
return updated_ids

else:
# Update all documents unconditionally

# Update based on conditions
def process_condition(table, condition):
updated_ids = []

def updater(table: dict):
# Process all documents
for doc_id in list(table.keys()):
# Add ID to list of updated documents
for doc_id, doc in list(table.items()):
if condition(doc):
updated_ids.append(doc_id)

# Perform the update (see above)
perform_update(table, doc_id)

# Perform the update operation (see _update_table for details)
self._update_table(updater)

return updated_ids

def update_multiple(
self,
updates: Iterable[
Tuple[Union[Mapping, Callable[[Mapping], None]], QueryLike]
],
) -> List[int]:
"""
Update all matching documents to have a given set of fields.
:returns: a list containing the updated document's ID
"""

# Define the function that will perform the update
def perform_update(fields, table, doc_id):
if callable(fields):
# Update documents by calling the update function provided
# by the user
fields(table[doc_id])
else:
# Update documents by setting all fields from the provided
# data
table[doc_id].update(fields)
# Select the appropriate update logic
def update_logic(table):
if doc_ids is not None:
return process_doc_ids(table, doc_ids)
if cond is not None:
return process_condition(table, cond)
# Update all documents if no condition or IDs provided
return process_doc_ids(table, list(table.keys()))

# Perform the update operation for documents specified by a query

# Collect affected doc_ids
# Perform the update operation (see `_update_table` for details)
updated_ids = []
def updater(table):
nonlocal updated_ids
updated_ids = update_logic(table)

def updater(table: dict):
# We need to convert the keys iterator to a list because
# we may remove entries from the ``table`` dict during
# iteration and doing this without the list conversion would
# result in an exception (RuntimeError: dictionary changed size
# during iteration)
for doc_id in list(table.keys()):
for fields, cond in updates:
_cond = cast(QueryLike, cond)

# Pass through all documents to find documents matching the
# query. Call the processing callback with the document ID
if _cond(table[doc_id]):
# Add ID to list of updated documents
updated_ids.append(doc_id)

# Perform the update (see above)
perform_update(fields, table, doc_id)

# Perform the update operation (see _update_table for details)
self._update_table(updater)

return updated_ids

def upsert(self, document: Mapping, cond: Optional[QueryLike] = None) -> List[int]:
Expand Down

0 comments on commit 1b348cb

Please sign in to comment.