diff --git a/README.md b/README.md index 5e7ed8f..4ea25ea 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,7 @@ individual data providers) # Release Notes +* 0.1.29 - findatapy (14 May 2023) * 0.1.28 - findatapy (19 Jul 2022) * 0.1.27 - findatapy (20 May 2022) * 0.1.26 - findatapy (07 Oct 2021) @@ -135,6 +136,10 @@ individual data providers) # Coding log +* 12 May 2023 + * Fixed Dukascopy downloader +* 29 Aug 2022 + * Reformatted text for line length * 23 Aug 2022 * Simplified MarketDataRequest * 12 Aug 2022 diff --git a/findatapy/market/datavendorbbg.py b/findatapy/market/datavendorbbg.py index e133f7c..59c4021 100644 --- a/findatapy/market/datavendorbbg.py +++ b/findatapy/market/datavendorbbg.py @@ -162,7 +162,8 @@ def load_ticker(self, md_request): md_request_vendor) # if data_frame is not None: - # # Convert fields with release-dt to dates (special case!) and assume everything else numerical + # # Convert fields with release-dt to dates (special case!) + # # and assume everything else numerical # for c in data_frame.columns: # try: # if 'release-dt' in c: @@ -446,7 +447,7 @@ def download_intraday(self, md_request): # Bloomberg OpenAPI implementation low_level_loader = BBGLowLevelIntraday() - # by default we download all available fields! + # By default we download all available fields! data_frame = low_level_loader.load_time_series(md_request) # self.kill_session() # need to forcibly kill_session since can't @@ -472,7 +473,7 @@ def download_ref(self, md_request): md_request_vendor_selective = copy.copy(md_request) - # special case for future date releases + # Special case for future date releases # if 'release-date-time-full' in md_request.fields: # md_request_vendor_selective.fields = ['ECO_FUTURE_RELEASE_DATE_LIST'] # @@ -706,7 +707,7 @@ def start_bloomberg_session(self): logger = LoggerManager().getLogger(__name__) # Try up to 5 times to start a session - while (tries < 5): + while tries < 5: try: # fill SessionOptions sessionOptions = blpapi.SessionOptions() diff --git a/findatapy/market/datavendorweb.py b/findatapy/market/datavendorweb.py index 30180fd..9755451 100644 --- a/findatapy/market/datavendorweb.py +++ b/findatapy/market/datavendorweb.py @@ -1784,8 +1784,18 @@ def fetch_tick(self, tick_url, try_time): # If URL has not been found try again if tick_request.status_code == 404: logger.warning( - "Error downloading.. " + tick_url + " returned 404 " - "URL not found message! Are you sure Dukascopy has this asset?") + "Error downloading.. " + + tick_url + " returned 404 " + + "URL not found message! Are you sure Dukascopy has this asset?") + + tick_request_content = None + tick_request.close() + + break + elif tick_request.status_code == 503: + logger.warning( + "Error downloading.. " + tick_url + + " returned 503 and service unavailable") tick_request_content = None tick_request.close() @@ -1798,13 +1808,13 @@ def fetch_tick(self, tick_url, try_time): content_text = tick_request_content.decode("latin1") - # Can sometimes get back an error HTML page, in which + # Can sometimes get back an error HTML page, in which # case retry if 'error' not in str(content_text): break else: logger.warning( - "Error downloading.. " + tick_url + " " + "Error downloading.. 
" + tick_url + " " + content_text + " will try again " + str(download_counter) + " occasion") except Exception as e: @@ -2811,6 +2821,8 @@ def download_data_frame(data_source): data_frame = IOEngine().read_time_series_cache_from_disk( full_path, engine=data_engine) + # data_frame.to_csv("temp.csv") + if data_frame is None or data_frame.index is []: return None if data_frame is not None: @@ -2831,8 +2843,13 @@ def download_data_frame(data_source): data_frame.columns = ticker_combined data_frame.index.name = "Date" + msg = str(ticker_combined) + + if len(msg) > 100: + msg = msg[:99] + "...]" + logger.info("Completed request from " + str( - data_source) + " for " + str(ticker_combined)) + data_source) + " for " + msg) return data_frame diff --git a/findatapy/market/ioengine.py b/findatapy/market/ioengine.py index 410a0ba..3df8bb0 100644 --- a/findatapy/market/ioengine.py +++ b/findatapy/market/ioengine.py @@ -243,22 +243,25 @@ def remove_time_series_cache_on_disk(self, fname, engine='hdf5_fixed', pass ### functions to handle HDF5 on disk, arctic etc. - def write_time_series_cache_to_disk(self, fname, data_frame, - engine='hdf5_fixed', append_data=False, - db_server=constants.db_server, - db_port=constants.db_port, - username=constants.db_username, - password=constants.db_password, - filter_out_matching=None, timeout=10, - use_cache_compression=constants.use_cache_compression, - parquet_compression=constants.parquet_compression, - use_pyarrow_directly=False, - md_request=None, ticker=None, - cloud_credentials=None): - """Writes Pandas data frame to disk as Parquet, HDF5 format or bcolz format, in Arctic or to Redis - - Note, that Redis uses pickle (you must make sure that your Redis instance is not accessible - from unverified users, given you should not unpickle from unknown sources) + def write_time_series_cache_to_disk( + self, fname, data_frame, + engine='hdf5_fixed', append_data=False, + db_server=constants.db_server, + db_port=constants.db_port, + username=constants.db_username, + password=constants.db_password, + filter_out_matching=None, timeout=10, + use_cache_compression=constants.use_cache_compression, + parquet_compression=constants.parquet_compression, + use_pyarrow_directly=False, + md_request=None, ticker=None, + cloud_credentials=None): + """Writes Pandas data frame to disk as Parquet, HDF5 format or bcolz + format, in Arctic or to Redis + + Note, that Redis uses pickle (you must make sure that your Redis + instance is not accessible from unverified users, given you should not + unpickle from unknown sources) Parmeters --------- @@ -283,7 +286,8 @@ def write_time_series_cache_to_disk(self, fname, data_frame, logger = LoggerManager().getLogger(__name__) - if cloud_credentials is None: cloud_credentials = constants.cloud_credentials + if cloud_credentials is None: + cloud_credentials = constants.cloud_credentials if md_request is not None: fname = self.path_join(fname, md_request.create_category_key( @@ -345,8 +349,18 @@ def write_time_series_cache_to_disk(self, fname, data_frame, "Didn't push " + fname + " to Redis given not running") except Exception as e: + fname_msg = fname + + if len(fname_msg) > 150: + fname_msg = fname_msg[:149] + "..." + + error_msg = str(e) + + if len(error_msg) > 150: + error_msg = error_msg[:149] + "..." 
+ logger.warning( - "Couldn't push " + fname + " to Redis: " + str(e)) + "Couldn't push " + fname_msg + " to Redis: " + error_msg) elif engine == 'arctic': @@ -399,7 +413,8 @@ def write_time_series_cache_to_disk(self, fname, data_frame, data_frame = data_frame[new_cols] - # Problems with Arctic when writing timezone to disk sometimes, so strip + # Problems with Arctic when writing timezone to disk sometimes, + # so strip data_frame = data_frame.copy().tz_localize(None) try: @@ -418,8 +433,9 @@ def write_time_series_cache_to_disk(self, fname, data_frame, elif engine == 'hdf5': h5_filename = self.get_h5_filename(fname) - # append data only works for HDF5 stored as tables (but this is much slower than fixed format) - # removes duplicated entries at the end + # Append data only works for HDF5 stored as tables (but this is + # much slower than fixed format) removes duplicated entries at + # the end if append_data: store = pd.HDFStore(h5_filename, format=hdf5_format, complib="zlib", complevel=9) @@ -427,8 +443,8 @@ def write_time_series_cache_to_disk(self, fname, data_frame, if ('intraday' in fname): data_frame = data_frame.astype('float32') - # get last row which matches and remove everything after that (because append - # function doesn't check for duplicated rows + # get last row which matches and remove everything after that + # (because append function doesn't check for duplicated rows) nrows = len(store['data'].index) last_point = data_frame.index[-1] @@ -442,7 +458,8 @@ def write_time_series_cache_to_disk(self, fname, data_frame, i = i - 1 - # remove rows at the end, which are duplicates of the incoming time series + # Remove rows at the end, which are duplicates of the + # incoming time series store.remove(key='data', start=i, stop=nrows) store.put(key='data', value=data_frame, format=hdf5_format, append=True) @@ -614,7 +631,8 @@ def read_time_series_cache_from_disk(self, fname, engine='hdf5', for fname_single in fname: logger.debug("Reading " + fname_single + "..") - if engine == 'parquet' and '.gzip' not in fname_single and '.parquet' not in fname_single: + if engine == 'parquet' and '.gzip' not in fname_single \ + and '.parquet' not in fname_single: fname_single = fname_single + '.parquet' if (engine == 'redis'): @@ -628,8 +646,9 @@ def read_time_series_cache_from_disk(self, fname, engine='hdf5', # is there a compressed key stored?) 
k = r.keys('comp_' + fname_single) - # if so, then it means that we have stored it as a compressed object - # if have more than 1 element, take the last (which will be the latest to be added) + # If so, then it means that we have stored it as a + # compressed object if have more than 1 element, take the + # last (which will be the latest to be added) if (len(k) >= 1): k = k[-1].decode('utf-8') @@ -642,13 +661,14 @@ def read_time_series_cache_from_disk(self, fname, engine='hdf5', except Exception as e: logger.info( - "Cache not existent for " + fname_single + " in Redis: " + str( + "Cache not existent for " + + fname_single + " in Redis: " + str( e)) if msg is None: data_frame = None else: - logger.info('Load Redis cache: ' + fname_single) + logger.info("Load Redis cache: " + fname_single) data_frame = msg # pd.read_msgpack(msg) @@ -660,7 +680,7 @@ def read_time_series_cache_from_disk(self, fname, engine='hdf5', fname_single = os.path.basename(fname_single).replace('.', '_') - logger.info('Load Arctic/MongoDB library: ' + fname_single) + logger.info("Load Arctic/MongoDB library: " + fname_single) if username is not None and password is not None: c = pymongo.MongoClient( @@ -696,8 +716,8 @@ def read_time_series_cache_from_disk(self, fname, engine='hdf5', except Exception as e: logger.warning( - 'Library may not exist or another error: ' + fname_single + ' & message is ' + str( - e)) + 'Library may not exist or another error: ' + + fname_single + ' & message is ' + str(e)) data_frame = None elif self.path_exists(self.get_h5_filename(fname_single)): @@ -878,7 +898,8 @@ def convert_csv_data_frame(self, f_name, category, freq, cutoff=None, self.write_time_series_cache_to_disk(category_f_name, data_frame) def clean_csv_file(self, f_name): - """Cleans up CSV file (removing empty characters) before writing back to disk + """Cleans up CSV file (removing empty characters) before writing back + to disk Parameters ---------- @@ -902,7 +923,8 @@ def clean_csv_file(self, f_name): def create_cache_file_name(self, filename): return constants.folder_time_series_data + "/" + filename - # TODO refactor IOEngine so that each database is implemented in a subclass of DBEngine + # TODO refactor IOEngine so that each database is implemented in a + # subclass of DBEngine def get_engine(self, engine='hdf5_fixed'): pass @@ -942,7 +964,8 @@ def read_parquet(self, path, cloud_credentials=None): ------- DataFrame """ - if cloud_credentials is None: cloud_credentials = constants.cloud_credentials + if cloud_credentials is None: + cloud_credentials = constants.cloud_credentials if "s3://" in path: storage_options = self._convert_cred(cloud_credentials, @@ -1064,15 +1087,17 @@ def to_parquet(self, df, path, filename=None, cloud_credentials=None, cloud_credentials_ = self._convert_cred(cloud_credentials) - # Tends to be slower than using pandas/pyarrow directly, but for very large files, we might have to split - # before writing to disk + # Tends to be slower than using pandas/pyarrow directly, but for very + # large files, we might have to split before writing to disk def pyarrow_dump(df, path): - # Trying to convert large Pandas DataFrames in one go to Arrow tables can result in out-of-memory - # messages, so chunk them first, convert them one by one, and write to disk in chunks + # Trying to convert large Pandas DataFrames in one go to Arrow + # tables can result in out-of-memory messages, so chunk them first, + # convert them one by one, and write to disk in chunks df_list = self.chunk_dataframes(df) - # Using 
pandas.to_parquet, doesn't let us pass in parameters to allow coersion of timestamps - # hence have to do it this way, using underlying pyarrow interface (/) + # Using pandas.to_parquet, doesn't let us pass in parameters to + # allow coersion of timestamps hence have to do it this way, + # using underlying pyarrow interface (/) # ie. ns -> us if not (isinstance(df_list, list)): df_list = [df_list] @@ -1100,11 +1125,12 @@ def pyarrow_dump(df, path): table = pa.Table.from_pandas(df_) if pqwriter is None: - pqwriter = pq.ParquetWriter(p_in_s3, table.schema, - compression=parquet_compression, - coerce_timestamps=constants.default_time_units, - allow_truncated_timestamps=True, - filesystem=s3) + pqwriter = pq.ParquetWriter( + p_in_s3, table.schema, + compression=parquet_compression, + coerce_timestamps=constants.default_time_units, + allow_truncated_timestamps=True, + filesystem=s3) pqwriter.write_table(table) @@ -1118,10 +1144,11 @@ def pyarrow_dump(df, path): table = pa.Table.from_pandas(df_) if pqwriter is None: - pqwriter = pq.ParquetWriter(p, table.schema, - compression=parquet_compression, - coerce_timestamps=constants.default_time_units, - allow_truncated_timestamps=True) + pqwriter = pq.ParquetWriter( + p, table.schema, + compression=parquet_compression, + coerce_timestamps=constants.default_time_units, + allow_truncated_timestamps=True) pqwriter.write_table(table) @@ -1136,7 +1163,8 @@ def pyarrow_dump(df, path): if use_pyarrow_directly: pyarrow_dump(df, path) else: - # First try to use Pandas/pyarrow, if fails, which can occur with large DataFrames use chunked write + # First try to use Pandas/pyarrow, if fails, which can occur with + # large DataFrames use chunked write try: for p in path: p = self.sanitize_path(p) @@ -1145,20 +1173,22 @@ def pyarrow_dump(df, path): storage_options = self._convert_cred(cloud_credentials, convert_to_s3fs=True) - df.to_parquet(p, compression=parquet_compression, - coerce_timestamps=constants.default_time_units, - allow_truncated_timestamps=True, - storage_options=storage_options) + df.to_parquet( + p, compression=parquet_compression, + coerce_timestamps=constants.default_time_units, + allow_truncated_timestamps=True, + storage_options=storage_options) else: - df.to_parquet(p, compression=parquet_compression, - coerce_timestamps=constants.default_time_units, - allow_truncated_timestamps=True) + df.to_parquet( + p, compression=parquet_compression, + coerce_timestamps=constants.default_time_units, + allow_truncated_timestamps=True) except pyarrow.lib.ArrowMemoryError as e: logger.warning( - "Couldn't dump using Pandas/pyarrow, will instead try chunking with pyarrow directly " + str( - e)) + "Couldn't dump using Pandas/pyarrow, will instead try " + "chunking with pyarrow directly " + str(e)) pyarrow_dump(df, path) @@ -1210,7 +1240,8 @@ def split_array_chunks(self, array, chunks=None, chunk_size=None): return array def get_obj_size_mb(self, obj): - # Can sometime have very large dataframes, which need to be split, otherwise won't fit in a single Redis key + # Can sometime have very large dataframes, which need to be split, + # otherwise won't fit in a single Redis key mem = obj.memory_usage(deep='deep').sum() mem_float = round(float(mem) / (1024.0 * 1024.0), 3) @@ -1219,7 +1250,8 @@ def get_obj_size_mb(self, obj): def chunk_dataframes(self, obj, chunk_size_mb=constants.chunk_size_mb): logger = LoggerManager.getLogger(__name__) - # Can sometime have very large dataframes, which need to be split, otherwise won't fit in a single Redis key + # Can sometime have 
very large dataframes, which need to be split, + # otherwise won't fit in a single Redis key mem_float = self.get_obj_size_mb(obj) mem = '----------- ' + str(mem_float) + ' MB -----------' @@ -1239,7 +1271,8 @@ def chunk_dataframes(self, obj, chunk_size_mb=constants.chunk_size_mb): def read_csv(self, path, cloud_credentials=None, encoding='utf-8', encoding_errors=None, errors='ignore'): - if cloud_credentials is None: cloud_credentials = constants.cloud_credentials + if cloud_credentials is None: + cloud_credentials = constants.cloud_credentials if "s3://" in path: s3 = self._create_cloud_filesystem(cloud_credentials, @@ -1275,7 +1308,8 @@ def to_csv_parquet(self, df, path, filename=None, cloud_credentials=None, use_pyarrow_directly=use_pyarrow_directly) def _get_cloud_path(self, path, filename=None, cloud_credentials=None): - if cloud_credentials is None: cloud_credentials = constants.cloud_credentials + if cloud_credentials is None: + cloud_credentials = constants.cloud_credentials if isinstance(path, list): pass @@ -1335,7 +1369,8 @@ def to_json(self, dictionary, path, filename=None, cloud_credentials=None): dictionary.to_json(p) def path_exists(self, path, cloud_credentials=None): - if cloud_credentials is None: cloud_credentials = constants.cloud_credentials + if cloud_credentials is None: + cloud_credentials = constants.cloud_credentials if "s3://" in path: s3 = self._create_cloud_filesystem(cloud_credentials, @@ -1366,7 +1401,6 @@ def path_join(self, folder, *file): folder = "s3://" + folder else: - folder = os.path.join(folder, *file) folder = folder.replace("\\\\", "/") @@ -1375,7 +1409,8 @@ def path_join(self, folder, *file): return folder def list_files(self, path, cloud_credentials=None): - if cloud_credentials is None: cloud_credentials = constants.cloud_credentials + if cloud_credentials is None: + cloud_credentials = constants.cloud_credentials if "s3://" in path: s3 = self._create_cloud_filesystem(cloud_credentials, @@ -1398,7 +1433,8 @@ def list_files(self, path, cloud_credentials=None): return files def delete(self, path, cloud_credentials=None): - if cloud_credentials is None: cloud_credentials = constants.cloud_credentials + if cloud_credentials is None: + cloud_credentials = constants.cloud_credentials if not (isinstance(path, list)): path = [path] @@ -1418,7 +1454,8 @@ def delete(self, path, cloud_credentials=None): def copy(self, source, destination, cloud_credentials=None, infer_dest_filename=False): - if cloud_credentials is None: cloud_credentials = constants.cloud_credentials + if cloud_credentials is None: + cloud_credentials = constants.cloud_credentials if destination is None: destination = "" @@ -1436,8 +1473,8 @@ def copy(self, source, destination, cloud_credentials=None, if "s3://" in dest: infer_dest_filename = True - list_files = self.list_files(so, - cloud_credentials=cloud_credentials) + list_files = self.list_files( + so, cloud_credentials=cloud_credentials) self.copy(list_files, dest, infer_dest_filename=infer_dest_filename) @@ -1466,12 +1503,14 @@ def copy(self, source, destination, cloud_credentials=None, dest, recursive=True) -####################################################################################################################### +############################################################################### class SpeedCache(object): - """Wrapper for cache hosted in external in memory database (by default Redis, although in practice, can use - any database supported in this class). 
This allows us to share hash across Python instances, rather than having - repopulate each time we restart Python. Also can let us share cache easily across threads, without replicating. + """Wrapper for cache hosted in external in memory database (by default + Redis, although in practice, can use any database supported in this class). + This allows us to share hash across Python instances, rather than having + repopulate each time we restart Python. Also can let us share cache easily + across threads, without replicating. """ @@ -1515,16 +1554,18 @@ def dump_key(self, key): if self.engine == 'no_cache': return try: - return self.io_engine.remove_time_series_cache_on_disk(key, - engine=self.engine, - db_server=self.db_cache_server, - db_port=self.db_cache_port) + return self.io_engine.remove_time_series_cache_on_disk( + key, + engine=self.engine, + db_server=self.db_cache_server, + db_port=self.db_cache_port) except: pass def generate_key(self, obj, key_drop=[]): - """Create a unique hash key for object from its attributes (excluding those attributes in key drop), which can be - used as a hashkey in the Redis hashtable + """Create a unique hash key for object from its attributes (excluding + those attributes in key drop), which can be used as a hashkey in the + Redis hashtable Parameters ---------- diff --git a/findatapy/market/market.py b/findatapy/market/market.py index a6bc2e8..3faf371 100644 --- a/findatapy/market/market.py +++ b/findatapy/market/market.py @@ -80,8 +80,10 @@ def fetch_market(self, md_request=None, md_request_df=None, vendor_ticker eg. EURUSD Curncy vendor_field eg. PX_LAST - We can also create MarketDataRequest objects, using strings eg. fx.quandl.daily.NYC.EURUSD (category.data_source.freq.cut.ticker), - as Python dict, as DataFrame with various properties (like ticker, category etc.) + We can also create MarketDataRequest objects, using strings + eg. fx.quandl.daily.NYC.EURUSD (category.data_source.freq.cut.ticker), + as Python dict, as DataFrame with various properties + (like ticker, category etc.) Parameters ---------- @@ -92,7 +94,8 @@ def fetch_market(self, md_request=None, md_request_df=None, Another way to specify some of the data request properties md_request_str : str - We can specify part of the MarketDataRequest as a string, which gets converted later (or a JSON object) + We can specify part of the MarketDataRequest as a string, which + gets converted later (or a JSON object) Returns ------- diff --git a/findatapy/timeseries/calculations.py b/findatapy/timeseries/calculations.py index ab05da8..702b4f3 100644 --- a/findatapy/timeseries/calculations.py +++ b/findatapy/timeseries/calculations.py @@ -1450,7 +1450,8 @@ def linear_regression(self, df_y, df_x): def linear_regression_single_vars(self, df_y, df_x, y_vars, x_vars, use_stats_models=True): - """Do a linear regression of a number of y and x variable pairs in different dataframes, report back the coefficients. + """Do a linear regression of a number of y and x variable pairs in + different dataframes, report back the coefficients. 
Parameters ---------- @@ -1463,7 +1464,8 @@ def linear_regression_single_vars(self, df_y, df_x, y_vars, x_vars, x_vars : str (list) Which x variables should we regress use_stats_models : bool (default: True) - Should we use statsmodels library directly or pandas.stats.api.ols wrapper (warning: deprecated) + Should we use statsmodels library directly or + pandas.stats.api.ols wrapper (warning: deprecated) Returns ------- @@ -1481,7 +1483,8 @@ def linear_regression_single_vars(self, df_y, df_x, y_vars, x_vars, if pd.__version__ < '0.17' or not (use_stats_models): out = pd.stats.api.ols(y=y, x=x) else: - # pandas.stats.api is now being depreciated, recommended replacement package + # pandas.stats.api is now being depreciated, + # recommended replacement package # http://www.statsmodels.org/stable/regression.html # we follow the example from there - Fit and summarize OLS model @@ -1492,7 +1495,8 @@ def linear_regression_single_vars(self, df_y, df_x, y_vars, x_vars, # to remove NaN values (otherwise regression is undefined) (y, x, a, b, c, d) = self._filter_data(y, x) - # assumes we have a constant (remove add_constant wrapper to have no intercept reported) + # Assumes we have a constant (remove add_constant + # wrapper to have no intercept reported) mod = sm.OLS(y.get_values(), statsmodels.tools.add_constant( x.get_values())) @@ -1538,7 +1542,8 @@ def strip_linear_regression_output(self, indices, ols_list, var): return df - ##### Various methods for averaging time series by hours, mins and days (or specific columns) to create summary time series + ##### Various methods for averaging time series by hours, mins and days + # (or specific columns) to create summary time series def average_by_columns_list(self, data_frame, columns): return data_frame. \ groupby(columns).mean() @@ -1702,14 +1707,14 @@ def average_by_month_day_by_bus_day(self, data_frame, cal="FX"): try: return data_frame. \ groupby([date_index.month.rename('month'), - Calendar().get_bus_day_of_month(date_index, cal, - tz=data_frame.index.tz).rename( - 'day')]).mean() + Calendar().get_bus_day_of_month( + date_index, cal, + tz=data_frame.index.tz).rename('day')]).mean() except: return data_frame. 
\ groupby([date_index.month, - Calendar().get_bus_day_of_month(date_index, cal, - tz=data_frame.index.tz)]).mean() + Calendar().get_bus_day_of_month( + date_index, cal, tz=data_frame.index.tz)]).mean() def average_by_month_day_by_day(self, data_frame): date_index = data_frame.index @@ -1856,12 +1861,14 @@ def _combine_rhs(self, rhs): def insert_sparse_time_series(self, df_sparse_time_series, pre_window_size, post_window_size, unit): - """ Given a sparse time series dataframe, return inserted dataframe with given unit/window + """Given a sparse time series dataframe, return inserted dataframe + with given unit/window e.g for a given sparse time series df, df[30] = 4.0 pre and post window sizes are 5 then the function will insert 4.0 to df[26:30] and df[30:35] - *Note - may have chaotic results if df is not sparse enough (since the windows may overlap) + *Note - may have chaotic results if df is not sparse enough (since the + windows may overlap) Parameters ---------- @@ -1917,8 +1924,9 @@ def insert_sparse_time_series(self, df_sparse_time_series, pre_window_size, data=narray) # now df should become [0 0 0 3 0 0 x 0 0 0 2 0 0 ] - # to make sure the final backward filling won't replace all elements to the first one - # we give a value at the backward_fill_bound (which is one unit before the pre window) + # to make sure the final backward filling won't replace all + # elements to the first one we give a value at the + # backward_fill_bound (which is one unit before the pre window) df[i].at[backward_fill_bound] = 4 # now df should become [0 0 4 3 0 0 x 0 0 0 2 0 0] diff --git a/findatapy/util/dataconstants.py b/findatapy/util/dataconstants.py index 8adda25..d6c5ec0 100644 --- a/findatapy/util/dataconstants.py +++ b/findatapy/util/dataconstants.py @@ -150,12 +150,12 @@ class DataConstants(object): # How many threads to use for loading external data (don't do too many on slow machines!) # also some data sources will complain if you start too many parallel threads to call data! # for some data providers might get better performance from 1 thread only! - market_thread_no = { 'quandl' : 4, - 'bloomberg' : 4, - 'yahoo' : 1, # yfinance already threads requests, so don't do it twice! - 'other' : 4, - 'dukascopy' : 8, - 'fxcm' : 4} + market_thread_no = {'quandl' : 4, + 'bloomberg' : 4, + 'yahoo' : 1, # yfinance already threads requests, so don't do it twice! + 'other' : 4, + 'dukascopy' : 3, # do not do too many! + 'fxcm' : 4} # Seconds for timeout timeout_downloader = {'dukascopy' : 120} diff --git a/findatapy/util/loggermanager.py b/findatapy/util/loggermanager.py index 1dab4bf..f31143d 100644 --- a/findatapy/util/loggermanager.py +++ b/findatapy/util/loggermanager.py @@ -3,13 +3,16 @@ # # Copyright 2016 Cuemacro # -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the -# License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #
-# See the License for the specific language governing permissions and limitations under the License.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 #
 
 import logging
diff --git a/findatapy/util/tickerfactory.py b/findatapy/util/tickerfactory.py
index 39d721d..c22289c 100644
--- a/findatapy/util/tickerfactory.py
+++ b/findatapy/util/tickerfactory.py
@@ -117,7 +117,8 @@ def aggregate_ticker_excel(self, excel_file, out_csv, sheets=[],
         for sh in sheets:
             logger.info("Reading from " + sh + " in " + excel_file)
 
-            df = pd.read_excel(excel_file, sheet_name=sh, skiprows=skiprows)
+            df = pd.read_excel(excel_file, sheet_name=sh, skiprows=skiprows,
+                               engine='openpyxl')
 
             df = df.dropna(how="all")
 
             if "maker" in sh:
diff --git a/findatapy/util/twitter.py b/findatapy/util/twitter.py
index 7123cab..a90ff54 100644
--- a/findatapy/util/twitter.py
+++ b/findatapy/util/twitter.py
@@ -47,7 +47,7 @@ def update_status(self, msg, link = None, picture = None):
         if link is not None: chars_lim = chars_lim - (22 * link)
         if picture is not None: chars_lim = chars_lim - 23
 
-        if (len(msg) > chars_lim):
+        if len(msg) > chars_lim:
             self.logger.info("Message too long for Twitter!")
 
             if picture is None:
diff --git a/setup.py b/setup.py
index 286aa56..8a6704f 100644
--- a/setup.py
+++ b/setup.py
@@ -5,7 +5,7 @@ tickers, using configuration files. There is also functionality
 which is particularly useful for those downloading FX market data."""
 
 setup(name='findatapy',
-      version='0.1.28',
+      version='0.1.29',
       description='Market data library',
       author='Saeed Amen',
       author_email='saeed@cuemacro.com',
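
Note (not part of the diff above): the headline user-facing change in 0.1.29 is the repaired Dukascopy tick downloader, together with the lower default thread count (market_thread_no['dukascopy'] = 3) and the gentler handling of 404/503 responses. A minimal sketch of exercising it through the public API described in findatapy/market/market.py follows; the parameter names mirror the findatapy README examples, and the dates/tickers are illustrative only.

from findatapy.market import Market, MarketDataGenerator, MarketDataRequest

market = Market(market_data_generator=MarketDataGenerator())

# Tick data request served by the Dukascopy downloader fixed in this release
md_request = MarketDataRequest(
    start_date="14 Jun 2016", finish_date="15 Jun 2016",
    category="fx", fields=["bid"], freq="tick",
    data_source="dukascopy", tickers=["EURUSD"])

df = market.fetch_market(md_request)
print(df.tail())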
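
Note (not part of the diff above): the ioengine.py changes are mostly line-length reflows plus truncation of very long Redis error messages, so the caching entry points themselves are unchanged. A rough sketch of the round trip they provide, assuming a local Parquet cache; the relative path handling and the "parquet" engine name for the write call are assumptions based on the signatures visible in this diff.

import pandas as pd

from findatapy.market.ioengine import IOEngine

io_engine = IOEngine()

# Toy time series standing in for the output of a market data request
df = pd.DataFrame({"EURUSD.close": [1.10, 1.11, 1.09]},
                  index=pd.date_range("2023-05-12", periods=3, freq="D"))

# Write the DataFrame to a Parquet cache on disk, then read it back
io_engine.write_time_series_cache_to_disk("temp_eurusd", df, engine="parquet")
df_cached = io_engine.read_time_series_cache_from_disk("temp_eurusd",
                                                       engine="parquet")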