Skip to content

Commit

Permalink
feat: dynamic download window based on data type and timeframe
Browse files Browse the repository at this point in the history
  • Loading branch information
kieran-mackle committed Feb 20, 2024
1 parent 7b074a7 commit beefef8
Show file tree
Hide file tree
Showing 2 changed files with 265 additions and 105 deletions.
252 changes: 153 additions & 99 deletions src/ccxt_download/public.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@
import logging
import pandas as pd
import ccxt.pro as ccxt
from typing import Optional, Union
from aiolimiter import AsyncLimiter
from datetime import datetime, timedelta
from typing import Optional, Union, Coroutine
from ccxt_download import CANDLES, TRADES, FUNDING
from ccxt_download.utilities import filename_builder
from ccxt_download.constants import DATATYPES, CCXT_EXCHANGES, DEFAULT_DOWNLOAD_DIR
from ccxt_download.utilities import (
filename_builder,
timedelta_from_str,
_period_start,
_timestep_from_timedelta,
)


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -159,27 +164,23 @@ async def download_async(
if options is None:
options = {}

# Collect tasks
tasks = []
for datatype in data_types:
for symbol in symbols:
method = globals().get(datatype)
kwargs = options.get(datatype, {})
current_dt = start_dt
while current_dt < end_dt:
# Download data for this chunk
coro = method(
exchange=exchange,
symbol=symbol,
start_dt=current_dt,
rate_limiter=rate_limiter,
download_dir=download_dir,
verbose=verbose,
**kwargs,
)
tasks.append(coro)

# Walk forwards a day
current_dt = current_dt + timedelta(days=1)
tasks = await method(
exchange=exchange,
symbol=symbol,
start_dt=start_dt,
end_dt=end_dt,
rate_limiter=rate_limiter,
download_dir=download_dir,
verbose=verbose,
tasks=tasks,
**kwargs,
)

await asyncio.gather(*tasks)

Expand Down Expand Up @@ -208,12 +209,14 @@ async def candles(
exchange: ccxt.Exchange,
symbol: str,
start_dt: datetime,
end_dt: datetime,
rate_limiter: AsyncLimiter,
timeframe: Optional[str] = "1m",
download_dir: str = DEFAULT_DOWNLOAD_DIR,
verbose: Optional[bool] = True,
tasks: Optional[list[Coroutine]] = None,
) -> pd.DataFrame:
"""Download candle (OHLCV) data.
"""Download candle (OHLCV) data for a specified date range.
Parameters
----------
Expand All @@ -229,6 +232,10 @@ async def candles(
The start date of the data to download, provided as a
datetime object.
end_date : str | datetime
The end date of the data to download, provided as a
datetime object or as a string in the form 'YYYY-MM-DD'.
end_dt : datetime
The end date of the data to download, provided as a
datetime object.
Expand All @@ -245,10 +252,57 @@ async def candles(
verbose : bool, optional
Be verbose. The default is True.
tasks : list[coroutine], optional
A list of coroutine tasks to append to. Used internally.
"""
# Determine timestep based on timeframe
td = timedelta_from_str(timeframe)
timestep = _timestep_from_timedelta(td)

# Iterate through date range
current_dt = _period_start(td, start_dt)
while current_dt < end_dt:
# Download data for this chunk
coro = _candle_helper(
exchange=exchange,
symbol=symbol,
start_dt=current_dt,
window_length=timestep,
rate_limiter=rate_limiter,
timeframe=timeframe,
download_dir=download_dir,
verbose=verbose,
)
if tasks is not None:
# Append to tasks list
tasks.append(coro)
else:
# Await and return immediately
return await coro

# Walk forwards in time
current_dt = _period_start(td, current_dt + timestep)

if tasks is not None:
return tasks


async def _candle_helper(
exchange: ccxt.Exchange,
symbol: str,
start_dt: datetime,
window_length: timedelta,
rate_limiter: AsyncLimiter,
timeframe: Optional[str] = "1m",
download_dir: str = DEFAULT_DOWNLOAD_DIR,
verbose: Optional[bool] = True,
) -> pd.DataFrame:
# Build filename using start date and window length
filename = filename_builder(
exchange=exchange.name.lower(),
start_dt=start_dt,
window_length=window_length,
download_dir=download_dir,
symbol=symbol,
data_type=CANDLES,
Expand All @@ -269,22 +323,12 @@ async def candles(
)

# Fetch OHLCV data
timeframe_map = {
"4h": 4 * 60 * 60e3,
"1h": 60 * 60e3,
"15m": 15 * 60e3,
"5m": 5 * 60e3,
"1m": 60e3,
"1s": 1e3,
}
start_ts = int(start_dt.timestamp() * 1000)
try:
timeframe_ms = timeframe_map[timeframe]
except KeyError:
raise KeyError(
f"Timeframe key '{timeframe}' not supported. Must be one of the following: {', '.join(timeframe_map.keys())}"
)
end_ts = int((start_dt + timedelta(days=1)).timestamp() * 1000)
timeframe_ms = timedelta_from_str(timeframe).total_seconds() * 1000
except ValueError:
raise KeyError(f"Timeframe key '{timeframe}' not supported.")
end_ts = int((start_dt + window_length).timestamp() * 1000)
ohlcv_data = []
current_ts = start_ts
while current_ts < end_ts:
Expand Down Expand Up @@ -328,19 +372,21 @@ async def candles(

if verbose:
print(
f"Finished downloading {timeframe} candles for {symbol} on {exchange} on {start_dt.strftime('%Y-%m-%d')}."
f"Finished downloading {timeframe} candles for {symbol} on {exchange} starting {start_dt.strftime('%Y-%m-%d')}."
)


async def fetch_daily(
async def trades(
exchange: ccxt.Exchange,
symbol: str,
start_dt: datetime,
end_dt: datetime,
rate_limiter: AsyncLimiter,
download_dir: str = DEFAULT_DOWNLOAD_DIR,
verbose: Optional[bool] = True,
):
"""Helper function to download daily candle (OHLCV) data.
tasks: Optional[list[Coroutine]] = None,
) -> pd.DataFrame:
"""Download trade data.
Parameters
----------
Expand All @@ -363,82 +409,50 @@ async def fetch_daily(
rate_limiter : aiolimiter.AsyncLimiter
An asyncio rate limiter object.
download_dir : str, optional
The path to the download directory. The default is
'.ccxt_data/' in your user's home directory.
verbose : bool, optional
Be verbose. The default is True.
"""
timeframe = "1d"
start_ts = int(start_dt.timestamp()*1000)
end_ts = int(end_dt.timestamp()*1000)
ohlcv_data = []
current_ts = start_ts
while current_ts < end_ts:
async with rate_limiter:
data = await exchange.fetch_ohlcv(
symbol=symbol,
timeframe=timeframe,
since=current_ts,
)
if len(data) < 1:
break
ohlcv_data += data
current_ts = data[-1][0] + 1

# Convert the data into a DataFrame
columns = ["Timestamp", "Open", "High", "Low", "Close", "Volume"]
df = pd.DataFrame(ohlcv_data, columns=columns)
df.set_index("Timestamp", inplace=True)
df.index = pd.to_datetime(df.index, unit="ms")

# Add meta info
df["exchange"] = exchange.name.lower()
df["symbol"] = symbol
if verbose:
print(
f"Finished downloading {timeframe} candles for {symbol} on {exchange}."
tasks : list[coroutine], optional
A list of coroutine tasks to append to. Used internally.
"""
# Iterate through date range
current_dt = start_dt
while current_dt < end_dt:
# Download data for this chunk
coro = _trades_helper(
exchange=exchange,
symbol=symbol,
start_dt=current_dt,
rate_limiter=rate_limiter,
download_dir=download_dir,
verbose=verbose,
)
if tasks is not None:
# Append to tasks list
tasks.append(coro)
else:
# Await and return immediately
return await coro

return df
# Walk forwards in time
current_dt = current_dt + timedelta(days=1)

if tasks is not None:
return tasks

async def trades(

async def _trades_helper(
exchange: ccxt.Exchange,
symbol: str,
start_dt: datetime,
rate_limiter: AsyncLimiter,
download_dir: str = DEFAULT_DOWNLOAD_DIR,
verbose: Optional[bool] = True,
) -> pd.DataFrame:
"""Download trade data.
Parameters
----------
exchange : str | ccxt_pro.Exchange
The exchange to download data from, provided either by
name as a string, or directly as a CCXT Pro exchange
instance.
symbol : str
The symbol to download data for.
start_dt : datetime
The start date of the data to download, provided as a
datetime object.
end_dt : datetime
The end date of the data to download, provided as a
datetime object.
rate_limiter : aiolimiter.AsyncLimiter
An asyncio rate limiter object.
download_dir : str, optional
The path to the download directory. The default is
'.ccxt_data/' in your user's home directory.
verbose : bool, optional
Be verbose. The default is True.
"""
filename = filename_builder(
exchange=exchange.name.lower(),
start_dt=start_dt,
Expand Down Expand Up @@ -511,9 +525,11 @@ async def funding(
exchange: ccxt.Exchange,
symbol: str,
start_dt: datetime,
end_dt: datetime,
rate_limiter: AsyncLimiter,
download_dir: str = DEFAULT_DOWNLOAD_DIR,
verbose: Optional[bool] = True,
tasks: Optional[list[Coroutine]] = None,
) -> pd.DataFrame:
"""Download funding rate data.
Expand Down Expand Up @@ -544,7 +560,45 @@ async def funding(
verbose : bool, optional
Be verbose. The default is True.
tasks : list[coroutine], optional
A list of coroutine tasks to append to. Used internally.
"""
# TODO - download over larger date ranges - see candles function for reference
# Iterate through date range
current_dt = start_dt
while current_dt < end_dt:
# Download data for this chunk
coro = _funding_helper(
exchange=exchange,
symbol=symbol,
start_dt=current_dt,
rate_limiter=rate_limiter,
download_dir=download_dir,
verbose=verbose,
)
if tasks is not None:
# Append to tasks list
tasks.append(coro)
else:
# Await and return immediately
return await coro

# Walk forwards in time
current_dt = current_dt + timedelta(days=1)

if tasks is not None:
return tasks


async def _funding_helper(
exchange: ccxt.Exchange,
symbol: str,
start_dt: datetime,
rate_limiter: AsyncLimiter,
download_dir: str = DEFAULT_DOWNLOAD_DIR,
verbose: Optional[bool] = True,
) -> pd.DataFrame:
filename = filename_builder(
exchange=exchange.name.lower(),
start_dt=start_dt,
Expand Down
Loading

0 comments on commit beefef8

Please sign in to comment.