From 60ea1246fc517d3282a3be801bdee517707347af Mon Sep 17 00:00:00 2001
From: Alejandro Martinez
Date: Wed, 31 Jan 2024 17:23:51 +0100
Subject: [PATCH 1/9] add CacheManager

---
 giza_datasets/__init__.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/giza_datasets/__init__.py b/giza_datasets/__init__.py
index 933051c..61b446c 100644
--- a/giza_datasets/__init__.py
+++ b/giza_datasets/__init__.py
@@ -1,4 +1,5 @@
 from giza_datasets.hub import DatasetsHub
 from giza_datasets.loaders import DatasetsLoader
+from giza_datasets.cache_manager import CacheManager
 
 __all__ = ["DatasetsLoader", "DatasetsHub"]

From 3ea3b54e834b97eb237f666d6c69b12192d3ad41 Mon Sep 17 00:00:00 2001
From: Alejandro Martinez
Date: Wed, 31 Jan 2024 17:26:45 +0100
Subject: [PATCH 2/9] added cache management and eager mode

---
 giza_datasets/loaders.py | 31 +++++++++++++++++++++++--------
 1 file changed, 23 insertions(+), 8 deletions(-)

diff --git a/giza_datasets/loaders.py b/giza_datasets/loaders.py
index 5f660b1..0dfb004 100644
--- a/giza_datasets/loaders.py
+++ b/giza_datasets/loaders.py
@@ -1,7 +1,8 @@
 import gcsfs
 import polars as pl
-
+import os
 from giza_datasets.constants import DATASET_HUB
+from giza_datasets.cache_manager import CacheManager
 
 
 class DatasetsLoader:
@@ -10,13 +11,12 @@ class DatasetsLoader:
     It uses the GCSFileSystem for accessing files and Polars for handling data.
     """
 
-    def __init__(self):
-        """
-        Initializes the DatasetsLoader with a GCS filesystem and the dataset configuration.
-        Verification is turned off for the GCS filesystem.
-        """
+    def __init__(self, use_cache=True, cache_dir=None):
         self.fs = gcsfs.GCSFileSystem(verify=False)
         self.dataset_hub = DATASET_HUB
+        self.use_cache = use_cache
+        self.cache_dir = cache_dir if cache_dir is not None else os.path.join(os.path.expanduser("~"), "giza_datasets")
+        self.cache_manager = CacheManager(self.cache_dir) if use_cache else None
 
     def _get_all_parquet_files(self, directory):
         """
@@ -58,7 +58,7 @@ def _load_multiple_parquet_files(self, file_paths):
 
         return concatenated_df
 
-    def load(self, dataset_name):
+    def load(self, dataset_name, cache_dir = None, eager = False):
         """
         Loads a dataset by name, either as a single file or multiple files.
 
@@ -71,12 +71,23 @@ def load(self, dataset_name):
         Args:
             dataset_name: The name of the dataset to load.
 
         Returns:
             A Polars DataFrame containing the loaded dataset.
 
         Raises:
             ValueError: If the dataset name is not found or if no parquet files are found.
         """
+        specific_cache_manager = None
+        if self.use_cache:
+            if cache_dir is not None:
+                specific_cache_manager = CacheManager(cache_dir)
+                cached_data = specific_cache_manager.load_from_cache(dataset_name, eager)
+            else:
+                cached_data = self.cache_manager.load_from_cache(dataset_name, eager)
+            if cached_data is not None:
+                return cached_data
+
         gcs_path = None
         for dataset in self.dataset_hub:
             if dataset.name == dataset_name:
                 gcs_path = dataset.path
                 break
-
+        if eager:
+            raise ValueError(f"Dataset '{dataset_name}' is not cached yet or you have use_cache=False. Eager mode is only available for cached datasets")
         if not gcs_path:
             raise ValueError(f"Dataset name '{dataset_name}' not found in Giza.")
         elif gcs_path.endswith(".parquet"):
             with self.fs.open(gcs_path) as f:
                 df = pl.read_parquet(f, use_pyarrow=True)
         else:
             parquet_files = self._get_all_parquet_files(gcs_path)
             if not parquet_files:
                 raise ValueError(
                     "No .parquet files were found in the directory or subdirectories."
                 )
             df = self._load_multiple_parquet_files(parquet_files)
+
+        if self.use_cache:
+            cache_manager_to_use = specific_cache_manager if specific_cache_manager is not None else self.cache_manager
+            cache_manager_to_use.save_to_cache(df, dataset_name)
         return df

From 065ec9fc6f431aaf0cb0db1d32093b077201c61c Mon Sep 17 00:00:00 2001
From: Alejandro Martinez
Date: Wed, 31 Jan 2024 17:28:07 +0100
Subject: [PATCH 3/9] add cache functionality in CacheManager class

---
 giza_datasets/cache_manager.py | 98 ++++++++++++++++++++++++++++++++++
 1 file changed, 98 insertions(+)
 create mode 100644 giza_datasets/cache_manager.py

diff --git a/giza_datasets/cache_manager.py b/giza_datasets/cache_manager.py
new file mode 100644
index 0000000..7e1408a
--- /dev/null
+++ b/giza_datasets/cache_manager.py
@@ -0,0 +1,98 @@
+import polars as pl
+import os
+import hashlib
+
+import os
+import hashlib
+import polars as pl
+
+class CacheManager:
+    def __init__(self, cache_dir):
+        """
+        Initializes the CacheManager with a specified cache directory.
+
+        Args:
+            cache_dir (str): The directory path where cached datasets will be stored.
+        """
+        self.cache_dir = cache_dir
+        if not os.path.exists(cache_dir):
+            os.makedirs(cache_dir)
+
+    def _generate_hash(self, parameters):
+        """
+        Generates a hash based on the given parameters, useful to avoid collisions in the cache.
+
+        Args:
+            parameters (dict): Parameters used to generate the hash.
+
+        Returns:
+            str: A hash string derived from the parameters.
+        """
+        param_str = str(parameters).encode('utf-8')
+        return hashlib.sha256(param_str).hexdigest()
+
+    def set_cache_dir(self, cache_dir):
+        """
+        Sets a new cache directory and creates the directory if it does not exist.
+
+        Args:
+            cache_dir (str): The new directory path for caching datasets.
+        """
+        self.cache_dir = cache_dir
+        if not os.path.exists(cache_dir):
+            os.makedirs(cache_dir)
+
+    def get_cache_path(self, parameters):
+        """
+        Determines the full file path in the cache based on the given parameters.
+
+        Args:
+            parameters (dict): Parameters used to identify the cached file.
+
+        Returns:
+            str: The file path for the cached dataset.
+        """
+        hash_key = self._generate_hash(parameters)
+        return os.path.join(self.cache_dir, f"{hash_key}.parquet")
+
+    def load_from_cache(self, parameters, eager):
+        """
+        Attempts to load a Polars DataFrame from a cached Parquet file.
+        If the file exists, it returns the DataFrame; otherwise, it returns None.
+
+        Args:
+            parameters (dict): Parameters used to identify the cached dataset.
+            eager (bool): If True, loads the dataset in eager mode; otherwise, in lazy mode.
+
+        Returns:
+            polars.DataFrame, polars.LazyFrame, or None: The loaded DataFrame (eager) or LazyFrame (lazy) if cached, or None if not cached.
+        """
+        cache_path = self.get_cache_path(parameters)
+        if os.path.exists(cache_path):
+            print("Dataset read from cache.")
+            if eager:
+                return pl.read_parquet(cache_path, use_pyarrow=True)
+            else:
+                return pl.scan_parquet(cache_path)
+        return None
+
+    def save_to_cache(self, data, parameters):
+        """
+        Saves a Polars DataFrame to a Parquet file in the cache.
+
+        Args:
+            data (polars.DataFrame): The DataFrame to be cached.
+            parameters (dict): Parameters used to identify the dataset for caching.
+        """
+        cache_path = self.get_cache_path(parameters)
+        data.write_parquet(cache_path)
+
+    def clear_cache(self):
+        """
+        Removes all files in the cache directory.
+        """
+        for filename in os.listdir(self.cache_dir):
+            file_path = os.path.join(self.cache_dir, filename)
+            if os.path.isfile(file_path):
+                os.remove(file_path)
+

From 4a01d86c04b96d5f1b3af0024e7bfaa7d8ea1ce9 Mon Sep 17 00:00:00 2001
From: Alejandro Martinez
Date: Wed, 31 Jan 2024 17:33:25 +0100
Subject: [PATCH 4/9] fix minor errors

---
 giza_datasets/__init__.py      | 2 +-
 giza_datasets/cache_manager.py | 4 ----
 2 files changed, 1 insertion(+), 5 deletions(-)

diff --git a/giza_datasets/__init__.py b/giza_datasets/__init__.py
index 61b446c..dbf72fb 100644
--- a/giza_datasets/__init__.py
+++ b/giza_datasets/__init__.py
@@ -2,4 +2,4 @@
 from giza_datasets.loaders import DatasetsLoader
 from giza_datasets.cache_manager import CacheManager
 
-__all__ = ["DatasetsLoader", "DatasetsHub"]
+__all__ = ["DatasetsLoader", "DatasetsHub", "CacheManager"]

diff --git a/giza_datasets/cache_manager.py b/giza_datasets/cache_manager.py
index 7e1408a..5154c21 100644
--- a/giza_datasets/cache_manager.py
+++ b/giza_datasets/cache_manager.py
@@ -2,10 +2,6 @@
 import os
 import hashlib
 
-import os
-import hashlib
-import polars as pl
-
 class CacheManager:
     def __init__(self, cache_dir):
         """

From a90bd9514a63616bf263ea698909d94c99dc5957 Mon Sep 17 00:00:00 2001
From: Alejandro Martinez
Date: Wed, 20 Mar 2024 12:05:07 +0100
Subject: [PATCH 5/9] delete CacheManager

---
 giza_datasets/__init__.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/giza_datasets/__init__.py b/giza_datasets/__init__.py
index dbf72fb..933051c 100644
--- a/giza_datasets/__init__.py
+++ b/giza_datasets/__init__.py
@@ -1,5 +1,4 @@
 from giza_datasets.hub import DatasetsHub
 from giza_datasets.loaders import DatasetsLoader
-from giza_datasets.cache_manager import CacheManager
 
-__all__ = ["DatasetsLoader", "DatasetsHub", "CacheManager"]
+__all__ = ["DatasetsLoader", "DatasetsHub"]

From 4684731c178d1523589bed812aab5e32f8bb5b97 Mon Sep 17 00:00:00 2001
From: Alejandro Martinez
Date: Wed, 20 Mar 2024 12:06:02 +0100
Subject: [PATCH 6/9] delete hashing functionality

---
 giza_datasets/cache_manager.py | 64 ++++++++++++++--------------
 1 file changed, 27 insertions(+), 37 deletions(-)

diff --git a/giza_datasets/cache_manager.py b/giza_datasets/cache_manager.py
index 5154c21..0c60a9a 100644
--- a/giza_datasets/cache_manager.py
+++ b/giza_datasets/cache_manager.py
@@ -1,12 +1,13 @@
 import polars as pl
+import pyarrow.dataset as ds
 import os
-import hashlib
+
 
 class CacheManager:
     def __init__(self, cache_dir):
         """
         Initializes the CacheManager with a specified cache directory.
-        
+
         Args:
             cache_dir (str): The directory path where cached datasets will be stored.
         """
@@ -14,81 +15,70 @@ class CacheManager:
         self.cache_dir = cache_dir
         if not os.path.exists(cache_dir):
             os.makedirs(cache_dir)
 
-    def _generate_hash(self, parameters):
-        """
-        Generates a hash based on the given parameters, useful to avoid collisions in the cache.
-
-        Args:
-            parameters (dict): Parameters used to generate the hash.
-
-        Returns:
-            str: A hash string derived from the parameters.
-        """
-        param_str = str(parameters).encode('utf-8')
-        return hashlib.sha256(param_str).hexdigest()
-
     def set_cache_dir(self, cache_dir):
         """
         Sets a new cache directory and creates the directory if it does not exist.
-        
+
         Args:
             cache_dir (str): The new directory path for caching datasets.
""" self.cache_dir = cache_dir if not os.path.exists(cache_dir): os.makedirs(cache_dir) - - def get_cache_path(self, parameters): + + def get_cache_path(self, dataset_name): """ - Determines the full file path in the cache based on the given parameters. - + Determines the full file path in the cache based on the dataset name. + Args: - parameters (dict): Parameters used to identify the cached file. - + dataset_name (str): Name of the dataset to identify the cached file. + Returns: str: The file path for the cached dataset. """ - hash_key = self._generate_hash(parameters) - return os.path.join(self.cache_dir, f"{hash_key}.parquet") + return os.path.join(self.cache_dir, f"{dataset_name}") - def load_from_cache(self, parameters, eager): + def load_from_cache(self, dataset_name, eager): """ Attempts to load a Polars DataFrame from a cached Parquet file. If the file exists, it returns the DataFrame; otherwise, it returns None. - + Args: - parameters (dict): Parameters used to identify the cached dataset. + dataset_name (str): Name of the dataset to identify the cached file. eager (bool): If True, loads the dataset in eager mode; otherwise, in lazy mode. - + Returns: polars.DataFrame or None: The loaded DataFrame if cached, or None if not cached. """ - cache_path = self.get_cache_path(parameters) + cache_path = self.get_cache_path(dataset_name) if os.path.exists(cache_path): print("Dataset read from cache.") + myds = ds.dataset(cache_path) if eager: - return pl.read_parquet(cache_path, use_pyarrow=True) + return pl.scan_pyarrow_dataset(myds) else: - return pl.scan_parquet(cache_path) + return pl.from_arrow(myds.to_table()) return None - def save_to_cache(self, data, parameters): + def save_to_cache(self, data, dataset_name): """ Saves a Polars DataFrame to a Parquet file in the cache. - + Args: data (polars.DataFrame): The DataFrame to be cached. - parameters (dict): Parameters used to identify the dataset for caching. + dataset_name (str): Name of the dataset for caching. """ - cache_path = self.get_cache_path(parameters) + cache_path = self.get_cache_path(dataset_name) data.write_parquet(cache_path) def clear_cache(self): """ - Removes all files in the cache directory. + Removes all files in the cache directory and returns the count of deleted files. 
""" + deleted_files_count = 0 for filename in os.listdir(self.cache_dir): file_path = os.path.join(self.cache_dir, filename) if os.path.isfile(file_path): os.remove(file_path) - + deleted_files_count += 1 + return deleted_files_count From a81dce49777d053cdfd703cf65fb3c9a34a2ecb1 Mon Sep 17 00:00:00 2001 From: Alejandro Martinez Date: Wed, 20 Mar 2024 12:06:21 +0100 Subject: [PATCH 7/9] format --- giza_datasets/constants.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/giza_datasets/constants.py b/giza_datasets/constants.py index 667e267..a742af2 100644 --- a/giza_datasets/constants.py +++ b/giza_datasets/constants.py @@ -197,14 +197,14 @@ tags=["DeFi", "Lending", "Aave-v3", "Ethereum", "Swap Fees"], documentation="https://datasets.gizatech.xyz/hub/aave/daily-exchange-rates-and-indexes-v3", ), - Dataset( + Dataset( name="aave-liquidationsV2", path="gs://datasets-giza/Aave/Aave_LiquidationsV2.parquet", description="Individual liquidations in Aave v2, including colleteral and lending values", tags=["DeFi", "Lending", "Aave-v2", "Ethereum", "Liquiditations"], documentation="https://datasets.gizatech.xyz/hub/aave/liquidations-v2", ), - Dataset( + Dataset( name="aave-liquidationsV3", path="gs://datasets-giza/Aave/Aave_LiquidationsV3.parquet", description="Individual liquidations in Aave v3, including colleteral and lending values", From f6daf678379d4693d5698924200a0f5ec9ccbd4f Mon Sep 17 00:00:00 2001 From: Alejandro Martinez Date: Wed, 20 Mar 2024 12:07:09 +0100 Subject: [PATCH 8/9] download to local before load in cache mode. Enable eager functionality from local --- giza_datasets/loaders.py | 132 +++++++++++++++++++++++++++------------ 1 file changed, 92 insertions(+), 40 deletions(-) diff --git a/giza_datasets/loaders.py b/giza_datasets/loaders.py index 0dfb004..1184169 100644 --- a/giza_datasets/loaders.py +++ b/giza_datasets/loaders.py @@ -15,7 +15,11 @@ def __init__(self, use_cache=True, cache_dir=None): self.fs = gcsfs.GCSFileSystem(verify=False) self.dataset_hub = DATASET_HUB self.use_cache = use_cache - self.cache_dir = cache_dir if cache_dir is not None else os.path.join(os.path.expanduser("~"), "giza_datasets") + self.cache_dir = ( + cache_dir + if cache_dir is not None + else os.path.join(os.path.expanduser("~"), "giza_datasets") + ) self.cache_manager = CacheManager(self.cache_dir) if use_cache else None def _get_all_parquet_files(self, directory): @@ -23,10 +27,10 @@ def _get_all_parquet_files(self, directory): Recursively retrieves all the parquet file paths in the given directory. Args: - directory: The GCS directory to search for parquet files. + directory (str): The GCS directory to search for parquet files. Returns: - A list of full paths to all the parquet files found. + List[str]: A list of full paths to all the parquet files found. """ all_files = self.fs.ls(directory, detail=True) parquet_files = [] @@ -44,10 +48,10 @@ def _load_multiple_parquet_files(self, file_paths): Loads multiple parquet files into a single Polars DataFrame. Args: - file_paths: A list of file paths to load. + file_paths (List[str]): A list of file paths to load. Returns: - A concatenated Polars DataFrame containing data from all files. + polars.DataFrame: A concatenated Polars DataFrame containing data from all files. 
""" dfs = [] for file_path in file_paths: @@ -55,53 +59,101 @@ def _load_multiple_parquet_files(self, file_paths): df = pl.read_parquet(f, use_pyarrow=True) dfs.append(df) concatenated_df = pl.concat(dfs, how="diagonal_relaxed") - return concatenated_df - def load(self, dataset_name, cache_dir = None, eager = False): + def _dataset_exists_in_gcs(self, dataset_name): """ - Loads a dataset by name, either as a single file or multiple files. + Checks if a dataset exists in GCS by looking for its path in the dataset hub. Args: - dataset_name: The name of the dataset to load. + dataset_name (str): The name of the dataset to check. Returns: - A Polars DataFrame containing the loaded dataset. - - Raises: - ValueError: If the dataset name is not found or if no parquet files are found. + str or None: The GCS path of the dataset if found, otherwise None. """ - specific_cache_manager = None - if self.use_cache: - if cache_dir is not None: - specific_cache_manager = CacheManager(cache_dir) - cached_data = specific_cache_manager.load_from_cache(dataset_name, eager) - else: - cached_data = self.cache_manager.load_from_cache(dataset_name, eager) - if cached_data is not None: - return cached_data - gcs_path = None for dataset in self.dataset_hub: if dataset.name == dataset_name: gcs_path = dataset.path break - if eager: - raise ValueError(f"Dataset '{dataset_name}' is not cached yet or you have use_cache=False. Eager mode is only available for cached datasets") - if not gcs_path: - raise ValueError(f"Dataset name '{dataset_name}' not found in Giza.") - elif gcs_path.endswith(".parquet"): - with self.fs.open(gcs_path) as f: - df = pl.read_parquet(f, use_pyarrow=True) - else: - parquet_files = self._get_all_parquet_files(gcs_path) - if not parquet_files: - raise ValueError( - "No .parquet files were found in the directory or subdirectories." - ) - df = self._load_multiple_parquet_files(parquet_files) - + return gcs_path + + def load(self, dataset_name, eager=False): + """ + Loads a dataset by name, either from cache or GCS. Supports eager and lazy loading. + + Args: + dataset_name (str): The name of the dataset to load. + eager (bool): If True, loads the dataset eagerly; otherwise, loads lazily. + + Returns: + polars.DataFrame: The loaded dataset as a Polars DataFrame. + + Raises: + ValueError: If the dataset cannot be found in cache or GCS, or if eager loading is requested for a non-cached dataset. + """ if self.use_cache: - cache_manager_to_use = specific_cache_manager if specific_cache_manager is not None else self.cache_manager - cache_manager_to_use.save_to_cache(df, dataset_name) + # Check if the dataset is already cached + cached_data = self.cache_manager.load_from_cache(dataset_name, eager) + if cached_data is not None: + print(f"Loading dataset {dataset_name} from cache.") + return cached_data + else: + print( + f"Dataset {dataset_name} not found in cache. Downloading from GCS." 
+                )
+
+            gcs_path = self._dataset_exists_in_gcs(dataset_name)
+
+            if not gcs_path:
+                raise ValueError(f"Dataset name '{dataset_name}' not found.")
+
+            local_file_path = os.path.join(self.cache_dir, f"{dataset_name}")
+
+            if gcs_path.endswith(".parquet"):
+                self.fs.get(gcs_path, local_file_path)
+            else:
+                self.fs.get(gcs_path, local_file_path, recursive=True)
+
+            df = self.cache_manager.load_from_cache(dataset_name, eager)
+        else:
+            if eager:
+                raise ValueError("Eager mode is only available for cached datasets")
+            gcs_path = self._dataset_exists_in_gcs(dataset_name)
+            if not gcs_path:
+                raise ValueError(f"Dataset name '{dataset_name}' not found in Giza.")
+            elif gcs_path.endswith(".parquet"):
+                with self.fs.open(gcs_path) as f:
+                    df = pl.read_parquet(f, use_pyarrow=True)
+            else:
+                parquet_files = self._get_all_parquet_files(gcs_path)
+                if not parquet_files:
+                    raise ValueError(
+                        "No .parquet files were found in the directory or subdirectories."
+                    )
+                df = self._load_multiple_parquet_files(parquet_files)
+
         return df
+
+    def set_cache_dir(self, new_cache_dir):
+        """
+        Sets a new cache directory for the CacheManager and updates the instance cache directory.
+
+        Args:
+            new_cache_dir (str): The new directory path for caching datasets.
+        """
+        self.cache_dir = new_cache_dir
+        if self.use_cache:
+            self.cache_manager = CacheManager(self.cache_dir)
+
+    def clear_cache(self):
+        """
+        Removes all files in the cache directory through the CacheManager and prints the count.
+        """
+        if self.use_cache and self.cache_manager:
+            deleted_files_count = self.cache_manager.clear_cache()
+            print(
+                f"{deleted_files_count} datasets have been cleared from the cache directory."
+            )
+        else:
+            print("Cache management is not enabled or CacheManager is not initialized.")

From b5ec7a39d355acbb0e11738f9431e4c9c5bccd25 Mon Sep 17 00:00:00 2001
From: Alejandro Martinez
Date: Thu, 21 Mar 2024 10:55:12 +0100
Subject: [PATCH 9/9] change the default path to ~/.cache/giza_datasets

---
 giza_datasets/loaders.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/giza_datasets/loaders.py b/giza_datasets/loaders.py
index 1184169..a80ba91 100644
--- a/giza_datasets/loaders.py
+++ b/giza_datasets/loaders.py
@@ -18,7 +18,7 @@ def __init__(self, use_cache=True, cache_dir=None):
         self.cache_dir = (
             cache_dir
             if cache_dir is not None
-            else os.path.join(os.path.expanduser("~"), "giza_datasets")
+            else os.path.join(os.path.expanduser("~"), ".cache/giza_datasets")
         )
         self.cache_manager = CacheManager(self.cache_dir) if use_cache else None
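
Applied in order, the series leaves DatasetsLoader with the cache-backed API
sketched below. This is an illustrative usage sketch, not part of the patches:
the dataset name is taken from the aave-liquidationsV2 entry shown in PATCH 7,
the default cache directory assumes PATCH 9, and the eager/lazy return types
follow the docstrings (eager=True yields a materialized polars.DataFrame,
eager=False a polars.LazyFrame when served from the local cache).

    from giza_datasets import DatasetsLoader

    # Caching is on by default: the first load downloads the dataset from GCS
    # into ~/.cache/giza_datasets; subsequent loads are served from that copy.
    loader = DatasetsLoader(use_cache=True)

    # Lazy load: returns a LazyFrame over the cached parquet data.
    lf = loader.load("aave-liquidationsV2")
    df = lf.collect()  # materialize when needed

    # Eager load: returns a polars.DataFrame directly. Per PATCH 8 this is
    # only available with use_cache=True; otherwise load() raises ValueError.
    df = loader.load("aave-liquidationsV2", eager=True)

    # Redirect the cache, then wipe it; clear_cache() prints how many cached
    # datasets were deleted.
    loader.set_cache_dir("/tmp/giza_datasets_cache")
    loader.clear_cache()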