diff --git a/alluxiofs/__init__.py b/alluxiofs/__init__.py index d025ec9..72a57da 100644 --- a/alluxiofs/__init__.py +++ b/alluxiofs/__init__.py @@ -1,4 +1,5 @@ from alluxiofs.client import AlluxioClient from alluxiofs.core import AlluxioFileSystem +from alluxiofs.core import setup_logger -__all__ = ["AlluxioFileSystem", "AlluxioClient"] +__all__ = ["AlluxioFileSystem", "AlluxioClient", "setup_logger"] diff --git a/alluxiofs/client/const.py b/alluxiofs/client/const.py index 6d739ff..8cfced6 100644 --- a/alluxiofs/client/const.py +++ b/alluxiofs/client/const.py @@ -55,3 +55,6 @@ "http://{worker_host}:{http_port}/v1/load?path={path}&opType=stop" ) ETCD_PREFIX_FORMAT = "/ServiceDiscovery/{cluster_name}/" +EXCEPTION_CONTENT = ( + "Worker's address: {worker_host}:{http_port}, Error: {error}" +) diff --git a/alluxiofs/client/core.py b/alluxiofs/client/core.py index 5aa6804..6e0cfc4 100644 --- a/alluxiofs/client/core.py +++ b/alluxiofs/client/core.py @@ -46,6 +46,7 @@ FULL_CHUNK_URL_FORMAT, WRITE_CHUNK_URL_FORMAT, FULL_RANGE_URL_FORMAT, + EXCEPTION_CONTENT, ) from .const import ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE from .const import ALLUXIO_COMMON_EXTENSION_ENABLE @@ -62,8 +63,6 @@ from .const import WRITE_PAGE_URL_FORMAT from .worker_ring import ConsistentHashProvider -logger = logging.getLogger(__name__) - @dataclass class AlluxioPathStatus: @@ -131,6 +130,7 @@ class AlluxioClient: def __init__( self, + logger=logging.getLogger(__name__), **kwargs, ): """ @@ -160,8 +160,9 @@ def __init__( self.session = self._create_session(self.config.concurrency) self.hash_provider = ConsistentHashProvider(self.config) self.data_manager = None + self.logger = logger if kwargs.get(ALLUXIO_COMMON_EXTENSION_ENABLE, False): - logger.info("alluxiocommon extension enabled.") + self.logger.info("alluxiocommon extension enabled.") self.data_manager = _DataManager( self.config.concurrency, ondemand_pool_disabled=kwargs.get( @@ -170,7 +171,7 @@ def __init__( ) test_options = kwargs.get("test_options", {}) - set_log_level(logger, test_options) + set_log_level(self.logger, test_options) def listdir(self, path): """ @@ -238,10 +239,14 @@ def listdir(self, path): ) ) return result - except Exception as e: + except Exception: raise Exception( - f"Error when listing path {path}: error {e}" - ) from e + EXCEPTION_CONTENT.format( + worker_host=worker_host, + http_port=worker_http_port, + error=response.content.decode("utf-8"), + ) + ) def get_file_status(self, path): """ @@ -300,10 +305,14 @@ def get_file_status(self, path): data["mLength"], data["mContentHash"].strip('"'), ) - except Exception as e: + except Exception: raise Exception( - f"Error when getting file status path {path}: error {e}" - ) from e + EXCEPTION_CONTENT.format( + worker_host=worker_host, + http_port=worker_http_port, + error=response.content.decode("utf-8"), + ) + ) def load( self, @@ -365,10 +374,15 @@ def submit_load( response.raise_for_status() content = json.loads(response.content.decode("utf-8")) return content[ALLUXIO_SUCCESS_IDENTIFIER] - except Exception as e: + except Exception: raise Exception( - f"Error when submitting load job for path {path} from {worker_host}: error {e}" - ) from e + EXCEPTION_CONTENT.format( + worker_host=worker_host, + http_port=worker_http_port, + error=f"Error when submitting load job for path {path}, " + + response.content.decode("utf-8"), + ) + ) def stop_load( self, @@ -399,10 +413,15 @@ def stop_load( response.raise_for_status() content = json.loads(response.content.decode("utf-8")) return content[ALLUXIO_SUCCESS_IDENTIFIER] - except Exception as e: + except Exception: raise Exception( - f"Error when stopping load job for path {path} from {worker_host}: error {e}" - ) from e + EXCEPTION_CONTENT.format( + worker_host=worker_host, + http_port=worker_http_port, + error=f"Error when stopping load job for path {path}, " + + response.content.decode("utf-8"), + ) + ) def load_progress( self, @@ -459,9 +478,7 @@ def read(self, file_path): raise FileNotFoundError(f"File {file_path} not found") return self.read_range(file_path, 0, file_status.length) except Exception as e: - raise Exception( - f"Error when reading file {file_path}: error {e}" - ) from e + raise Exception(e) def read_file_range(self, file_path, offset=0, length=-1): """ @@ -496,8 +513,12 @@ def read_file_range(self, file_path, offset=0, length=-1): ) except Exception as e: raise Exception( - f"Error when reading file {file_path}: error {e}" - ) from e + EXCEPTION_CONTENT.format( + worker_host=worker_host, + http_port=worker_http_port, + error=f"Error when reading file {file_path}, " + e, + ) + ) def read_chunked(self, file_path, chunk_size=1024 * 1024): """ @@ -529,9 +550,7 @@ def read_chunked(self, file_path, chunk_size=1024 * 1024): chunk_size, ) except Exception as e: - raise Exception( - f"Error when reading file {file_path}: error {e}" - ) from e + raise Exception(e) def _all_chunk_generator( self, worker_host, worker_http_port, path_id, file_path, chunk_size @@ -558,13 +577,25 @@ def _all_chunk_generator( ) out = io.BytesIO() headers = {"transfer-type": "chunked"} - with requests.get(url_chunk, headers=headers, stream=True) as response: - response.raise_for_status() - for chunk in response.iter_content(chunk_size=chunk_size): - if chunk: - out.write(chunk) - out.seek(0) - return out + try: + with requests.get( + url_chunk, headers=headers, stream=True + ) as response: + response.raise_for_status() + for chunk in response.iter_content(chunk_size=chunk_size): + if chunk: + out.write(chunk) + out.seek(0) + return out + except Exception: + raise Exception( + EXCEPTION_CONTENT.format( + worker_host=worker_host, + http_port=worker_http_port, + error=f"Error when reading file {file_path}, " + + response.content.decode("utf-8"), + ) + ) def read_range(self, file_path, offset, length): """ @@ -578,7 +609,7 @@ def read_range(self, file_path, offset, length): Returns: file content (str): The file content with length from offset """ - logger.debug(f"read_range,off:{offset}:length:{length}") + self.logger.debug(f"read_range,off:{offset}:length:{length}") self._validate_path(file_path) if not isinstance(offset, int) or offset < 0: raise ValueError("Offset must be a non-negative integer") @@ -624,10 +655,7 @@ def read_range(self, file_path, offset, length): ) ) except Exception as e: - raise Exception( - f"Error when reading file:{file_path}: error:{e}: " - f"worker_host:{worker_host}, worker_http_port:{worker_http_port}" - ) from e + raise Exception(e) def write(self, file_path, file_bytes): """ @@ -660,7 +688,7 @@ def write(self, file_path, file_bytes): ) except Exception as e: raise Exception( - f"Error when reading file {file_path}: error {e}" + f"Error when writing file {file_path}: error {e}" ) from e def write_chunked(self, file_path, file_bytes, chunk_size=1024 * 1024): @@ -693,9 +721,7 @@ def write_chunked(self, file_path, file_bytes, chunk_size=1024 * 1024): chunk_size, ) except Exception as e: - raise Exception( - f"Error when reading file {file_path}: error {e}" - ) from e + raise e def write_page(self, file_path, page_index, page_bytes): """ @@ -728,9 +754,14 @@ def write_page(self, file_path, page_index, page_bytes): ) response.raise_for_status() return 200 <= response.status_code < 300 - except requests.RequestException as e: + except requests.RequestException: raise Exception( - f"Error writing to file {file_path} at page {page_index}: {e}" + EXCEPTION_CONTENT.format( + worker_host=worker_host, + http_port=worker_http_port, + error=f"Error when write file {file_path}, " + + response.content.decode("utf-8"), + ) ) def mkdir(self, file_path): @@ -741,6 +772,7 @@ def mkdir(self, file_path): Returns: True if the mkdir was successful, False otherwise. """ + self._validate_path(file_path) worker_host, worker_http_port = self._get_preferred_worker_address( file_path @@ -757,8 +789,14 @@ def mkdir(self, file_path): ) response.raise_for_status() return 200 <= response.status_code < 300 - except requests.RequestException as e: - raise Exception(f"Error making a directory of {file_path}: {e}") + except requests.RequestException: + raise Exception( + EXCEPTION_CONTENT.format( + worker_host=worker_host, + http_port=worker_http_port, + error=response.content.decode("utf-8"), + ) + ) def touch(self, file_path): """ @@ -768,6 +806,7 @@ def touch(self, file_path): Returns: True if the touch was successful, False otherwise. """ + self._validate_path(file_path) worker_host, worker_http_port = self._get_preferred_worker_address( file_path @@ -784,11 +823,16 @@ def touch(self, file_path): ) response.raise_for_status() return 200 <= response.status_code < 300 - except requests.RequestException as e: - raise Exception(f"Error create a file of {file_path}: {e}") - - # TODO(littelEast7): complete it + except requests.RequestException: + raise Exception( + EXCEPTION_CONTENT.format( + worker_host=worker_host, + http_port=worker_http_port, + error=response.content.decode("utf-8"), + ) + ) + # TODO(littelEast7): review it def mv(self, path1, path2): """ mv a file from path1 to path2. @@ -798,6 +842,7 @@ def mv(self, path1, path2): Returns: True if the mv was successful, False otherwise. """ + self._validate_path(path1) self._validate_path(path2) worker_host, worker_http_port = self._get_preferred_worker_address( @@ -816,8 +861,14 @@ def mv(self, path1, path2): ) response.raise_for_status() return 200 <= response.status_code < 300 - except requests.RequestException as e: - raise Exception(f"Error move a file from {path1} to {path2}: {e}") + except requests.RequestException: + raise Exception( + EXCEPTION_CONTENT.format( + worker_host=worker_host, + http_port=worker_http_port, + error=response.content.decode("utf-8"), + ) + ) def rm(self, path, option): """ @@ -828,6 +879,7 @@ def rm(self, path, option): Returns: True if the rm was successful, False otherwise. """ + self._validate_path(path) worker_host, worker_http_port = self._get_preferred_worker_address( path @@ -846,8 +898,14 @@ def rm(self, path, option): ) response.raise_for_status() return 200 <= response.status_code < 300 - except requests.RequestException as e: - raise Exception(f"Error remove a file {path}: {e}") + except requests.RequestException: + raise Exception( + EXCEPTION_CONTENT.format( + worker_host=worker_host, + http_port=worker_http_port, + error=response.content.decode("utf-8"), + ) + ) def cp(self, path1, path2, option): """ @@ -859,6 +917,7 @@ def cp(self, path1, path2, option): Returns: True if the cp was successful, False otherwise. """ + self._validate_path(path1) worker_host, worker_http_port = self._get_preferred_worker_address( path1 @@ -878,8 +937,14 @@ def cp(self, path1, path2, option): ) response.raise_for_status() return 200 <= response.status_code < 300 - except requests.RequestException as e: - raise Exception(f"Error copy a file from {path1} to {path2}: {e}") + except requests.RequestException: + raise Exception( + EXCEPTION_CONTENT.format( + worker_host=worker_host, + http_port=worker_http_port, + error=response.content.decode("utf-8"), + ) + ) def tail(self, file_path, num_of_bytes=None): """ @@ -890,6 +955,7 @@ def tail(self, file_path, num_of_bytes=None): Returns: The content of tail of the file. """ + self._validate_path(file_path) worker_host, worker_http_port = self._get_preferred_worker_address( file_path @@ -905,9 +971,16 @@ def tail(self, file_path, num_of_bytes=None): ), params={"numOfBytes": num_of_bytes}, ) + response.raise_for_status() return b"".join(response.iter_content()) - except requests.RequestException as e: - raise Exception(f"Error show the tail of {file_path}: {e}") + except requests.RequestException: + raise Exception( + EXCEPTION_CONTENT.format( + worker_host=worker_host, + http_port=worker_http_port, + error=response.content.decode("utf-8"), + ) + ) def head(self, file_path, num_of_bytes=None): """ @@ -933,9 +1006,16 @@ def head(self, file_path, num_of_bytes=None): ), params={"numOfBytes": num_of_bytes}, ) + response.raise_for_status() return b"".join(response.iter_content()) - except requests.RequestException as e: - raise Exception(f"Error show the head of {file_path}: {e}") + except requests.RequestException: + raise Exception( + EXCEPTION_CONTENT.format( + worker_host=worker_host, + http_port=worker_http_port, + error=response.content.decode("utf-8"), + ) + ) def _all_page_generator_alluxiocommon( self, worker_host, worker_http_port, path_id, file_path @@ -967,9 +1047,7 @@ def _all_page_generator_alluxiocommon( except Exception as e: # data_manager won't throw exception if there are any first few content retrieved # hence we always propagte exception from data_manager upwards - raise Exception( - f"Error when reading all pages of {path_id}: error {e}" - ) from e + raise Exception(e) def _all_page_generator( self, worker_host, worker_http_port, path_id, file_path @@ -1025,9 +1103,7 @@ def _all_page_generator_write( except Exception as e: # data_manager won't throw exception if there are any first few content retrieved # hence we always propagte exception from data_manager upwards - raise Exception( - f"Error when writing all pages of {path_id}: error {e}" - ) from e + raise Exception(e) def _file_chunk_generator(self, file_bytes, chunk_size): offset = 0 @@ -1065,13 +1141,16 @@ def _all_chunk_generator_write( headers=headers, data=self._file_chunk_generator(file_bytes, chunk_size), ) - return response.status_code == 200 - except Exception as e: - # data_manager won't throw exception if there are any first few content retrieved - # hence we always propagte exception from data_manager upwards + response.raise_for_status() + return 200 <= response.status_code < 300 + except Exception: raise Exception( - f"Error when writing all pages of {file_path}: error {e}" - ) from e + EXCEPTION_CONTENT.format( + worker_host=worker_host, + http_port=worker_http_port, + error=response.content.decode("utf-8"), + ) + ) def _range_page_generator_alluxiocommon( self, worker_host, worker_http_port, path_id, file_path, offset, length @@ -1175,10 +1254,15 @@ def _all_file_range_generator( response = requests.get(url) response.raise_for_status() return response.content - except Exception as e: + except Exception: raise Exception( - f"Error when reading file {path_id} with offset {offset} and length {length}: error {e}" - ) from e + EXCEPTION_CONTENT.format( + worker_host=worker_host, + http_port=worker_http_port, + error=f"Error when reading file {path_id} with offset {offset} and length {length}," + f" error: {response.content.decode('utf-8')}", + ) + ) def _create_session(self, concurrency): session = requests.Session() @@ -1228,26 +1312,29 @@ def _load_file( if job_state == LoadState.SUCCEEDED: return True if job_state == LoadState.FAILED: - logger.error( + self.logger.error( f"Failed to load path {path} with return message {content}" ) return False if job_state == LoadState.STOPPED: - logger.warning( + self.logger.warning( f"Failed to load path {path} with return message {content}, load stopped" ) return False if timeout is None or stop_time - time.time() >= 10: time.sleep(10) else: - logger.debug(f"Failed to load path {path} within timeout") + self.logger.debug( + f"Failed to load path {path} within timeout" + ) return False - except Exception as e: - logger.debug( - f"Error when loading file {path} from {worker_host} with timeout {timeout}: error {e}" + except Exception: + self.logger.error( + f"Error when loading file {path} from {worker_host} with timeout {timeout}:" + f" error {response.content.decode('utf-8')}" ) - return False + raise Exception(response.content.decode("utf-8")) def _load_progress_internal( self, load_url: str, params: Dict @@ -1293,7 +1380,7 @@ def _read_page( file_path=file_path, page_index=page_index, ) - logger.debug(f"Reading full page request {page_url}") + self.logger.debug(f"Reading full page request {page_url}") else: page_url = PAGE_URL_FORMAT.format( worker_host=worker_host, @@ -1304,15 +1391,17 @@ def _read_page( page_offset=offset, page_length=length, ) - logger.debug(f"Reading page request {page_url}") + self.logger.debug(f"Reading page request {page_url}") response = self.session.get(page_url) response.raise_for_status() return response.content - except Exception as e: - raise Exception( - f"Error when requesting file {path_id} page {page_index} from {worker_host}: error {e}" - ) from e + except Exception: + EXCEPTION_CONTENT.format( + worker_host=worker_host, + http_port=worker_http_port, + error=f"Error when requesting file {path_id} page {page_index}, {response.content.decode('utf-8')}", + ) def _write_page( self, @@ -1346,9 +1435,13 @@ def _write_page( ) response.raise_for_status() return 200 <= response.status_code < 300 - except requests.RequestException as e: + except requests.RequestException: raise Exception( - f"Error writing to file {file_path} at page {page_index}: {e}" + EXCEPTION_CONTENT.format( + worker_host=worker_host, + http_port=worker_http_port, + error=f"Error writing to file {file_path} at page {page_index}, {response.content.decode('utf-8')}", + ) ) def _get_path_hash(self, uri): @@ -1450,7 +1543,7 @@ def __init__( if options: if ALLUXIO_PAGE_SIZE_KEY in options: page_size = options[ALLUXIO_PAGE_SIZE_KEY] - logger.debug(f"Page size is set to {page_size}") + self.logger.debug(f"Page size is set to {page_size}") self.page_size = humanfriendly.parse_size(page_size, binary=True) self.hash_provider = ConsistentHashProvider( AlluxioClientConfig( @@ -1771,19 +1864,19 @@ async def _load_file(self, worker_host: str, path: str, timeout): if job_state == LoadState.SUCCEEDED: return True if job_state == LoadState.FAILED: - logger.debug( + self.logger.debug( f"Failed to load path {path} with return message {content}" ) return False if job_state == LoadState.STOPPED: - logger.debug( + self.logger.debug( f"Failed to load path {path} with return message {content}, load stopped" ) return False if timeout is None or stop_time - time.time() >= 10: asyncio.sleep(10) else: - logger.debug(f"Failed to load path {path} within timeout") + self.logger.debug(f"Failed to load path {path} within timeout") return False async def _load_progress_internal(self, load_url: str): diff --git a/alluxiofs/core.py b/alluxiofs/core.py index 07a8301..04b12de 100644 --- a/alluxiofs/core.py +++ b/alluxiofs/core.py @@ -21,7 +21,36 @@ from alluxiofs.client import AlluxioClient from alluxiofs.client.utils import set_log_level -logger = logging.getLogger(__name__) + +def setup_logger(file_path=None, level=logging.INFO): + import os + + # log dir + file_name = "user.log" + if file_path is None: + project_dir = os.getcwd() + logs_dir = os.path.join(project_dir, "logs") + if not os.path.exists(logs_dir): + os.makedirs(logs_dir) + log_file = os.path.join(logs_dir, file_name) + else: + log_file = file_path + "/" + file_name + # set handler + file_handler = logging.FileHandler(log_file) + file_handler.setLevel(level) + console_handler = logging.StreamHandler() + console_handler.setLevel(level) + formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) + file_handler.setFormatter(formatter) + console_handler.setFormatter(formatter) + # init logger + logger = logging.getLogger(__name__) + logger.addHandler(file_handler) + logger.addHandler(console_handler) + logger.setLevel(level) + return logger @dataclass @@ -82,6 +111,7 @@ def __init__( target_protocol=None, target_options=None, fs=None, + logger=None, **kwargs, ): """ @@ -116,13 +146,16 @@ def __init__( **kwargs: other parameters for core session. """ super().__init__(**kwargs) + self.logger = logger + if self.logger is None: + self.logger = setup_logger() if fs and target_protocol: raise ValueError( "Please provide one of filesystem instance (fs) or" " target_protocol, not both" ) if fs is None and target_protocol is None: - logger.warning( + self.logger.warning( "Neither filesystem instance(fs) nor target_protocol is " "provided. Will not fall back to under file systems when " "accessed files are not in Alluxiofs" @@ -146,11 +179,12 @@ def __init__( self.fs = filesystem(target_protocol, **self.target_options) self.target_protocol = target_protocol test_options = kwargs.get("test_options", {}) - set_log_level(logger, test_options) + set_log_level(self.logger, test_options) if test_options.get("skip_alluxio") is True: self.alluxio = None else: self.alluxio = AlluxioClient( + logger=self.logger, **kwargs, ) if preload_path is not None: @@ -273,15 +307,24 @@ def fallback_wrapper(self, *args, **kwargs): if self.alluxio: start_time = time.time() res = alluxio_impl(self, *positional_params, **kwargs) - logger.debug( + self.logger.debug( f"Exit(Ok): alluxio op({alluxio_impl.__name__}) args({positional_params}) kwargs({kwargs}) time({(time.time() - start_time):.2f}s)" ) return res except Exception as e: if not isinstance(e, NotImplementedError): - logger.debug( - f"Exit(Error): alluxio op({alluxio_impl.__name__}) args({positional_params}) kwargs({kwargs}) {e}" - ) + if alluxio_impl.__name__ not in [ + "write", + "upload", + "upload_data", + ]: + self.logger.error( + f"Exit(Error): alluxio op({alluxio_impl.__name__}) args({positional_params}) kwargs({kwargs}): {e}\nfallback to ufs" + ) + else: + self.logger.error( + f"Exit(Error): alluxio op({alluxio_impl.__name__}) {e}\nfallback to ufs" + ) self.error_metrics.record_error(alluxio_impl.__name__, e) if self.fs is None: raise e @@ -289,12 +332,15 @@ def fallback_wrapper(self, *args, **kwargs): fs_method = getattr(self.fs, alluxio_impl.__name__, None) if fs_method: - res = fs_method(*positional_params, **kwargs) - - logger.debug( - f"Exit(Ok): ufs({self.target_protocol}) op({alluxio_impl.__name__}) args({positional_params}) kwargs({kwargs})" - ) - return res + try: + res = fs_method(*positional_params, **kwargs) + self.logger.debug( + f"Exit(Ok): ufs({self.target_protocol}) op({alluxio_impl.__name__}) args({positional_params}) kwargs({kwargs})" + ) + return res + except Exception as e: + self.logger.error(f"fallback to ufs is failed: {e}") + raise Exception(f"fallback to ufs is failed: {e}") raise NotImplementedError( f"The method {alluxio_impl.__name__} is not implemented in the underlying filesystem {self.target_protocol}" ) @@ -316,6 +362,15 @@ def info(self, path, **kwargs): file_status = self.alluxio.get_file_status(path) return self._translate_alluxio_info_to_fsspec_info(file_status, True) + @fallback_handler + def exists(self, path, **kwargs): + try: + path = self.unstrip_protocol(path) + self.alluxio.get_file_status(path) + return True + except Exception: + return False + @fallback_handler def isdir(self, path, **kwargs): return self.info(path)["type"] == "directory" @@ -478,12 +533,15 @@ def put_file(self, lpath, rpath, *args, **kwargs): def put(self, lpath, rpath, *args, **kwargs): raise NotImplementedError + @fallback_handler def write(self, path, value, **kwargs): return self.upload_data(path, value, **kwargs) + @fallback_handler def read(self, path, *args, **kwargs): return self.cat_file(path) + @fallback_handler def load_file_from_ufs_to_alluxio(self, path, **kwargs): path = self.unstrip_protocol(path) return self.alluxio.load(path, **kwargs) @@ -491,31 +549,21 @@ def load_file_from_ufs_to_alluxio(self, path, **kwargs): @fallback_handler def upload(self, lpath: str, rpath: str, *args, **kwargs) -> bool: lpath = self.unstrip_protocol(lpath) - try: - with open(rpath, "rb") as f: - self.alluxio.write_chunked(lpath, f.read()) - return True - except Exception: - return False + with open(rpath, "rb") as f: + return self.alluxio.write_chunked(lpath, f.read()) + @fallback_handler def upload_data(self, path: str, data: b"", *args, **kwargs) -> bool: path = self.unstrip_protocol(path) - try: - self.alluxio.write_chunked(path, data) - return True - except Exception: - return False + return self.alluxio.write_chunked(path, data) @fallback_handler def download(self, lpath, rpath, *args, **kwargs): lpath = self.unstrip_protocol(lpath) - try: - with open(rpath, "wb") as f: - f.write(self.alluxio.read_chunked(lpath).read()) - return True - except Exception: - return False + with open(rpath, "wb") as f: + return f.write(self.alluxio.read_chunked(lpath).read()) + @fallback_handler def download_data(self, lpath, *args, **kwargs): lpath = self.unstrip_protocol(lpath) return self.alluxio.read_chunked(lpath) diff --git a/tests/no_mock_tests/other_option_test_disabled.py b/tests/no_mock_tests/other_option_test_disabled.py index 3418400..36a6ffe 100644 --- a/tests/no_mock_tests/other_option_test_disabled.py +++ b/tests/no_mock_tests/other_option_test_disabled.py @@ -39,9 +39,6 @@ def other_option_test_disabled(): if alluxio_fs.exists(home_path): alluxio_fs.rm(home_path, recursive=True) - # # load file from ufs to alluxio - assert alluxio_fs.load_file_from_ufs_to_alluxio(path="s3://" + bucket_name) - # # mkdir res = alluxio_fs.mkdir(home_path) assert res @@ -57,6 +54,9 @@ def other_option_test_disabled(): verify_result(1) show_files(home_path) + # # load file from ufs to alluxio + # assert alluxio_fs.load_file_from_ufs_to_alluxio(home_path) + # # get file status res_folder = alluxio_fs.info(home_path) assert res_folder and res_folder["type"] == "directory" @@ -88,10 +88,12 @@ def other_option_test_disabled(): verify_result(1) # # upload + print("upload file for test") with open("../assets/test.csv", "rb") as f: data = f.read() - alluxio_fs.upload_data( - path=home_path + "/python_sdk_test_folder/file3", data=data + assert alluxio_fs.upload_data( + path="4564" + home_path + "/python_sdk_test_folder/file3", + data=data, ) # # move