From da481adcc802409f191e4dfbeb193ea3d5804c79 Mon Sep 17 00:00:00 2001 From: saeedamen Date: Fri, 20 May 2022 13:49:03 +0100 Subject: [PATCH] Added more data vendor customization --- README.md | 3 ++ findatapy/market/ioengine.py | 54 +++++++++++++++++-------- findatapy/market/marketdatagenerator.py | 2 + findatapy/market/marketdatarequest.py | 8 +++- findatapy/util/dataconstants.py | 2 + setup.py | 2 +- 6 files changed, 52 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index 71f7880..1cff71a 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,7 @@ individual data providers) # Release Notes +* 0.1.27 - findatapy (20 May 2022) * 0.1.26 - findatapy (07 Oct 2021) * 0.1.25 - findatapy (07 Oct 2021) * 0.1.24 - findatapy (06 Oct 2021) @@ -133,6 +134,8 @@ individual data providers) # Coding log +* 20 May 2022 + * Added more customisation for data vendors * 25 Jan 2022 * Fixed delete key problem with Redis * 20 Jan 2022 diff --git a/findatapy/market/ioengine.py b/findatapy/market/ioengine.py index f54917c..410a0ba 100644 --- a/findatapy/market/ioengine.py +++ b/findatapy/market/ioengine.py @@ -17,6 +17,7 @@ import io import datetime +import json from dateutil.parser import parse import codecs @@ -316,15 +317,17 @@ def write_time_series_cache_to_disk(self, fname, data_frame, 3) if mem_float < 500: - ser = io.BytesIO() + if use_cache_compression: + ser = io.BytesIO() data_frame.to_pickle(ser, compression="gzip") ser.seek(0) r.set('comp_' + fname, ser.read()) else: + ser = io.BytesIO() data_frame.to_pickle(ser) ser.seek(0) @@ -1031,20 +1034,8 @@ def to_parquet(self, df, path, filename=None, cloud_credentials=None, """ logger = LoggerManager.getLogger(__name__) - if cloud_credentials is None: cloud_credentials = DataConstants().cloud_credentials - - if isinstance(path, list): - pass - else: - path = [path] - - if filename is not None: - new_path = [] - - for p in path: - new_path.append(self.path_join(p, filename)) - - path = new_path + path, cloud_credentials = self._get_cloud_path( + path, filename=filename, cloud_credentials=cloud_credentials) constants = DataConstants() @@ -1283,7 +1274,7 @@ def to_csv_parquet(self, df, path, filename=None, cloud_credentials=None, parquet_compression=parquet_compression, use_pyarrow_directly=use_pyarrow_directly) - def to_csv(self, df, path, filename=None, cloud_credentials=None): + def _get_cloud_path(self, path, filename=None, cloud_credentials=None): if cloud_credentials is None: cloud_credentials = constants.cloud_credentials if isinstance(path, list): @@ -1299,6 +1290,13 @@ def to_csv(self, df, path, filename=None, cloud_credentials=None): path = new_path + return path, cloud_credentials + + def to_csv(self, df, path, filename=None, cloud_credentials=None): + + path, cloud_credentials = self._get_cloud_path( + path, filename=filename, cloud_credentials=cloud_credentials) + for p in path: if "s3://" in p: s3 = self._create_cloud_filesystem(cloud_credentials, @@ -1312,6 +1310,30 @@ def to_csv(self, df, path, filename=None, cloud_credentials=None): else: df.to_csv(p) + def to_json(self, dictionary, path, filename=None, cloud_credentials=None): + + path, cloud_credentials = self._get_cloud_path( + path, filename=filename, cloud_credentials=cloud_credentials) + + for p in path: + if "s3://" in p: + s3 = self._create_cloud_filesystem(cloud_credentials, + 's3_filesystem') + + path_in_s3 = self.sanitize_path(p).replace("s3://", "") + + # Use 'w' for py3, 'wb' for py2 + with s3.open(path_in_s3, 'w') as f: + if isinstance(dictionary, dict): + json.dump(dictionary, f, indent=4) + else: + dictionary.to_json(f) + else: + if isinstance(dictionary, dict): + json.dump(dictionary, p, indent=4) + elif isinstance(dictionary, pd.DataFrame): + dictionary.to_json(p) + def path_exists(self, path, cloud_credentials=None): if cloud_credentials is None: cloud_credentials = constants.cloud_credentials diff --git a/findatapy/market/marketdatagenerator.py b/findatapy/market/marketdatagenerator.py index 07809a4..b73f406 100644 --- a/findatapy/market/marketdatagenerator.py +++ b/findatapy/market/marketdatagenerator.py @@ -162,6 +162,8 @@ def get_data_vendor(self, md_request): data_vendor = DataVendorHuobi() elif data_source in self._data_vendor_dict: data_vendor = self._data_vendor_dict[data_source] + elif data_source in md_request.data_vendor_custom: + data_vendor = md_request.data_vendor_custom[data_source] else: logger.warn(str(data_source) + " is an unrecognized data source") diff --git a/findatapy/market/marketdatarequest.py b/findatapy/market/marketdatarequest.py index 6d1d77d..96bf5df 100644 --- a/findatapy/market/marketdatarequest.py +++ b/findatapy/market/marketdatarequest.py @@ -79,7 +79,8 @@ def generate_key(self): ["logger", "_MarketDataRequest__abstract_curve", "_MarketDataRequest__cache_algo", - "_MarketDataRequest__overrides"]) \ + "_MarketDataRequest__overrides", + "_MarketDataRequest__data_vendor_custom"]) \ + "_df" def __init__(self, data_source=None, @@ -110,7 +111,8 @@ def __init__(self, data_source=None, eikon_api_key=data_constants.eikon_api_key, push_to_cache=True, overrides={}, - freeform_md_request={}): + freeform_md_request={}, + data_vendor_custom=data_constants.data_vendor_custom): # Can deep copy MarketDataRequest (use a lock, so can be used with # threading when downloading time series) @@ -174,6 +176,7 @@ def __init__(self, data_source=None, copy.deepcopy(md_request.freeform_md_request) self.tickers = copy.deepcopy(md_request.tickers) # Need this after category in case have wildcard + self.data_vendor_custom = copy.deepcopy(md_request.data_vendor_custom) else: self.freq_mult = freq_mult @@ -228,6 +231,7 @@ def __init__(self, data_source=None, self.tickers = vendor_tickers self.old_tickers = self.tickers + self.data_vendor_custom = data_vendor_custom def __str__(self): return "MarketDataRequest summary - " + self.generate_key() diff --git a/findatapy/util/dataconstants.py b/findatapy/util/dataconstants.py index 2103e31..b9dceee 100644 --- a/findatapy/util/dataconstants.py +++ b/findatapy/util/dataconstants.py @@ -271,6 +271,8 @@ class DataConstants(object): default_data_environment = 'backtest' possible_data_environment = ['backtest', 'prod'] + data_vendor_custom = {} + # Overwrite field variables with those listed in DataCred or user provided dictionary override_fields def __init__(self, override_fields={}): try: diff --git a/setup.py b/setup.py index f2512ef..8fc7f8b 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ tickers, using configuration files. There is also functionality which is particularly useful for those downloading FX market data.""" setup(name='findatapy', - version='0.1.26', + version='0.1.27', description='Market data library', author='Saeed Amen', author_email='saeed@cuemacro.com',