Skip to content

Commit

Permalink
Complete refactor of the registry system to allow for clean separatio…
Browse files Browse the repository at this point in the history
…n of importers, exporters, and other objects (remove specific 'reader' verbiage)
  • Loading branch information
krzywon committed Feb 16, 2024
1 parent 4f688a5 commit 39c88ed
Showing 1 changed file with 74 additions and 77 deletions.
151 changes: 74 additions & 77 deletions sasdata/data_io/registry.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,32 @@
"""
File extension registry.
This provides routines for opening files based on extension,
and registers the built-in file extensions.
This provides routines for opening files based on extension, and registers the built-in file extensions.
"""
import os
from typing import Optional, List, Union, TYPE_CHECKING
from typing import Any, Optional, List, Union
from collections import defaultdict
from pathlib import Path
import logging
from io import BytesIO, StringIO, FileIO

from sasdata.data_util.loader_exceptions import NoKnownLoaderException
from sasdata.data_util.util import unique_preserve_order
from sasdata.dataloader import readers as all_readers
from sasdata.data_io.open import CustomFileOpen

# TYPE_CHECKING hides imports at runtime: https://docs.python.org/3/library/typing.html#typing.TYPE_CHECKING
# Imports used here are only used for type checking, but would create a circular import, otherwise.
if TYPE_CHECKING:
from sasdata.dataloader.data_info import Data1D, Data2D
# Class-specific types
PATH_LIKE = str, Path, os.path
FILE_LIKE = PATH_LIKE, BytesIO, StringIO, FileIO
OPEN_LIKE = Union[list[FILE_LIKE], FILE_LIKE]

DEPRECATION_MESSAGE = ("\rThe extension, {}, of the file, {}, suggests the data set might not be fully reduced. Support"
" for the reader associated with this file type has been removed. An attempt to load the file "
"was made, but, should it be successful, SasView cannot guarantee the accuracy of the data.")
DEPRECATION_MESSAGE = ("\rSupport for module {}, associated with the file extension {} has been removed. An attempt to "
"load the file {} was made. Should it be successful, the results cannot be guaranteed.")


def create_empty_data_with_errors(path: Union[str, Path], errors: List[Exception]):
"""Create a Data1D instance that only holds errors and a filepath. This allows all file paths to return a common
data type, regardless if the data loading was successful or a failure."""
from sasdata.dataloader.data_info import Data1D
data_object = Data1D()
data_object.errors = errors
data_object.filename = path
return [data_object]
logger = logging.getLogger(__name__)


