
Commit

Merge pull request #23 from jpmorganchase/develop-enhancements-202309
Develop enhancements 202309
robertsdrm authored Nov 19, 2023
2 parents da8f957 + 8af407a commit 81e015d
Showing 5 changed files with 333 additions and 92 deletions.
193 changes: 163 additions & 30 deletions fusion/fusion.py
@@ -35,6 +35,7 @@
upload_files,
validate_file_names,
is_dataset_raw,
tqdm_joblib,
)

logger = logging.getLogger(__name__)
@@ -113,6 +114,7 @@ def __init__(

self.session = get_session(self.credentials, self.root_url)
self.fs = fs if fs else get_default_fs()
self.events = None

def __repr__(self):
"""Object representation to list all available methods."""
@@ -337,8 +339,11 @@ def list_datasets(
if product:
url = f"{self.root_url}catalogs/{catalog}/productDatasets"
df_pr = Fusion._call_for_dataframe(url, self.session)
df_pr = (
    df_pr[df_pr["product"] == product]
    if isinstance(product, str)
    else df_pr[df_pr["product"].isin(product)]
)
df = df[df["identifier"].isin(df_pr["dataset"])].reset_index(drop=True)

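For reference, the branch above accepts either a single product identifier or a list of them; a quick sketch, given a configured client instance `fusion` and hypothetical product names:

fusion.list_datasets(product="PRODUCT_A")                   # datasets in one product
fusion.list_datasets(product=["PRODUCT_A", "PRODUCT_B"])    # datasets in any of several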
if max_results > -1:
@@ -348,15 +353,15 @@
df["region"] = df.region.str.join(", ")
if not display_all_columns:
cols = [
    "identifier",
    "title",
    "containerType",
    "region",
    "category",
    "coverageStartDate",
    "coverageEndDate",
    "description",
]
cols = [c for c in cols if c in df.columns]
df = df[cols]

@@ -532,6 +537,10 @@ def _resolve_distro_tuples(
catalog = self.__use_catalog(catalog)

datasetseries_list = self.list_datasetmembers(dataset, catalog)

if datasetseries_list.empty:
raise APIResponseError(
@@ -558,6 +567,12 @@
pd.to_datetime(datasetseries_list["identifier"]) <= parsed_dates[1]
]

if len(datasetseries_list) == 0:
raise APIResponseError(
f"No data available for dataset {dataset} in catalog {catalog}.\n"
f"Check that a valid dataset identifier and date/date range has been set."
)

required_series = list(datasetseries_list["@id"])
tups = [
(catalog, dataset, series, dataset_format) for series in required_series
@@ -652,17 +667,22 @@ def download(
for i, series in enumerate(required_series)
]

logger.log(
    VERBOSE_LVL,
    f"Beginning {len(download_spec)} downloads in batches of {n_par}",
)
if show_progress:
    with tqdm_joblib(tqdm(total=len(download_spec))) as _:
        res = Parallel(n_jobs=n_par)(
            delayed(stream_single_file_new_session)(**spec)
            for spec in download_spec
        )
else:
    res = Parallel(n_jobs=n_par)(
        delayed(stream_single_file_new_session)(**spec)
        for spec in download_spec
    )

if (len(res) > 0) and (not all((r[0] for r in res))):
for r in res:
if not r[0]:
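The tqdm_joblib helper imported alongside the other utilities at the top of this diff is not shown here; as a sketch under that assumption, it most likely follows the widely used recipe that temporarily patches joblib's batch-completion callback so each finished batch advances a tqdm bar:

import contextlib

import joblib
from tqdm import tqdm


@contextlib.contextmanager
def tqdm_joblib(tqdm_object: tqdm):
    """Route joblib batch completions into a tqdm progress bar."""

    class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack):
        def __call__(self, *args, **kwargs):
            tqdm_object.update(n=self.batch_size)  # one update per completed batch
            return super().__call__(*args, **kwargs)

    old_callback = joblib.parallel.BatchCompletionCallBack
    joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback
    try:
        yield tqdm_object
    finally:
        joblib.parallel.BatchCompletionCallBack = old_callback  # always restore
        tqdm_object.close()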
@@ -681,6 +701,7 @@ def to_df(
filters: List = None,
force_download: bool = False,
download_folder: str = None,
dataframe_type: str = "pandas",
**kwargs,
) -> pd.DataFrame:
"""Gets distributions for a specified date or date range and returns the data as a dataframe.
@@ -708,6 +729,7 @@
if it is already on disk. Defaults to False.
download_folder (str, optional): The path, absolute or relative, where downloaded files are saved.
Defaults to download_folder as set in __init__.
dataframe_type (str, optional): Type of dataframe to return: "pandas" (default) or "polars".
Returns:
class:`pandas.DataFrame`: a dataframe containing the requested data.
If multiple dataset instances are retrieved then these are concatenated first.
@@ -751,10 +773,30 @@ def to_df(
}

pd_read_default_kwargs: Dict[str, Dict[str, object]] = {
    "csv": {
        "columns": columns,
        "filters": filters,
        "fs": self.fs,
        "dataframe_type": dataframe_type,
    },
    "parquet": {
        "columns": columns,
        "filters": filters,
        "fs": self.fs,
        "dataframe_type": dataframe_type,
    },
    "json": {
        "columns": columns,
        "filters": filters,
        "fs": self.fs,
        "dataframe_type": dataframe_type,
    },
    "raw": {
        "columns": columns,
        "filters": filters,
        "fs": self.fs,
        "dataframe_type": dataframe_type,
    },
}

pd_read_default_kwargs["parq"] = pd_read_default_kwargs["parquet"]
@@ -785,7 +827,12 @@ def to_df(
df = pd.concat(dataframes, ignore_index=True)
else:
dataframes = (pd_reader(f, **pd_read_kwargs) for f in files) # type: ignore
if dataframe_type == "pandas":
    df = pd.concat(dataframes, ignore_index=True)
if dataframe_type == "polars":
    import polars as pl

    df = pl.concat(dataframes, how="diagonal")

return df

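With the new dataframe_type argument, callers choose the returned frame type per call. A minimal usage sketch, assuming a configured client and hypothetical dataset/date values for the usual positional arguments:

fusion = Fusion()
df_pd = fusion.to_df("MY_DATASET", "20230901", dataframe_type="pandas")  # default behaviour
df_pl = fusion.to_df("MY_DATASET", "20230901", dataframe_type="polars")  # requires polars installed

Note that the polars branch concatenates with how="diagonal", which unions columns across files and fills missing values with nulls, whereas the pandas branch keeps the ignore_index concat.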
@@ -904,7 +951,7 @@ def upload(
show_progress: bool = True,
return_paths: bool = False,
multipart=True,
chunk_size=5 * 2**20,
):
"""Uploads the requested files/files to Fusion.
@@ -981,21 +1028,17 @@ def upload(
df = pd.DataFrame([file_path_lst, local_url_eqiv]).T
df.columns = ["path", "url"]

n_par = cpu_count(n_par)
parallel = True if len(df) > 1 else False
res = upload_files(
fs_fusion,
self.fs,
df.iterrows(),
parallel=parallel,
n_par=n_par,
multipart=multipart,
chunk_size=chunk_size,
show_progress=show_progress,
)

if not all(r[0] for r in res):
@@ -1005,3 +1048,93 @@ def upload(
warnings.warn(msg)

return res if return_paths else None
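For scale, the default chunk_size of 5 * 2**20 bytes is 5 MiB per multipart part, and progress reporting is now delegated to upload_files via the new show_progress argument. A minimal usage sketch, assuming the usual path/dataset/date arguments (which sit outside this hunk; the values here are hypothetical):

fusion = Fusion()
fusion.upload(
    path="data/my_file.csv",  # local file to push
    dataset="MY_DATASET",     # target dataset identifier
    dt_str="20230901",        # series member date
    show_progress=True,
)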

def listen_to_events(
self, last_event_id: str = None, catalog: str = None
) -> Union[None, pd.DataFrame]:
"""
Run server sent event listener in the background. Retrieve results by running get_events.
Args:
last_event_id (str):
catalog (str):
Returns:
Union[None, class:`pandas.DataFrame`]: If in_background is True then the function returns no output.
If in_background is set to False then pandas DataFrame is output upon keyboard termination.
"""

catalog = self.__use_catalog(catalog)
import json

import threading
import asyncio
from aiohttp_sse_client import client as sse_client
from .utils import get_client

kwargs = {}
if last_event_id:
kwargs = {"headers": {"Last-Event-ID": last_event_id}}

async def async_events():
timeout = 1e100
session = await get_client(self.credentials, timeout=timeout)
async with sse_client.EventSource(
f"{self.root_url}catalogs/{catalog}/notifications/subscribe",
session=session,
**kwargs,
) as messages:
try:
async for msg in messages:
event = json.loads(msg.data)
if self.events is None:
self.events = pd.DataFrame([event])
else:
self.events = pd.concat(
[self.events, pd.DataFrame(event)], ignore_index=True
)
except TimeoutError:
raise TimeoutError
except Exception as e:
raise Exception(e)

th = threading.Thread(target=asyncio.run, args=(async_events(),), daemon=True)
th.start()

def get_events(
self, last_event_id: str = None, catalog: str = None, in_background: bool = True
) -> Union[None, pd.DataFrame]:
"""
Run server sent event listener and print out the new events. Keyboard terminate to stop.
Args:
last_event_id (str):
catalog (str):
in_background (bool):
Returns:
Union[None, class:`pandas.DataFrame`]: If in_background is True then the function returns no output.
If in_background is set to False then pandas DataFrame is output upon keyboard termination.
"""

catalog = self.__use_catalog(catalog)
if not in_background:
from sseclient import SSEClient
import json

messages = SSEClient(
session=self.session,
url=f"{self.root_url}catalogs/{catalog}/notifications/subscribe",
last_id=last_event_id,
)
lst = []
try:
for msg in messages:
event = json.loads(msg.data)
print(event)
if event["type"] != "HeartBeatNotification":
lst.append(event)
except KeyboardInterrupt:
return pd.DataFrame(lst)
except Exception as e:
raise Exception(e)
else:
return self.events
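Taken together, the two new methods support a background workflow and a blocking one; a usage sketch, given a configured client:

fusion = Fusion()

# Background: start the SSE listener thread, then poll what it has collected.
fusion.listen_to_events()                # optionally pass last_event_id to resume a stream
events_so_far = fusion.get_events()      # returns fusion.events (None until an event arrives)

# Blocking: print events as they arrive; Ctrl+C returns the collected DataFrame.
events_df = fusion.get_events(in_background=False)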
21 changes: 15 additions & 6 deletions fusion/fusion_filesystem.py
@@ -109,14 +109,23 @@ async def _ls_real(self, url, detail=True, **kwargs):
session = await self.set_session()
is_file = False
size = None
async with session.head(url + "/operationType/download", **self.kwargs) as r:
self._raise_not_found_for_status(r, url)
try:
out = await r.json()
except Exception as ex:
logger.log(VERBOSE_LVL, f"{url} cannot be parsed to json, {ex}")
# out = await r.content.read(10)
out = [
url.split("/")[6]
+ "-"
+ url.split("/")[8]
+ "-"
+ url.split("/")[10]
+ "."
+ url.split("/")[-1]
]

size = int(r.headers["Content-Length"])
is_file = True

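The fallback above rebuilds a file name from fixed URL segments when the response body is not JSON; with a distribution URL of the assumed shape below (host and identifiers are hypothetical), indices 6, 8 and 10 pick out the catalog, dataset and series:

url = ("https://fusion.example.com/api/v1/catalogs/common/datasets/"
       "MY_DATASET/datasetseries/20230901/distributions/csv")
parts = url.split("/")
# parts[6] == "common", parts[8] == "MY_DATASET", parts[10] == "20230901"
name = parts[6] + "-" + parts[8] + "-" + parts[10] + "." + parts[-1]
print(name)  # common-MY_DATASET-20230901.csv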
@@ -244,7 +253,7 @@ def cat(self, url, start=None, end=None, **kwargs):
return super().cat(url, start=start, end=end, **kwargs)

def get(
self, rpath, lpath, chunk_size=5 * 2**20, callback=_DEFAULT_CALLBACK, **kwargs
):
"""Copy file(s) to local.
@@ -267,7 +276,7 @@ async def _put_file(
self,
lpath,
rpath,
chunk_size=5 * 2**20,
callback=_DEFAULT_CALLBACK,
method="post",
multipart=False,
@@ -343,7 +352,7 @@ async def put_data():
)

@staticmethod
def _construct_headers(file_local, dt_iso, chunk_size=5 * 2**20, multipart=False):

headers = {
"Content-Type": "application/octet-stream",
@@ -385,7 +394,7 @@ def put(
self,
lpath,
rpath,
chunk_size=5 * 2**20,
callback=_DEFAULT_CALLBACK,
method="put",
multipart=False,
