From b9d217facdc476f86dcf75893ebe502c346cfece Mon Sep 17 00:00:00 2001 From: Lu Qiu Date: Mon, 11 Mar 2024 14:10:04 -0700 Subject: [PATCH 1/2] Change Alluxio Configuration --- alluxiofs/client/config.py | 91 ++++++++++ alluxiofs/client/const.py | 6 - alluxiofs/client/core.py | 169 +++--------------- alluxiofs/client/worker_ring.py | 79 ++++---- alluxiofs/core.py | 56 ++---- .../derived/local/local_fallback_fixtures.py | 2 +- .../memory/memory_fallback_fixtures.py | 2 +- .../tests/derived/s3/s3_fallback_fixtures.py | 2 +- 8 files changed, 161 insertions(+), 246 deletions(-) create mode 100644 alluxiofs/client/config.py diff --git a/alluxiofs/client/config.py b/alluxiofs/client/config.py new file mode 100644 index 0000000..2cf982d --- /dev/null +++ b/alluxiofs/client/config.py @@ -0,0 +1,91 @@ +from typing import Optional + +import humanfriendly + +from .const import ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE +from .const import ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE +from .const import ALLUXIO_PAGE_SIZE_DEFAULT_VALUE +from .const import ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE + + +class AlluxioClientConfig: + """ + Class responsible for creating the configuration for Alluxio Client. + """ + + def __init__( + self, + etcd_hosts: Optional[str] = None, + worker_hosts: Optional[str] = None, + etcd_port=2379, + worker_http_port=ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE, + etcd_refresh_workers_interval=120, + page_size=ALLUXIO_PAGE_SIZE_DEFAULT_VALUE, + hash_node_per_worker=ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE, + cluster_name=ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE, + etcd_username: Optional[str] = None, + etcd_password: Optional[str] = None, + concurrency=64, + **kwargs, + ): + """ + Initializes Alluxio client configuration. + Args: + etcd_hosts (Optional[str], optional): The hostnames of ETCD to get worker addresses from + in 'host1,host2,host3' format. Either etcd_hosts or worker_hosts should be provided, not both. 
+ worker_hosts (Optional[str], optional): The worker hostnames in 'host1,host2,host3' format. + Either etcd_hosts or worker_hosts should be provided, not both. + concurrency (int, optional): The maximum number of concurrent operations for HTTP requests, default to 64. + etcd_port (int, optional): The port of each etcd server. + worker_http_port (int, optional): The port of the HTTP server on each Alluxio worker node. + etcd_refresh_workers_interval (int, optional): The interval to refresh worker list from ETCD membership service periodically. + All negative values mean the service is disabled. + """ + if not (etcd_hosts or worker_hosts): + raise ValueError( + "Must supply either 'etcd_hosts' or 'worker_hosts'" + ) + if etcd_hosts and worker_hosts: + raise ValueError( + "Supply either 'etcd_hosts' or 'worker_hosts', not both" + ) + if not isinstance(etcd_port, int) or not (1 <= etcd_port <= 65535): + raise ValueError( + "'etcd_port' should be an integer in the range 1-65535" + ) + if not isinstance(worker_http_port, int) or not ( + 1 <= worker_http_port <= 65535 + ): + raise ValueError( + "'worker_http_port' should be an integer in the range 1-65535" + ) + if not isinstance(concurrency, int) or concurrency <= 0: + raise ValueError("'concurrency' should be a positive integer") + if not isinstance(etcd_refresh_workers_interval, int): + raise ValueError( + "'etcd_refresh_workers_interval' should be an integer" + ) + self.etcd_hosts = etcd_hosts + self.worker_hosts = worker_hosts + self.etcd_port = etcd_port + self.worker_http_port = worker_http_port + self.etcd_refresh_workers_interval = etcd_refresh_workers_interval + if ( + not isinstance(hash_node_per_worker, int) + or hash_node_per_worker <= 0 + ): + raise ValueError( + "'hash_node_per_worker' should be a positive integer" + ) + + self.hash_node_per_worker = hash_node_per_worker + self.page_size = humanfriendly.parse_size(page_size, binary=True) + self.cluster_name = cluster_name + + if (etcd_username is None) != 
(etcd_password is None): + raise ValueError( + "Both ETCD username and password must be set or both should be unset." + ) + self.etcd_username = etcd_username + self.etcd_password = etcd_password + self.concurrency = concurrency diff --git a/alluxiofs/client/const.py b/alluxiofs/client/const.py index 3f68e67..6c55ca6 100644 --- a/alluxiofs/client/const.py +++ b/alluxiofs/client/const.py @@ -1,12 +1,6 @@ -ALLUXIO_CLUSTER_NAME_KEY = "alluxio.cluster.name" ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE = "DefaultAlluxioCluster" -ALLUXIO_ETCD_USERNAME_KEY = "alluxio.etcd.username" -ALLUXIO_ETCD_PASSWORD_KEY = "alluxio.etcd.password" ALLUXIO_PAGE_SIZE_KEY = "alluxio.worker.page.store.page.size" ALLUXIO_PAGE_SIZE_DEFAULT_VALUE = "1MB" -ALLUXIO_HASH_NODE_PER_WORKER_KEY = ( - "alluxio.user.consistent.hash.virtual.node.count.per.worker" -) ALLUXIO_WORKER_HTTP_SERVER_PORT_KEY = "alluxio.worker.http.server.port" ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE = 28080 ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE = 5 diff --git a/alluxiofs/client/core.py b/alluxiofs/client/core.py index 902071a..e7d1425 100644 --- a/alluxiofs/client/core.py +++ b/alluxiofs/client/core.py @@ -15,12 +15,10 @@ import requests from requests.adapters import HTTPAdapter -from .const import ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE -from .const import ALLUXIO_HASH_NODE_PER_WORKER_KEY +from .config import AlluxioClientConfig from .const import ALLUXIO_PAGE_SIZE_DEFAULT_VALUE from .const import ALLUXIO_PAGE_SIZE_KEY from .const import ALLUXIO_SUCCESS_IDENTIFIER -from .const import ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE from .const import FULL_PAGE_URL_FORMAT from .const import GET_FILE_STATUS_URL_FORMAT from .const import LIST_URL_FORMAT @@ -74,139 +72,21 @@ class OpType(Enum): class AlluxioClient: """ - Access Alluxio file system - - Examples - -------- - >>> # Launch Alluxio with ETCD as service discovery - >>> alluxio = AlluxioClient(etcd_hosts="localhost") - >>> # Or launch Alluxio with user provided worker 
list - >>> alluxio = AlluxioClient(worker_hosts="host1,host2,host3") - - >>> print(alluxio.listdir("s3://mybucket/mypath/dir")) - [ - { - type: "file", - name: "my_file_name", - path: '/my_file_name', - ufs_path: 's3://example-bucket/my_file_name', - last_modification_time_ms: 0, - length: 77542, - human_readable_file_size: '75.72KB' - }, - - ] - >>> print(alluxio.read("s3://mybucket/mypath/dir/myfile")) - my_file_content + An AlluxioClient for interacting with Alluxio servers. """ def __init__( self, - etcd_hosts=None, - worker_hosts=None, - options=None, - logger=None, - concurrency=64, - etcd_port=2379, - worker_http_port=ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE, - etcd_refresh_workers_interval=120, + **kwargs, ): """ - Inits Alluxio file system. - - Args: - etcd_hosts (str, optional): - The hostnames of ETCD to get worker addresses from - The hostnames in host1,host2,host3 format. Either etcd_hosts or worker_hosts should be provided, not both. - worker_hosts (str, optional): - The worker hostnames in host1,host2,host3 format. Either etcd_hosts or worker_hosts should be provided, not both. - options (dict, optional): - A dictionary of Alluxio property key and values. - Note that Alluxio Python API only support a limited set of Alluxio properties. - logger (Logger, optional): - A logger instance for logging messages. - concurrency (int, optional): - The maximum number of concurrent operations for HTTP requests. Default to 64. - etcd_port (int, optional): - The port of each etcd server. - worker_http_port (int, optional): - The port of the HTTP server on each Alluxio worker node. - etcd_refresh_workers_interval(int, optional): - The interval to refresh worker list from ETCD membership service periodically. All negative values mean the service is disabled. - + Inits Alluxio Client with Alluxio client arguments. + See AlluxioClientConfig for configurations. 
""" - # TODO(lu/chunxu) change to ETCD endpoints in format of 'http://etcd_host:port, http://etcd_host:port' & worker hosts in 'host:port, host:port' format - self.logger = logger or logging.getLogger("AlluxioPython") - if not (etcd_hosts or worker_hosts): - raise ValueError( - "Must supply either 'etcd_hosts' or 'worker_hosts'" - ) - if etcd_hosts and worker_hosts: - raise ValueError( - "Supply either 'etcd_hosts' or 'worker_hosts', not both" - ) - if not etcd_hosts: - self.logger.warning( - "'etcd_hosts' not supplied. An etcd cluster is required for dynamic cluster changes." - ) - if not isinstance(etcd_port, int) or not (1 <= etcd_port <= 65535): - raise ValueError( - "'etcd_port' should be an integer in the range 1-65535" - ) - if not isinstance(worker_http_port, int) or not ( - 1 <= worker_http_port <= 65535 - ): - raise ValueError( - "'worker_http_port' should be an integer in the range 1-65535" - ) - if not isinstance(concurrency, int) or concurrency <= 0: - raise ValueError("'concurrency' should be a positive integer") - if concurrency < 10 or concurrency > 128: - self.logger.warning( - f"'concurrency' value of {concurrency} is outside the recommended range (10-128). 
" - "This may lead to suboptimal performance or resource utilization.", - ) - if not isinstance(etcd_refresh_workers_interval, int): - raise ValueError( - "'etcd_refresh_workers_interval' should be an integer" - ) - - self.session = self._create_session(concurrency) - - # parse options - page_size = ALLUXIO_PAGE_SIZE_DEFAULT_VALUE - hash_node_per_worker = ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE - if options: - if ALLUXIO_PAGE_SIZE_KEY in options: - page_size = options[ALLUXIO_PAGE_SIZE_KEY] - self.logger.debug(f"Page size is set to {page_size}") - if ALLUXIO_HASH_NODE_PER_WORKER_KEY in options: - hash_node_per_worker = int( - options[ALLUXIO_HASH_NODE_PER_WORKER_KEY] - ) - self.logger.debug( - f"Hash node per worker is set to {hash_node_per_worker}" - ) - if ( - not isinstance(hash_node_per_worker, int) - or hash_node_per_worker <= 0 - ): - raise ValueError( - "'hash_node_per_worker' should be a positive integer" - ) - - self.page_size = humanfriendly.parse_size(page_size, binary=True) - - self.hash_provider = ConsistentHashProvider( - etcd_hosts=etcd_hosts, - etcd_port=etcd_port, - worker_hosts=worker_hosts, - worker_http_port=worker_http_port, - hash_node_per_worker=hash_node_per_worker, - options=options, - logger=self.logger, - etcd_refresh_workers_interval=etcd_refresh_workers_interval, - ) + self.logger = kwargs.get("logger", logging.getLogger("Alluxiofs")) + self.config = AlluxioClientConfig(**kwargs) + self.session = self._create_session(self.config.concurrency) + self.hash_provider = ConsistentHashProvider(self.config, self.logger) def listdir(self, path): """ @@ -583,30 +463,30 @@ def _all_page_generator(self, worker_host, worker_http_port, path_id): if not page_content: break yield page_content - if len(page_content) < self.page_size: # last page + if len(page_content) < self.config.page_size: # last page break page_index += 1 def _range_page_generator( self, worker_host, worker_http_port, path_id, offset, length ): - start_page_index = offset // 
self.page_size - start_page_offset = offset % self.page_size + start_page_index = offset // self.config.page_size + start_page_offset = offset % self.config.page_size - end_page_index = (offset + length - 1) // self.page_size - end_page_read_to = ((offset + length - 1) % self.page_size) + 1 + end_page_index = (offset + length - 1) // self.config.page_size + end_page_read_to = ((offset + length - 1) % self.config.page_size) + 1 page_index = start_page_index while True: try: read_offset = 0 - read_length = self.page_size + read_length = self.config.page_size if page_index == start_page_index: read_offset = start_page_offset if start_page_index == end_page_index: read_length = end_page_read_to - start_page_offset else: - read_length = self.page_size - start_page_offset + read_length = self.config.page_size - start_page_offset elif page_index == end_page_index: read_length = end_page_read_to @@ -867,14 +747,15 @@ def __init__( self.logger.debug(f"Page size is set to {page_size}") self.page_size = humanfriendly.parse_size(page_size, binary=True) self.hash_provider = ConsistentHashProvider( - etcd_hosts=etcd_hosts, - etcd_port=int(etcd_port), - worker_hosts=worker_hosts, - worker_http_port=int(http_port), - hash_node_per_worker=ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE, - options=options, - logger=self.logger, - etcd_refresh_workers_interval=120, + AlluxioClientConfig( + etcd_hosts=etcd_hosts, + etcd_port=int(etcd_port), + worker_hosts=worker_hosts, + worker_http_port=int(http_port), + etcd_refresh_workers_interval=120, + page_size=page_size, + ), + self.logger, ) self.http_port = http_port self._loop = loop or asyncio.get_event_loop() diff --git a/alluxiofs/client/worker_ring.py b/alluxiofs/client/worker_ring.py index 7f4c816..f1e9ea4 100644 --- a/alluxiofs/client/worker_ring.py +++ b/alluxiofs/client/worker_ring.py @@ -7,16 +7,14 @@ from dataclasses import dataclass from typing import Dict from typing import List +from typing import Optional from typing import Set 
import etcd3 import mmh3 from sortedcontainers import SortedDict -from .const import ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE -from .const import ALLUXIO_CLUSTER_NAME_KEY -from .const import ALLUXIO_ETCD_PASSWORD_KEY -from .const import ALLUXIO_ETCD_USERNAME_KEY +from .config import AlluxioClientConfig from .const import ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE from .const import ETCD_PREFIX_FORMAT @@ -139,7 +137,9 @@ def from_host_and_port(worker_host, worker_http_port): class EtcdClient: - def __init__(self, host="localhost", port=2379, options=None): + def __init__( + self, config: AlluxioClientConfig, host="localhost", port=2379 + ): self._host = host self._port = port @@ -147,22 +147,11 @@ def __init__(self, host="localhost", port=2379, options=None): self._etcd_username = None self._etcd_password = None self._prefix = ETCD_PREFIX_FORMAT.format( - cluster_name=ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE + cluster_name=config.cluster_name ) - if options: - if ALLUXIO_ETCD_USERNAME_KEY in options: - self._etcd_username = options[ALLUXIO_ETCD_USERNAME_KEY] - if ALLUXIO_ETCD_PASSWORD_KEY in options: - self._etcd_password = options[ALLUXIO_ETCD_PASSWORD_KEY] - if ALLUXIO_CLUSTER_NAME_KEY in options: - self._prefix = ETCD_PREFIX_FORMAT.format( - cluster_name=options[ALLUXIO_CLUSTER_NAME_KEY] - ) - - if (self._etcd_username is None) != (self._etcd_password is None): - raise ValueError( - "Both ETCD username and password must be set or both should be unset." 
- ) + if config.etcd_username is not None: + self._etcd_username = config.etcd_username + self._etcd_password = config.etcd_password def get_worker_entities(self) -> Set[WorkerEntity]: """ @@ -173,7 +162,6 @@ def get_worker_entities(self) -> Set[WorkerEntity]: """ # Note that EtcdClient should not be passed through python multiprocessing etcd = self._get_etcd_client() - worker_entities: Set[WorkerEntity] = set() try: worker_entities = { WorkerEntity.from_worker_info(worker_info) @@ -205,37 +193,27 @@ def _get_etcd_client(self): class ConsistentHashProvider: def __init__( self, - etcd_hosts=None, - etcd_port=None, - worker_hosts=None, - worker_http_port=None, - options=None, - logger=None, - etcd_refresh_workers_interval=None, - hash_node_per_worker=None, - max_attempts=100, + config: AlluxioClientConfig, + logger: Optional[logging.Logger] = None, ): self._logger = logger or logging.getLogger("ConsistentHashProvider") - self._etcd_hosts = etcd_hosts - self._etcd_port = etcd_port - self._options = options - self._hash_node_per_worker = hash_node_per_worker - self._max_attempts = max_attempts + self._config = config self._lock = threading.Lock() self._is_ring_initialized = False self._worker_info_map = {} - self._etcd_refresh_workers_interval = etcd_refresh_workers_interval - if worker_hosts: + if self._config.worker_hosts is not None: self._update_hash_ring( - self._generate_worker_info_map(worker_hosts, worker_http_port) + self._generate_worker_info_map( + self._config.worker_hosts, self._config.worker_http_port + ) ) - if self._etcd_hosts: + if self._config.etcd_hosts is not None: self._fetch_workers_and_update_ring() - if self._etcd_refresh_workers_interval > 0: + if self._config.etcd_refresh_workers_interval > 0: self._shutdown_background_update_ring_event = threading.Event() self._background_thread = None self._start_background_update_ring( - self._etcd_refresh_workers_interval + self._config.etcd_refresh_workers_interval ) def get_multiple_workers( @@ -275,7 
+253,7 @@ def _get_multiple_worker_identities( ) workers = [] attempts = 0 - while len(workers) < count and attempts < self._max_attempts: + while len(workers) < count and attempts < 100: attempts += 1 worker = self._get_ceiling_value(self._hash(key, attempts)) if worker not in workers: @@ -297,7 +275,10 @@ def update_loop(): self._background_thread.start() def shutdown_background_update_ring(self): - if self._etcd_hosts and self._etcd_refresh_workers_interval > 0: + if ( + self._config.etcd_hosts is not None + and self._config.etcd_refresh_workers_interval > 0 + ): self._shutdown_background_update_ring_event.set() if self._background_thread: self._background_thread.join() @@ -306,13 +287,15 @@ def __del__(self): self.shutdown_background_update_ring() def _fetch_workers_and_update_ring(self): - etcd_hosts_list = self._etcd_hosts.split(",") + etcd_hosts_list = self._config.etcd_hosts.split(",") random.shuffle(etcd_hosts_list) worker_entities: Set[WorkerEntity] = set() for host in etcd_hosts_list: try: worker_entities = EtcdClient( - host=host, port=self._etcd_port, options=self._options + host=host, + port=self._config.etcd_port, + config=self._config, ).get_worker_entities() break except Exception: @@ -320,12 +303,12 @@ def _fetch_workers_and_update_ring(self): if not worker_entities: if self._is_ring_initialized: self._logger.info( - f"Failed to achieve worker info list from ETCD servers:{self._etcd_hosts}" + f"Failed to achieve worker info list from ETCD servers:{self._config.etcd_hosts}" ) return else: raise Exception( - f"Failed to achieve worker info list from ETCD servers:{self._etcd_hosts}" + f"Failed to achieve worker info list from ETCD servers:{self._config.etcd_hosts}" ) worker_info_map = {} @@ -354,7 +337,7 @@ def _update_hash_ring( with self._lock: hash_ring = SortedDict() for worker_identity in worker_info_map.keys(): - for i in range(self._hash_node_per_worker): + for i in range(self._config.hash_node_per_worker): hash_key = 
self._hash_worker_identity(worker_identity, i) hash_ring[hash_key] = worker_identity self.hash_ring = hash_ring diff --git a/alluxiofs/core.py b/alluxiofs/core.py index 7a9f7a8..e3f5319 100644 --- a/alluxiofs/core.py +++ b/alluxiofs/core.py @@ -37,52 +37,26 @@ class AlluxioFileSystem(AbstractFileSystem): def __init__( self, - etcd_hosts=None, - worker_hosts=None, - options=None, - logger=None, - concurrency=64, - etcd_port=2379, - worker_http_port=28080, - preload_path=None, target_protocol=None, target_options=None, fs=None, - test_options=None, + alluxio_client=None, **kwargs, ): """ Initializes an Alluxio filesystem on top of underlying filesystem to leveraging the data caching and management features of Alluxio. - The Alluxio args: - etcd_hosts (str, optional): A comma-separated list of ETCD server hosts in the format "host1:port1,host2:port2,...". - ETCD is used for dynamic discovery of Alluxio workers. - Either `etcd_hosts` or `worker_hosts` must be specified, not both. - worker_hosts (str, optional): A comma-separated list of Alluxio worker hosts in the format "host1:port1,host2:port2,...". - Directly specifies workers without using ETCD. - Either `etcd_hosts` or `worker_hosts` must be specified, not both. - options (dict, optional): A dictionary of Alluxio configuration options where keys are property names and values are property values. - These options configure the Alluxio client behavior. - logger (logging.Logger, optional): A logger instance for logging messages. - If not provided, a default logger with the name "AlluxioFileSystem" is used. - concurrency (int, optional): The maximum number of concurrent operations (e.g., reads, writes) that the file system interface will allow. Defaults to 64. - etcd_port (int, optional): The port number used by each etcd server. - Relevant only if `etcd_hosts` is specified. - worker_http_port (int, optional): The port number used by the HTTP server on each Alluxio worker. 
- This is used for accessing Alluxio's HTTP-based APIs. - preload_path (str, optional): Specifies a path to preload into the Alluxio file system cache at initialization. - This can be useful for ensuring that certain critical data is immediately available in the cache. The underlying filesystem args target_protocol (str, optional): Specifies the under storage protocol to create the under storage file system object. Common examples include 's3' for Amazon S3, 'hdfs' for Hadoop Distributed File System, and others. target_options (dict, optional): Provides a set of configuration options relevant to the `target_protocol`. These options might include credentials, endpoint URLs, and other protocol-specific settings required to successfully interact with the under storage system. fs (object, optional): Directly supplies an instance of a file system object for accessing the underlying storage of Alluxio - Other args: - test_options (dict, optional): A dictionary of options used exclusively for testing purposes. - These might include mock interfaces or specific configuration overrides for test scenarios. - **kwargs: other parameters for core session. + The Alluxio client args: + alluxio_client (AlluxioClient, optional): the Alluxio client used to connect to Alluxio servers. + If not provided, pass Alluxio client arguments via **kwargs to initialize a new AlluxioClient. + **kwargs: other parameters for initializing Alluxio client or fsspec.
""" super().__init__(**kwargs) if not (fs is None) ^ (target_protocol is None): @@ -94,7 +68,7 @@ def __init__( raise ValueError( "Please provide filesystem instance(fs) or target_protocol" ) - self.logger = logger or logging.getLogger("Alluxiofs") + self.logger = kwargs.get("logger", logging.getLogger("Alluxiofs")) self.kwargs = target_options or {} self.fs = None if fs is not None: @@ -102,21 +76,13 @@ def __init__( elif target_protocol is not None: self.fs = filesystem(target_protocol, **self.kwargs) - test_options = test_options or {} - if test_options.get("skip_alluxio") is True: + skip_alluxio = kwargs.get("skip_alluxio", False) + if skip_alluxio: self.alluxio = None + elif alluxio_client: + self.alluxio = alluxio_client else: - self.alluxio = AlluxioClient( - etcd_hosts=etcd_hosts, - worker_hosts=worker_hosts, - options=options, - logger=logger, - concurrency=concurrency, - etcd_port=etcd_port, - worker_http_port=worker_http_port, - ) - if preload_path is not None: - self.alluxio.load(preload_path) + self.alluxio = AlluxioClient(**kwargs) def _strip_protocol(path): if self.fs: diff --git a/alluxiofs/tests/derived/local/local_fallback_fixtures.py b/alluxiofs/tests/derived/local/local_fallback_fixtures.py index cf1417f..89dcc37 100644 --- a/alluxiofs/tests/derived/local/local_fallback_fixtures.py +++ b/alluxiofs/tests/derived/local/local_fallback_fixtures.py @@ -12,7 +12,7 @@ def fs(self): etcd_hosts="localhost", target_protocol="file", target_options={"auto_mkdir": True}, - test_options={"skip_alluxio": True}, + skip_alluxio=True, ) @pytest.fixture diff --git a/alluxiofs/tests/derived/memory/memory_fallback_fixtures.py b/alluxiofs/tests/derived/memory/memory_fallback_fixtures.py index 2af5011..2ed5903 100644 --- a/alluxiofs/tests/derived/memory/memory_fallback_fixtures.py +++ b/alluxiofs/tests/derived/memory/memory_fallback_fixtures.py @@ -16,7 +16,7 @@ def fs(self): yield AlluxioFileSystem( etcd_hosts="localhost", fs=m, - test_options={"skip_alluxio": True}, 
+ skip_alluxio=True, ) finally: m.store.clear() diff --git a/alluxiofs/tests/derived/s3/s3_fallback_fixtures.py b/alluxiofs/tests/derived/s3/s3_fallback_fixtures.py index 07d08e8..7ceadad 100644 --- a/alluxiofs/tests/derived/s3/s3_fallback_fixtures.py +++ b/alluxiofs/tests/derived/s3/s3_fallback_fixtures.py @@ -10,7 +10,7 @@ def fs(self): return AlluxioFileSystem( etcd_hosts="localhost", target_protocol="s3", - test_options={"skip_alluxio": True}, + skip_alluxio=True, ) @pytest.fixture From 0022f6b6fb38e87f86c2e6df145d185b84954ee3 Mon Sep 17 00:00:00 2001 From: Lu Qiu Date: Mon, 11 Mar 2024 14:19:18 -0700 Subject: [PATCH 2/2] Fix test worker hash ring --- tests/client/test_worker_hash_ring.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/client/test_worker_hash_ring.py b/tests/client/test_worker_hash_ring.py index a112917..68deb06 100644 --- a/tests/client/test_worker_hash_ring.py +++ b/tests/client/test_worker_hash_ring.py @@ -1,6 +1,7 @@ import json import os +from alluxiofs.client.config import AlluxioClientConfig from alluxiofs.client.worker_ring import ConsistentHashProvider from alluxiofs.client.worker_ring import WorkerIdentity from alluxiofs.client.worker_ring import WorkerNetAddress @@ -14,9 +15,10 @@ def test_hash_ring(): worker_hostnames = json.load(file) hash_provider = ConsistentHashProvider( - worker_hosts=", ".join(worker_hostnames), - hash_node_per_worker=5, - etcd_refresh_workers_interval=100000000, + AlluxioClientConfig( + worker_hosts=", ".join(worker_hostnames), + etcd_refresh_workers_interval=100000000, + ) ) hash_ring_path = os.path.join(hash_res_dir, "activeNodesMap.json")