class ExtensionRegistry:
"""
Associate a file loader with an extension.
Note that there may be multiple loaders for the same extension.
"""Associate a module with a file extension. Note that there may be multiple modules for the same extension.
Example: ::
@@ -47,51 +35,56 @@ class ExtensionRegistry:
# Add an association by setting an element
registry['.zip'] = unzip
# Multiple extensions for one loader
# Multiple extensions for one module
registry['.tgz'] = untar
registry['.tar.gz'] = untar
# Generic extensions to use after trying more specific extensions;
# these will be checked after the more specific extensions fail.
registry['.gz'] = gunzip
# Multiple loaders for one extension
# Multiple modules for one extension
registry['.cx'] = cx1
registry['.cx'] = cx2
registry['.cx'] = cx3
# Show registered extensions
print registry.extensions()
print(registry.extensions())
# Can also register a format name for explicit control from caller
registry['cx3'] = cx3
# Register a format name for explicit control from caller
registry['CX Number 3'] = cx3
print registry.formats()
# Retrieve loaders for a file name
# Retrieve modules for a file name
registry.lookup('hello.cx') -> [cx3,cx2,cx1]
# Run loader on a filename
registry.load('hello.cx') ->
try:
return cx3('hello.cx')
except:
# Run module on a filename
registry.call_fcn('hello.cx') ->
for module in registry.lookup('hello.cx'):
try:
return cx2('hello.cx')
return module('hello.cx')
except:
return cx1('hello.cx')
pass
# Load in a specific format ignoring extension
registry.load('hello.cx',format='cx3') ->
return cx3('hello.cx')
# Use a specific format ignoring extension
registry.call_fcn('hello.cx', format='CX Number 3') -> return cx3('hello.cx')
"""

# A dictionary of previously allowed extensions and formats that are no longer supported mapped to
# a string outlining the reason for deprecation.
_deprecated_extensions = {}

# A list of modules that should be used as backup modules, should all registered modules fail
_fall_back_modules = []

# File open mode for the registry object. Defaults to binary reading
_mode = 'rb'

def __init__(self):
self.modules = defaultdict(list)

# Deprecated extensions
self.deprecated_extensions = ['.asc']

def __setitem__(self, ext: str, loader):
self.modules[ext].insert(0, loader)
def __setitem__(self, fmt: str, module):
self.modules[fmt].insert(0, module)

def __getitem__(self, ext: str) -> List:
return self.modules[ext]
@@ -115,52 +108,56 @@ def extensions(self) -> List[str]:
exts.sort()
return exts

def lookup(self, path: str) -> List[callable]:
def lookup(self, path: PATH_LIKE) -> List[callable]:
"""
Return the loader associated with the file type of path.
Return all modules associated with the file type of path.
:param path: Data file path
:return: List of available readers for the file extension (maybe empty)
:return: List of available modules based on the file extension (maybe empty)
"""
# Find matching lower-case extensions
path_lower = path.lower()
extensions = [ext for ext in self.extensions() if path_lower.endswith(ext)]
# Sort matching extensions by decreasing order of length
extensions.sort(key=len)
path = Path(path)
path_lower = str(path.absolute()).lower()
# Allow multi extension files e.g.: `.tar.gz` files should match both `.tar.gz` and `.gz`
# Combine readers for matching extensions into one big list
readers = [reader for ext in extensions for reader in self.modules[ext]]
modules = [module for ext in self.extensions() for module in self.modules[ext] if path_lower.endswith(ext)]
# include generic readers in list of available readers to ensure error handling works properly
readers.extend(all_readers.get_fallback_readers())
modules.extend(self._fall_back_modules)
# Ensure the list of readers only includes unique values and the order is maintained
return unique_preserve_order(readers)
return unique_preserve_order(modules)

def load(self, path: str, ext: Optional[str] = None) -> List[Union["Data1D", "Data2D"]]:
def call_fcn(self, path: str, ext: Optional[str] = None) -> Any:
"""
Call the loader for a single file.
Call the registry for a single file.
Exceptions are stored in Data1D instances, with the errors in Data1D.errors
:param path: Data file path
:param ext: Data file extension (optional)
:return: Information relevant to the registry. This should be defined in subclasses.
"""
if ext is None:
loaders = self.lookup(path)
_, ext = os.path.splitext(path)
if not loaders:
raise NoKnownLoaderException("No loaders match extension in %r"
% path)
# Empty string extensions are valid extensions and should *not* fall into this code branch.
modules = self.lookup(path)
ext = Path(path).suffix
else:
loaders = self.modules.get(ext.lower(), [])
if not loaders:
raise NoKnownLoaderException("No loaders match format %r"
% ext)
errors = []
with CustomFileOpen(path, 'rb') as file_handler:
for load_function in loaders:
modules = self.modules.get(ext.lower(), [])
# No registered modules found for this particular file extension
if not modules:
raise NoKnownLoaderException("No loaders match format %r" % ext)

# A mapping of module name to errors thrown
errors = {}
with CustomFileOpen(path, self._mode) as file_handler:
for module in modules:
try:
loaded_data = load_function(path, file_handler)
results = module(path, file_handler)
# Check if the file read support is deprecated
if ext.lower() in self.deprecated_extensions:
loaded_data[0].errors.append(DEPRECATION_MESSAGE.format(ext, path))
return loaded_data
if ext.lower() in self._deprecated_extensions:
logger.warning(DEPRECATION_MESSAGE.format(ext, path))
errors[module.__name__] = DEPRECATION_MESSAGE.format(module._name__, ext, path)
return results
except Exception as e:
errors.append(e)
# If we get here it is because all loaders failed -> return Data1D with only file path and errors
return create_empty_data_with_errors(path, errors)
logger.warning(f"The module {module.__name__} was unable to process the contents of {path}.")

errors[module.__name__] = e
# If we get here it is because all no modules were successful
raise NoKnownLoaderException(f"Sasdata is unable to manage contents 0f {path}.")

0 comments on commit 39c88ed

Please sign in to comment.