Skip to content

Commit

Permalink
Added _fetch_data to handle fetching errors
Browse files Browse the repository at this point in the history
  • Loading branch information
danyi1212 committed Feb 2, 2025
1 parent 5bee848 commit 0abcc3d
Showing 1 changed file with 45 additions and 32 deletions.
77 changes: 45 additions & 32 deletions packages/opal-client/opal_client/data/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,9 +522,8 @@ async def _fetch_and_save_data(
Flow:
1. Attempt to fetch data via the data fetcher (e.g., HTTP).
2. Handle any fetch errors.
3. If data is fetched successfully, store it in the policy store.
4. Return a DataEntryReport indicating success/failure of each step.
2. If data is fetched successfully, store it in the policy store.
3. Return a DataEntryReport indicating success/failure of each step.
Args:
entry (DataSourceEntry): The configuration details of the data source entry.
Expand All @@ -535,6 +534,44 @@ async def _fetch_and_save_data(
DataEntryReport: Includes information about whether data was fetched,
saved, and the computed hash for the data if successfully saved.
"""
try:
result = await self._fetch_data(entry)
except Exception as e:
store_transaction._update_remote_status(
url=entry.url, status=False, error=str(e)
)
return DataEntryReport(entry=entry, fetched=False, saved=False)

try:
await self._store_fetched_data(entry, result, store_transaction)
except Exception as e:
logger.exception("Failed to save data update to policy-store: {exc}", exc=e)
store_transaction._update_remote_status(
url=entry.url,
status=False,
error=f"Failed to save data to policy store: {e}",
)
return DataEntryReport(
entry=entry, hash=self.calc_hash(result), fetched=True, saved=False
)
else:
store_transaction._update_remote_status(
url=entry.url, status=True, error=""
)
return DataEntryReport(
entry=entry, hash=self.calc_hash(result), fetched=True, saved=True
)

async def _fetch_data(self, entry: DataSourceEntry) -> JsonableValue:
"""Fetches data from a data source using the configured data fetcher.
Handles fetch errors, HTTP errors, and empty responses.
Args:
entry (DataSourceEntry): The configuration specifying how and where to fetch data.
Returns:
JsonableValue: The fetched data, as a JSON-serializable object.
"""
try:
result = await self._data_fetcher.handle_url(
url=entry.url,
Expand All @@ -547,16 +584,10 @@ async def _fetch_and_save_data(
entry=entry,
exc=e,
)
store_transaction._update_remote_status(
url=entry.url, status=False, error=f"Failed to fetch data: {e}"
)
return DataEntryReport(entry=entry, fetched=False, saved=False)
raise Exception(f"Failed to fetch data for entry {entry.url}: {e}")

if result is None:
store_transaction._update_remote_status(
url=entry.url, status=False, error="Fetched data is empty"
)
return DataEntryReport(entry=entry, fetched=False, saved=False)
raise Exception(f"Fetched data is empty for entry {entry.url}")

if isinstance(result, aiohttp.ClientResponse) and is_http_error_response(
result
Expand All @@ -568,29 +599,11 @@ async def _fetch_and_save_data(
status=result.status,
error=error_content,
)
store_transaction._update_remote_status(
url=entry.url, status=False, error=str(error_content)
raise Exception(
f"Failed to decode response from url: '{entry.url}', got response code {result.status} with response: {error_content}"
)
return DataEntryReport(entry=entry, fetched=False, saved=False)

try:
await self._store_fetched_data(entry, result, store_transaction)
store_transaction._update_remote_status(
url=entry.url, status=True, error=""
)
return DataEntryReport(
entry=entry, hash=self.calc_hash(result), fetched=True, saved=True
)
except Exception as e:
logger.exception("Failed to save data update to policy-store: {exc}", exc=e)
store_transaction._update_remote_status(
url=entry.url,
status=False,
error=f"Failed to save data to policy store: {e}",
)
return DataEntryReport(
entry=entry, hash=self.calc_hash(result), fetched=True, saved=False
)
return result

async def _store_fetched_data(
self,
Expand Down

0 comments on commit 0abcc3d

Please sign in to comment.