diff --git a/sdks/python/src/opik/__init__.py b/sdks/python/src/opik/__init__.py index 6bf87f4256..c1db1f926c 100644 --- a/sdks/python/src/opik/__init__.py +++ b/sdks/python/src/opik/__init__.py @@ -5,7 +5,7 @@ from .api_objects.dataset.dataset_item import DatasetItem from .api_objects.dataset import Dataset from . import _logging -from .opik_configure import configure +from .configurator.configure import configure from . import package_version from .plugins.pytest.decorator import llm_unit from .evaluation import evaluate diff --git a/sdks/python/src/opik/cli.py b/sdks/python/src/opik/cli.py index 95b20cbe66..6340b6f6b4 100644 --- a/sdks/python/src/opik/cli.py +++ b/sdks/python/src/opik/cli.py @@ -5,7 +5,7 @@ import click import questionary -from . import opik_configure +from opik.configurator import configure as opik_configure __version__: str = "0.0.0+dev" if __package__: diff --git a/sdks/python/src/opik/configurator/__init__.py b/sdks/python/src/opik/configurator/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdks/python/src/opik/configurator/configure.py b/sdks/python/src/opik/configurator/configure.py new file mode 100644 index 0000000000..b5ad7a6d81 --- /dev/null +++ b/sdks/python/src/opik/configurator/configure.py @@ -0,0 +1,486 @@ +import getpass +import logging +from typing import Final, List, Optional + +import httpx +import opik.config +from opik.config import ( + OPIK_BASE_URL_CLOUD, + OPIK_BASE_URL_LOCAL, + OPIK_WORKSPACE_DEFAULT_NAME, +) +from opik.configurator.interactive_helpers import ask_user_for_approval, is_interactive +from opik.exceptions import ConfigurationError + +LOGGER = logging.getLogger(__name__) + +HEALTH_CHECK_URL_POSTFIX: Final[str] = "/is-alive/ping" +HEALTH_CHECK_TIMEOUT: Final[float] = 1.0 + +URL_ACCOUNT_DETAILS_DEFAULT: Final[str] = ( + "https://www.comet.com/api/rest/v2/account-details" +) +URL_ACCOUNT_DETAILS_POSTFIX: Final[str] = "/rest/v2/account-details" + +URL_WORKSPACE_GET_LIST_DEFAULT: Final[str] = ( + "https://www.comet.com/api/rest/v2/workspaces" +) +URL_WORKSPACE_GET_LIST_POSTFIX: Final[str] = "/rest/v2/workspaces" + + +class OpikConfigurator: + def __init__( + self, + api_key: Optional[str] = None, + workspace: Optional[str] = None, + url: Optional[str] = None, + use_local: bool = False, + force: bool = False, + ): + self.api_key = api_key + self.workspace = workspace + self.url = url + self.use_local = use_local + self.force = force + self.current_config = opik.config.OpikConfig() + + def configure(self) -> None: + """ + Create a local configuration file for the Python SDK. If a configuration file already exists, + it will not be overwritten unless the `force` parameter is set to True. + + Raises: + ConfigurationError + ConnectionError + """ + + # OPIK CLOUD + if self.use_local is False: + self._configure_cloud() + return + + # LOCAL OPIK DEPLOYMENT + self._configure_local() + return + + def _configure_cloud(self) -> None: + """ + Configure the cloud Opik instance by handling API key and workspace settings. + """ + # Handle URL + if self.url is None: + self.url = OPIK_BASE_URL_CLOUD + + # Handle API key: get or prompt for one if needed + update_config_with_api_key = self._set_api_key() + + # Handle workspace: get or prompt for one if needed + update_config_with_workspace = self._set_workspace() + + # Update configuration if either API key or workspace has changed + if update_config_with_api_key or update_config_with_workspace: + self._update_config() + else: + self._update_config(save_to_file=False) + LOGGER.info( + "Opik is already configured. You can check the settings by viewing the config file at %s", + self.current_config.config_file_fullpath, + ) + + def _configure_local(self) -> None: + """ + Configure the local Opik instance by setting the local URL and workspace. + + Raises: + ConfigurationError: Raised if the Opik instance is not active or not found. + """ + self.api_key = None + self.workspace = OPIK_WORKSPACE_DEFAULT_NAME + + # Step 1: If the URL is provided and active, update the configuration + if self.url is not None and self._is_instance_active(self.url): + self._update_config(save_to_file=self.force) + return + + # Step 2: Check if the default local instance is active + if self._is_instance_active(OPIK_BASE_URL_LOCAL): + if ( + not self.force + and self.current_config.url_override == OPIK_BASE_URL_LOCAL + ): + LOGGER.info( + f"Opik is already configured to local instance at {OPIK_BASE_URL_LOCAL}." + ) + return + + # Step 3: Ask user if they want to use the found local instance + if not is_interactive(): + raise ConfigurationError( + "Non-interactive mode detected. Unable to proceed." + ) + + use_url = ask_user_for_approval( + f"Found local Opik instance on: {OPIK_BASE_URL_LOCAL}, do you want to use it? (Y/n)" + ) + if use_url: + self.url = OPIK_BASE_URL_LOCAL + self._update_config() + return + + # Step 4: Ask user for URL if no valid local instance is found or approved + if not is_interactive(): + raise ConfigurationError( + "Non-interactive mode detected. Unable to proceed as no local Opik instance was found." + ) + self._ask_for_url() + self._update_config() + + def _set_api_key(self) -> bool: + """ + Determines and set the correct API key based on the current configuration, force flag, and user input. + + Returns: + bool: a boolean indicating if the configuration file needs updating. + """ + config_file_needs_updating = False + + if self.api_key: + if not self._is_api_key_correct(self.api_key): + raise ConfigurationError("API key is incorrect.") + config_file_needs_updating = True if self.force else False + + elif self.force and self.api_key is None: + self._ask_for_api_key() + config_file_needs_updating = True + + elif self.api_key is None and self.current_config.api_key is None: + self._ask_for_api_key() + config_file_needs_updating = True + + elif self.api_key is None and self.current_config.api_key is not None: + self.api_key = self.current_config.api_key + + return config_file_needs_updating + + def _ask_for_api_key(self) -> None: + """ + Prompt the user for an Opik cloud API key and verify its validity. + The function retries up to 3 times if the API key is invalid. + + Raises: + ConfigurationError: Raised if the API key provided by the user is invalid after 3 attempts. + """ + retries = 3 + + LOGGER.info( + "Your Opik cloud API key is available at https://www.comet.com/api/my/settings/." + ) + + if not is_interactive(): + raise ConfigurationError( + "Non-interactive mode detected. Unable to proceed as no API key has been specified." + ) + + while retries > 0: + user_input_api_key = getpass.getpass( + "Please enter your Opik Cloud API key:" + ) + + if self._is_api_key_correct(user_input_api_key): + self.api_key = user_input_api_key + return + else: + LOGGER.error( + f"The API key provided is not valid on {OPIK_BASE_URL_CLOUD}. Please try again." + ) + retries -= 1 + raise ConfigurationError("API key is incorrect.") + + def _is_api_key_correct(self, api_key: str) -> bool: + """ + Validates if the provided Opik API key is correct by sending a request to the cloud API. + + Returns: + bool: True if the API key is valid (status 200), False if the key is invalid (status 401 or 403). + + Raises: + ConnectionError: If a network-related error occurs or the response status is neither 200, 401, nor 403. + """ + + try: + with httpx.Client() as client: + client.headers.update({"Authorization": f"{api_key}"}) + response = client.get(url=self._url_account_details) + if response.status_code == 200: + return True + elif response.status_code in [401, 403]: + return False + else: + raise ConnectionError(f"Error while checking API key: {response.text}") + except httpx.RequestError as e: + raise ConnectionError(f"Network error occurred: {str(e)}") + except Exception as e: + raise ConnectionError(f"Unexpected error occurred: {str(e)}") + + def _set_workspace(self) -> bool: + """ + Determines and set the correct workspace based on current configuration, force flag, and user input. + + Returns: + bool: a boolean indicating whether the configuration file needs updating. + + Raises: + ConfigurationError: If the provided workspace is invalid. + """ + + # Case 1: Workspace was provided by the user and is valid + if self.workspace is not None: + if not self._is_workspace_name_correct(self.workspace): + raise ConfigurationError( + f"Workspace `{self.workspace}` is incorrect for the given API key." + ) + return True if self.force else False + + # Case 2: Use workspace from current configuration if not forced to change + if ( + "workspace" in self.current_config.model_fields_set + and self.current_config.workspace != OPIK_WORKSPACE_DEFAULT_NAME + and not self.force + ): + self.workspace = self.current_config.workspace + return False + + # Case 3: No workspace provided, prompt the user + default_workspace = self._get_default_workspace() + use_default_workspace = ask_user_for_approval( + f'Do you want to use "{default_workspace}" workspace? (Y/n)' + ) + + if use_default_workspace: + self.workspace = default_workspace + else: + self._ask_for_workspace() + + return True + + def _is_workspace_name_correct(self, workspace: str) -> bool: + """ + Verifies whether the provided workspace name exists in the user's cloud Opik account. + + Args: + workspace (str): The name of the workspace to check. + + Returns: + bool: True if the workspace is found, False otherwise. + + Raises: + ConnectionError: Raised if there's an issue with connecting to the Opik service, or the response is not successful. + """ + if not self.api_key: + raise ConfigurationError("API key must be set to check workspace name.") + + try: + with httpx.Client() as client: + client.headers.update({"Authorization": f"{self.api_key}"}) + response = client.get(url=self._url_get_workspace_list) + except httpx.RequestError as e: + # Raised for network-related errors such as timeouts + raise ConnectionError(f"Network error: {str(e)}") + except Exception as e: + raise ConnectionError(f"Unexpected error occurred: {str(e)}") + + if response.status_code != 200: + raise ConnectionError( + f"HTTP error: {response.status_code} - {response.text}" + ) + + workspaces: List[str] = response.json().get("workspaceNames", []) + return workspace in workspaces + + def _get_default_workspace(self) -> str: + """ + Retrieves the default Opik workspace name associated with the given API key. + + Returns: + str: The default workspace name. + + Raises: + ConnectionError: If there's an error while fetching the default workspace. + """ + if not self.api_key: + raise ConfigurationError("API key must be set.") + + try: + with httpx.Client() as client: + client.headers.update({"Authorization": f"{self.api_key}"}) + response = client.get(url=self._url_account_details) + + if response.status_code != 200: + raise ConnectionError( + f"Error while getting default workspace name: {response.text}" + ) + + default_workspace_name = response.json().get("defaultWorkspaceName") + if not default_workspace_name: + raise ConnectionError("defaultWorkspaceName not found in the response.") + + return default_workspace_name + + except httpx.RequestError as e: + raise ConnectionError(f"Network error occurred: {str(e)}") + except Exception as e: + raise ConnectionError(f"Unexpected error occurred: {str(e)}") + + def _ask_for_workspace(self) -> None: + """ + Prompt the user for an Opik instance workspace name and verify its validity. + The function retries up to 3 times if the workspace name is invalid. + + Raises: + ConfigurationError: Raised if the workspace name is invalid after 3 attempts. + """ + retries = 3 + + if not self.api_key: + raise ConfigurationError("API key must be set to check workspace name.") + + if not is_interactive(): + raise ConfigurationError( + "Non-interactive mode detected. Unable to proceed as no workspace name has been specified." + ) + + while retries > 0: + user_input_workspace = input( + "Please enter your cloud Opik instance workspace name: " + ) + if self._is_workspace_name_correct(user_input_workspace): + self.workspace = user_input_workspace + return + else: + LOGGER.error( + "This workspace does not exist, please enter a workspace that you have access to." + ) + retries -= 1 + raise ConfigurationError( + "User does not have access to the workspaces provided." + ) + + def _update_config(self, save_to_file: bool = True) -> None: + """ + Save changes to the config file and update the current session configuration. + + Raises: + ConfigurationError: Raised if there is an issue saving the configuration or updating the session. + """ + try: + if save_to_file: + new_config = opik.config.OpikConfig( + api_key=self.api_key, + url_override=self.url, + workspace=self.workspace, + ) + new_config.save_to_file() + + # Update current session configuration + opik.config.update_session_config("api_key", self.api_key) + opik.config.update_session_config("url_override", self.url) + opik.config.update_session_config("workspace", self.workspace) + except Exception as e: + LOGGER.error(f"Failed to update config: {str(e)}") + raise ConfigurationError("Failed to update configuration.") + + @staticmethod + def _is_instance_active(url: str) -> bool: + """ + Returns True if the given Opik URL responds to an HTTP GET request. + + Args: + url (str): The base URL of the instance to check. + + Returns: + bool: True if the instance responds with HTTP status 200, otherwise False. + """ + try: + with httpx.Client(timeout=HEALTH_CHECK_TIMEOUT) as http_client: + response = http_client.get(url=url + HEALTH_CHECK_URL_POSTFIX) + return response.status_code == 200 + except httpx.ConnectTimeout: + return False + except Exception: + return False + + def _ask_for_url(self) -> None: + """ + Prompt the user for an Opik instance URL and check if it is accessible. + The function retries up to 3 times if the URL is not accessible. + + Raises: + ConfigurationError: Raised if the URL provided by the user is not accessible after 3 attempts. + """ + retries = 3 + while retries > 0: + user_input_opik_url = input("Please enter your Opik instance URL:") + if self._is_instance_active(user_input_opik_url): + self.url = user_input_opik_url + return + else: + LOGGER.error( + f"Opik is not accessible at {user_input_opik_url}. " + f"Please try again, the URL should follow a format similar to {OPIK_BASE_URL_LOCAL}" + ) + retries -= 1 + raise ConfigurationError( + "Cannot use the URL provided by the user. Opik instance is not active or not found." + ) + + @property + def _url_account_details(self) -> str: + if self.url is None: + return URL_ACCOUNT_DETAILS_DEFAULT + + if self.url == OPIK_BASE_URL_CLOUD: + return URL_ACCOUNT_DETAILS_DEFAULT + + return f"{self.url}{URL_ACCOUNT_DETAILS_POSTFIX}" + + @property + def _url_get_workspace_list(self) -> str: + if self.url is None: + return URL_WORKSPACE_GET_LIST_DEFAULT + + if self.url == OPIK_BASE_URL_CLOUD: + return URL_WORKSPACE_GET_LIST_DEFAULT + + return f"{self.url}{URL_WORKSPACE_GET_LIST_POSTFIX}" + + +def configure( + api_key: Optional[str] = None, + workspace: Optional[str] = None, + url: Optional[str] = None, + use_local: bool = False, + force: bool = False, +) -> None: + """ + Create a local configuration file for the Python SDK. If a configuration file already exists, + it will not be overwritten unless the `force` parameter is set to True. + + Args: + api_key: The API key if using an Opik Cloud. + workspace: The workspace name if using an Opik Cloud. + url: The URL of the Opik instance if you are using a local deployment. + use_local: Whether to use a local deployment. + force: If true, the configuration file will be recreated and existing settings + will be overwritten with passed parameters. + + Raises: + ConfigurationError + """ + client = OpikConfigurator( + api_key=api_key, + workspace=workspace, + url=url, + use_local=use_local, + force=force, + ) + client.configure() diff --git a/sdks/python/src/opik/configurator/interactive_helpers.py b/sdks/python/src/opik/configurator/interactive_helpers.py new file mode 100644 index 0000000000..6b7d5409e5 --- /dev/null +++ b/sdks/python/src/opik/configurator/interactive_helpers.py @@ -0,0 +1,97 @@ +import logging +import sys + +LOGGER = logging.getLogger(__name__) + + +def is_interactive() -> bool: + """ + Determines if the current environment is interactive. + + Returns: + bool: True if the environment is either running in a terminal, + a Jupyter notebook, an IPython environment, or Google Colab. + False otherwise. + """ + return ( + sys.stdin.isatty() + or _in_jupyter_environment() + or _in_ipython_environment() + or _in_colab_environment() + ) + + +def _in_jupyter_environment() -> bool: + """ + Determine if the current environment is a Jupyter notebook. + + Returns: + bool: True if running in a Jupyter notebook environment, otherwise False. + """ + try: + import IPython + except Exception: + return False + + ipy = IPython.get_ipython() + if ipy is None or not hasattr(ipy, "kernel"): + return False + else: + return True + + +def _in_ipython_environment() -> bool: + """ + Determines if the current environment is an IPython environment. + + Returns: + bool: True if the code is running in an IPython environment, False otherwise. + """ + try: + import IPython + except Exception: + return False + + ipy = IPython.get_ipython() + if ipy is None: + return False + else: + return True + + +def _in_colab_environment() -> bool: + """ + Determines if the code is running within a Google Colab environment. + + Returns: + bool: True if running in Google Colab, False otherwise. + """ + try: + import IPython + except Exception: + return False + + ipy = IPython.get_ipython() + return "google.colab" in str(ipy) + + +def ask_user_for_approval(message: str) -> bool: + """ + Prompt the user with a message for approval (Y/Yes/N/No). + + Args: + message (str): The message to display to the user. + + Returns: + bool: True if the user approves (Y/Yes/empty input), False if the user disapproves (N/No). + + Logs: + Error when the user input is not recognized. + """ + while True: + users_choice = input(message).strip().upper() + if users_choice in ("Y", "YES", ""): + return True + if users_choice in ("N", "NO"): + return False + LOGGER.error("Wrong choice. Please try again.") diff --git a/sdks/python/src/opik/opik_configure.py b/sdks/python/src/opik/opik_configure.py deleted file mode 100644 index e2bc6016c0..0000000000 --- a/sdks/python/src/opik/opik_configure.py +++ /dev/null @@ -1,539 +0,0 @@ -import getpass -import logging -from typing import Final, List, Optional, Tuple, cast - -import httpx - -import opik.config -from opik.config import ( - OPIK_BASE_URL_CLOUD, - OPIK_BASE_URL_LOCAL, - OPIK_WORKSPACE_DEFAULT_NAME, -) -from opik.exceptions import ConfigurationError - -LOGGER = logging.getLogger(__name__) - -HEALTH_CHECK_URL_POSTFIX: Final[str] = "/is-alive/ping" -HEALTH_CHECK_TIMEOUT: Final[float] = 1.0 - -URL_ACCOUNT_DETAILS: Final[str] = "https://www.comet.com/api/rest/v2/account-details" -URL_WORKSPACE_GET_LIST: Final[str] = "https://www.comet.com/api/rest/v2/workspaces" - - -def is_interactive() -> bool: - """ - Returns True if in interactive mode - """ - # return bool(getattr(sys, "ps1", sys.flags.interactive)) - return True - - -def is_instance_active(url: str) -> bool: - """ - Returns True if the given Opik URL responds to an HTTP GET request. - - Args: - url (str): The base URL of the instance to check. - - Returns: - bool: True if the instance responds with HTTP status 200, otherwise False. - """ - try: - with httpx.Client(timeout=HEALTH_CHECK_TIMEOUT) as http_client: - response = http_client.get(url=url + HEALTH_CHECK_URL_POSTFIX) - return response.status_code == 200 - except httpx.ConnectTimeout: - return False - except Exception: - return False - - -def is_workspace_name_correct(api_key: str, workspace: str) -> bool: - """ - Verifies whether the provided workspace name exists in the user's cloud Opik account. - - Args: - api_key (str): The API key used for authentication with the Opik service. - workspace (str): The name of the workspace to check. - - Returns: - bool: True if the workspace is found, False otherwise. - - Raises: - ConnectionError: Raised if there's an issue with connecting to the Opik service, or the response is not successful. - """ - - try: - with httpx.Client() as client: - client.headers.update({"Authorization": f"{api_key}"}) - response = client.get(url=URL_WORKSPACE_GET_LIST) - except httpx.RequestError as e: - # Raised for network-related errors such as timeouts - raise ConnectionError(f"Network error: {str(e)}") - except Exception as e: - raise ConnectionError(f"Unexpected error occurred: {str(e)}") - - if response.status_code != 200: - raise ConnectionError(f"HTTP error: {response.status_code} - {response.text}") - - workspaces: List[str] = response.json().get("workspaceNames", []) - - return workspace in workspaces - - -def is_api_key_correct(api_key: str) -> bool: - """ - Validates if the provided Opik API key is correct by sending a request to the cloud API. - - Args: - api_key (str): The API key used for authentication. - - Returns: - bool: True if the API key is valid (status 200), False if the key is invalid (status 401 or 403). - - Raises: - ConnectionError: If a network-related error occurs or the response status is neither 200, 401, nor 403. - """ - try: - with httpx.Client() as client: - client.headers.update({"Authorization": f"{api_key}"}) - response = client.get(url=URL_ACCOUNT_DETAILS) - - if response.status_code == 200: - return True - elif response.status_code in [401, 403]: - return False - else: - raise ConnectionError(f"Error while checking API key: {response.text}") - - except httpx.RequestError as e: - raise ConnectionError(f"Network error occurred: {str(e)}") - except Exception as e: - raise ConnectionError(f"Unexpected error occurred: {str(e)}") - - -def get_default_workspace(api_key: str) -> str: - """ - Retrieves the default Opik workspace name associated with the given API key. - - Args: - api_key (str): The API key used for authentication. - - Returns: - str: The default workspace name. - - Raises: - ConnectionError: If there's an error while fetching the default workspace. - """ - try: - with httpx.Client() as client: - client.headers.update({"Authorization": f"{api_key}"}) - response = client.get(url=URL_ACCOUNT_DETAILS) - - if response.status_code != 200: - raise ConnectionError( - f"Error while getting default workspace name: {response.text}" - ) - - default_workspace_name = response.json().get("defaultWorkspaceName") - if not default_workspace_name: - raise ConnectionError("defaultWorkspaceName not found in the response.") - - return default_workspace_name - - except httpx.RequestError as e: - raise ConnectionError(f"Network error occurred: {str(e)}") - except Exception as e: - raise ConnectionError(f"Unexpected error occurred: {str(e)}") - - -def _update_config( - api_key: Optional[str], - url: str, - workspace: str, -) -> None: - """ - Save changes to the config file and update the current session configuration. - - Args: - api_key (Optional[str]): The API key for the Opik Cloud service. Can be None if not using Opik Cloud. - url (str): The base URL of the Opik instance (local or cloud). - workspace (str): The name of the workspace to be saved. - - Raises: - ConfigurationError: Raised if there is an issue saving the configuration or updating the session. - """ - try: - new_config = opik.config.OpikConfig( - api_key=api_key, - url_override=url, - workspace=workspace, - ) - new_config.save_to_file() - - # Update current session configuration - opik.config.update_session_config("api_key", api_key) - opik.config.update_session_config("url_override", url) - opik.config.update_session_config("workspace", workspace) - - except Exception as e: - LOGGER.error(f"Failed to update config: {str(e)}") - raise ConfigurationError("Failed to update configuration.") - - -def _ask_for_url() -> str: - """ - Prompt the user for an Opik instance URL and check if it is accessible. - The function retries up to 3 times if the URL is not accessible. - - Returns: - str: A valid Opik instance URL. - - Raises: - ConfigurationError: Raised if the URL provided by the user is not accessible after 3 attempts. - """ - retries = 3 - - while retries > 0: - user_input_opik_url = input("Please enter your Opik instance URL:") - - if is_instance_active(user_input_opik_url): - return user_input_opik_url - else: - LOGGER.error( - f"Opik is not accessible at {user_input_opik_url}. Please try again, the URL should follow a format similar to {OPIK_BASE_URL_LOCAL}" - ) - retries -= 1 - - raise ConfigurationError( - "Cannot use the URL provided by the user. Opik instance is not active or not found." - ) - - -def _ask_for_api_key() -> str: - """ - Prompt the user for an Opik cloud API key and verify its validity. - The function retries up to 3 times if the API key is invalid. - - Returns: - str: A valid Opik API key. - - Raises: - ConfigurationError: Raised if the API key provided by the user is invalid after 3 attempts. - """ - retries = 3 - LOGGER.info( - "Your Opik cloud API key is available at https://www.comet.com/api/my/settings/." - ) - - while retries > 0: - user_input_api_key = getpass.getpass("Please enter your Opik Cloud API key:") - - if is_api_key_correct(user_input_api_key): - return user_input_api_key - else: - LOGGER.error( - f"The API key provided is not valid on {OPIK_BASE_URL_CLOUD}. Please try again." - ) - retries -= 1 - - raise ConfigurationError("API key is incorrect.") - - -def _ask_for_workspace(api_key: str) -> str: - """ - Prompt the user for an Opik instance workspace name and verify its validity. - - The function retries up to 3 times if the workspace name is invalid. - - Args: - api_key (str): The API key used to verify the workspace name. - - Returns: - str: A valid workspace name. - - Raises: - ConfigurationError: Raised if the workspace name is invalid after 3 attempts. - """ - retries = 3 - - while retries > 0: - user_input_workspace = input( - "Please enter your cloud Opik instance workspace name: " - ) - - if is_workspace_name_correct(api_key, user_input_workspace): - return user_input_workspace - else: - LOGGER.error( - "This workspace does not exist, please enter a workspace that you have access to." - ) - retries -= 1 - - raise ConfigurationError("User does not have access to the workspaces provided.") - - -def ask_user_for_approval(message: str) -> bool: - """ - Prompt the user with a message for approval (Y/Yes/N/No). - - Args: - message (str): The message to display to the user. - - Returns: - bool: True if the user approves (Y/Yes/empty input), False if the user disapproves (N/No). - - Logs: - Error when the user input is not recognized. - """ - while True: - users_choice = input(message).strip().upper() - - if users_choice in ("Y", "YES", ""): - return True - - if users_choice in ("N", "NO"): - return False - - LOGGER.error("Wrong choice. Please try again.") - - -def _get_api_key( - api_key: Optional[str], - current_config: opik.config.OpikConfig, - force: bool, -) -> Tuple[str, bool]: - """ - Determines the correct API key based on the current configuration, force flag, and user input. - - Args: - api_key (Optional[str]): The user-provided API key. - current_config (OpikConfig): The current configuration object. - force (bool): Whether to force reconfiguration. - - Returns: - Tuple[str, bool]: A tuple containing the validated API key and a boolean indicating - if the configuration file needs updating. - """ - config_file_needs_updating = False - - if force and api_key is None: - api_key = _ask_for_api_key() - config_file_needs_updating = True - elif api_key is None and current_config.api_key is None: - api_key = _ask_for_api_key() - config_file_needs_updating = True - elif api_key is None and current_config.api_key is not None: - api_key = current_config.api_key - # fixme if force is True -> need to save anyway? - - # todo add force and api_key is NOT None -> need to save? - # todo force is False, api_key is not None -> need to save? - - # fixme is this check for mypy? - # Ensure the API key is not None - api_key = cast(str, api_key) - - return api_key, config_file_needs_updating - - -def _get_workspace( - workspace: Optional[str], - api_key: str, - current_config: opik.config.OpikConfig, - force: bool, -) -> Tuple[str, bool]: - """ - Determines the correct workspace based on current configuration, force flag, and user input. - - Args: - workspace (Optional[str]): The user-provided workspace name. - api_key (str): The validated API key. - current_config (OpikConfig): The current configuration object. - force (bool): Whether to force reconfiguration. - - Returns: - Tuple[str, bool]: The validated or selected workspace name and a boolean - indicating whether the configuration file needs updating. - - Raises: - ConfigurationError: If the provided workspace is invalid. - """ - - # Case 1: Workspace was provided by the user and is valid - if workspace is not None: - if not is_workspace_name_correct(api_key, workspace): - raise ConfigurationError( - "Workspace `%s` is incorrect for the given API key.", workspace - ) - return workspace, True - - # Case 2: Use workspace from current configuration if not forced to change - if ( - "workspace" in current_config.model_fields_set - and current_config.workspace != OPIK_WORKSPACE_DEFAULT_NAME - and not force - ): - return current_config.workspace, False - - # Case 3: No workspace provided, prompt the user - default_workspace = get_default_workspace(api_key) - use_default_workspace = ask_user_for_approval( - f'Do you want to use "{default_workspace}" workspace? (Y/n)' - ) - - if use_default_workspace: - workspace = default_workspace - else: - workspace = _ask_for_workspace(api_key=api_key) - - return workspace, True - - -def configure( - api_key: Optional[str] = None, - workspace: Optional[str] = None, - url: Optional[str] = None, - use_local: bool = False, - force: bool = False, -) -> None: - """ - Create a local configuration file for the Python SDK. If a configuration file already exists, - it will not be overwritten unless the `force` parameter is set to True. - - Args: - api_key: The API key if using an Opik Cloud. - workspace: The workspace name if using an Opik Cloud. - url: The URL of the Opik instance if you are using a local deployment. - use_local: Whether to use a local deployment. - force: If true, the configuration file will be recreated and existing settings will be overwritten. - - Raises: - ConfigurationError - """ - - # OPIK CLOUD - if use_local is False: - _configure_cloud( - api_key=api_key, - workspace=workspace, - force=force, - ) - return - - # LOCAL OPIK DEPLOYMENT - _configure_local(url=url, force=force) - return - - -def _configure_cloud( - api_key: Optional[str], - workspace: Optional[str], - force: bool = False, -) -> None: - """ - Configure the cloud Opik instance by handling API key and workspace settings. - - Args: - api_key (Optional[str]): The API key for the Opik Cloud. - workspace (Optional[str]): The workspace name for the Opik Cloud. - force (bool): If True, forces reconfiguration by overwriting the existing settings. - """ - current_config = opik.config.OpikConfig() - - # TODO: Update the is_interactive() check, today always returns True so commented the code below - # # first check parameters. - # if is_interactive() is False and api_key is None and current_config.api_key is None: - # raise ConfigurationError("No API key provided for cloud Opik instance.") - - # if ( - # is_interactive() is False - # and workspace is None - # and current_config.workspace is None - # ): - # raise ConfigurationError("No workspace name provided for cloud Opik instance.") - - # Handle API key: get or prompt for one if needed - api_key, update_config_with_api_key = _get_api_key( - api_key=api_key, - current_config=current_config, - force=force, - ) - - # Handle workspace: get or prompt for one if needed - workspace, update_config_with_workspace = _get_workspace( - workspace=workspace, - api_key=api_key, - current_config=current_config, - force=force, - ) - - # Update configuration if either API key or workspace has changed - if update_config_with_api_key or update_config_with_workspace: - _update_config( - api_key=api_key, - url=OPIK_BASE_URL_CLOUD, - workspace=workspace, - ) - else: - LOGGER.info( - "Opik is already configured. You can check the settings by viewing the config file at %s", - current_config.config_file_fullpath, - ) - - -def _configure_local(url: Optional[str], force: bool = False) -> None: - """ - Configure the local Opik instance by setting the local URL and workspace. - - Args: - url (Optional[str]): The URL of the local Opik instance. - force (bool): Whether to force the configuration even if local settings exist. - - Raises: - ConfigurationError: Raised if the Opik instance is not active or not found. - """ - # TODO: this needs to be refactored - _login_local might only need url from the outside. - # But we still have to init api_key and workspace because they are required in order to update config - api_key = None - workspace = OPIK_WORKSPACE_DEFAULT_NAME - current_config = opik.config.OpikConfig() - - # Step 1: If the URL is provided and active, update the configuration - if url is not None and is_instance_active(url): - _update_config( - api_key=api_key, - url=url, - workspace=workspace, - ) - return - - # Step 2: Check if the default local instance is active - if is_instance_active(OPIK_BASE_URL_LOCAL): - if not force and current_config.url_override == OPIK_BASE_URL_LOCAL: - LOGGER.info( - f"Opik is already configured to local instance at {OPIK_BASE_URL_LOCAL}." - ) - return - - # Step 4: Ask user if they want to use the found local instance - use_url = ask_user_for_approval( - f"Found local Opik instance on: {OPIK_BASE_URL_LOCAL}, do you want to use it? (Y/n)" - ) - - if use_url: - _update_config( - api_key=api_key, - url=OPIK_BASE_URL_LOCAL, - workspace=workspace, - ) - return - - # Step 5: Ask user for URL if no valid local instance is found or approved - user_input_url = _ask_for_url() - _update_config( - api_key=api_key, - url=user_input_url, - workspace=workspace, - ) diff --git a/sdks/python/tests/unit/configurator/test_configure.py b/sdks/python/tests/unit/configurator/test_configure.py new file mode 100644 index 0000000000..8286636530 --- /dev/null +++ b/sdks/python/tests/unit/configurator/test_configure.py @@ -0,0 +1,1338 @@ +from pathlib import Path +from unittest.mock import MagicMock, Mock, patch + +import httpx +import pytest + +from opik.config import ( + OPIK_BASE_URL_CLOUD, + OPIK_BASE_URL_LOCAL, + OPIK_WORKSPACE_DEFAULT_NAME, + OpikConfig, +) +from opik.configurator.configure import OpikConfigurator +from opik.exceptions import ConfigurationError + + +@pytest.fixture(autouse=True) +def mock_env_and_file(monkeypatch): + monkeypatch.delenv("OPIK_API_KEY", raising=False) + monkeypatch.delenv("OPIK_WORKSPACE", raising=False) + monkeypatch.delenv("OPIK_URL_OVERRIDE", raising=False) + + with patch("builtins.open", side_effect=FileNotFoundError): + yield + + +class TestIsInstanceActive: + @pytest.mark.parametrize( + "status_code, expected_result", + [ + (200, True), + (404, False), + (500, False), + ], + ) + @patch("opik.configurator.configure.httpx.Client") + def test_is_instance_active(self, mock_httpx_client, status_code, expected_result): + """ + Test various HTTP status code responses to check if the instance is active. + """ + mock_client_instance = MagicMock() + mock_response = Mock() + mock_response.status_code = status_code + + mock_client_instance.__enter__.return_value = mock_client_instance + mock_client_instance.__exit__.return_value = False + mock_client_instance.get.return_value = mock_response + mock_httpx_client.return_value = mock_client_instance + + url = "http://example.com" + result = OpikConfigurator()._is_instance_active(url) + + assert result == expected_result + + @patch("opik.configurator.configure.httpx.Client") + def test_is_instance_active_timeout(self, mock_httpx_client): + """ + Test that a connection timeout results in False being returned. + """ + mock_client_instance = MagicMock() + mock_client_instance.__enter__.return_value = mock_client_instance + mock_client_instance.__exit__.return_value = False + mock_client_instance.get.side_effect = httpx.ConnectTimeout("timeout") + + mock_httpx_client.return_value = mock_client_instance + + url = "http://example.com" + result = OpikConfigurator()._is_instance_active(url) + + assert result is False + + @patch("opik.configurator.configure.httpx.Client") + def test_is_instance_active_general_exception(self, mock_httpx_client): + """ + Test that any general exception results in False being returned. + """ + mock_client_instance = MagicMock() + mock_client_instance.__enter__.return_value = mock_client_instance + mock_client_instance.__exit__.return_value = False + mock_client_instance.get.side_effect = Exception("Unexpected error") + + mock_httpx_client.return_value = mock_client_instance + + url = "http://example.com" + result = OpikConfigurator()._is_instance_active(url) + + assert result is False + + +class TestIsWorkspaceNameCorrect: + @pytest.mark.parametrize( + "api_key, workspace, workspace_names, expected_result", + [ + ("valid_api_key", "correct_workspace", ["correct_workspace"], True), + ("valid_api_key", "incorrect_workspace", ["other_workspace"], False), + ("valid_api_key", "empty_workspace", [], False), + ], + ) + @patch("opik.configurator.configure.httpx.Client") + def test_workspace_valid_api_key( + self, mock_httpx_client, api_key, workspace, workspace_names, expected_result + ): + """ + Test cases with valid API keys and workspace verification. + These tests simulate different workspace existence conditions. + """ + # Mock the HTTP response for valid API key cases + mock_client_instance = MagicMock() + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {"workspaceNames": workspace_names} + + # Mock the context manager behavior + mock_client_instance.__enter__.return_value = mock_client_instance + mock_client_instance.__exit__.return_value = False + mock_client_instance.get.return_value = mock_response + mock_httpx_client.return_value = mock_client_instance + + result = OpikConfigurator( + api_key=api_key, workspace=workspace, url=OPIK_BASE_URL_CLOUD + )._is_workspace_name_correct(workspace) + assert result == expected_result + + @pytest.mark.parametrize( + "status_code, response_text", + [(500, "Internal Server Error"), (404, "Not Found"), (403, "Forbidden")], + ) + @patch("opik.configurator.configure.httpx.Client") + def test_workspace_non_200_response( + self, mock_httpx_client, status_code, response_text + ): + """ + Test cases where the API responds with a non-200 status code. + These responses should raise a ConnectionError. + """ + # Mock the HTTP response for non-200 status code cases + mock_client_instance = MagicMock() + mock_response = Mock() + mock_response.status_code = status_code + mock_response.text = response_text + + mock_client_instance.__enter__.return_value = mock_client_instance + mock_client_instance.__exit__.return_value = False + mock_client_instance.get.return_value = mock_response + mock_httpx_client.return_value = mock_client_instance + + api_key = "valid_api_key" + workspace = "any_workspace" + + with pytest.raises(ConnectionError): + OpikConfigurator( + api_key=api_key, workspace=workspace + )._is_workspace_name_correct(workspace) + + @pytest.mark.parametrize( + "exception", + [ + (httpx.RequestError("Timeout", request=MagicMock())), + (Exception("Unexpected error")), + ], + ) + @patch("opik.configurator.configure.httpx.Client") + def test_workspace_request_exceptions(self, mock_httpx_client, exception): + """ + Test cases where an exception is raised during the HTTP request. + These cases should raise a ConnectionError with the appropriate message. + """ + # Mock the HTTP request to raise an exception + mock_client_instance = MagicMock() + mock_client_instance.__enter__.return_value = mock_client_instance + mock_client_instance.__exit__.return_value = False + mock_client_instance.get.side_effect = exception + + mock_httpx_client.return_value = mock_client_instance + + api_key = "valid_api_key" + workspace = "any_workspace" + + # Check that the appropriate ConnectionError is raised + with pytest.raises(ConnectionError): + OpikConfigurator( + api_key=api_key, workspace=workspace + )._is_workspace_name_correct(workspace) + + +class TestIsApiKeyCorrect: + @pytest.mark.parametrize( + "status_code, expected_result", + [ + (200, True), + (401, False), + (403, False), + ], + ) + @patch("opik.configurator.configure.httpx.Client") + def test_is_api_key_correct(self, mock_httpx_client, status_code, expected_result): + """ + Test valid, invalid, and forbidden API key scenarios by simulating HTTP status codes. + """ + mock_client_instance = MagicMock() + mock_response = Mock() + mock_response.status_code = status_code + + mock_client_instance.__enter__.return_value = mock_client_instance + mock_client_instance.__exit__.return_value = False + mock_client_instance.get.return_value = mock_response + mock_httpx_client.return_value = mock_client_instance + + api_key = "dummy_api_key" + result = OpikConfigurator(url=OPIK_BASE_URL_CLOUD)._is_api_key_correct(api_key) + + assert result == expected_result + + @pytest.mark.parametrize( + "status_code, response_text", + [(500, "Internal Server Error")], + ) + @patch("opik.configurator.configure.httpx.Client") + def test_is_api_key_correct_non_200_response( + self, mock_httpx_client, status_code, response_text + ): + """ + Test that a non-200, 401, or 403 response raises a ConnectionError. + """ + mock_client_instance = MagicMock() + mock_response = Mock() + mock_response.status_code = status_code + mock_response.text = response_text + + mock_client_instance.__enter__.return_value = mock_client_instance + mock_client_instance.__exit__.return_value = False + mock_client_instance.get.return_value = mock_response + mock_httpx_client.return_value = mock_client_instance + + api_key = "dummy_api_key" + + with pytest.raises(ConnectionError): + OpikConfigurator()._is_api_key_correct(api_key) + + @pytest.mark.parametrize( + "exception", + [ + (httpx.RequestError("Timeout", request=MagicMock())), + (Exception("Unexpected error")), + ], + ) + @patch("opik.configurator.configure.httpx.Client") + def test_is_api_key_correct_exceptions(self, mock_httpx_client, exception): + """ + Test that RequestError and general exceptions are properly raised as ConnectionError. + """ + mock_client_instance = MagicMock() + mock_client_instance.__enter__.return_value = mock_client_instance + mock_client_instance.__exit__.return_value = False + mock_client_instance.get.side_effect = exception + + mock_httpx_client.return_value = mock_client_instance + + api_key = "dummy_api_key" + + with pytest.raises(ConnectionError): + OpikConfigurator()._is_api_key_correct(api_key) + + +class TestGetDefaultWorkspace: + @pytest.mark.parametrize( + "status_code, response_json, expected_result", + [ + (200, {"defaultWorkspaceName": "workspace1"}, "workspace1"), + ], + ) + @patch("opik.configurator.configure.httpx.Client") + def test_get_default_workspace_success( + self, mock_httpx_client, status_code, response_json, expected_result + ): + """ + Test successful retrieval of the default workspace name. + """ + mock_client_instance = MagicMock() + mock_response = Mock() + mock_response.status_code = status_code + mock_response.json.return_value = response_json + + mock_client_instance.__enter__.return_value = mock_client_instance + mock_client_instance.get.return_value = mock_response + mock_httpx_client.return_value = mock_client_instance + + api_key = "valid_api_key" + result = OpikConfigurator( + api_key=api_key, url=OPIK_BASE_URL_CLOUD + )._get_default_workspace() + assert result == expected_result + + @pytest.mark.parametrize( + "status_code, response_text", + [ + (500, "Internal Server Error"), + ], + ) + @patch("opik.configurator.configure.httpx.Client") + def test_get_default_workspace_non_200_status( + self, mock_httpx_client, status_code, response_text + ): + """ + Test that non-200 status codes raise a ConnectionError. + """ + mock_client_instance = MagicMock() + mock_response = Mock() + mock_response.status_code = status_code + mock_response.text = response_text + + mock_client_instance.__enter__.return_value = mock_client_instance + mock_client_instance.get.return_value = mock_response + mock_httpx_client.return_value = mock_client_instance + + api_key = "valid_api_key" + with pytest.raises(ConnectionError): + OpikConfigurator(api_key=api_key)._get_default_workspace() + + @pytest.mark.parametrize( + "response_json", + [ + {}, + {"otherKey": "value"}, + None, + ], + ) + @patch("opik.configurator.configure.httpx.Client") + def test_get_default_workspace_missing_key(self, mock_httpx_client, response_json): + """ + Test that missing 'defaultWorkspaceName' in the response raises a ConnectionError. + """ + mock_client_instance = MagicMock() + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = response_json + + mock_client_instance.__enter__.return_value = mock_client_instance + mock_client_instance.get.return_value = mock_response + mock_httpx_client.return_value = mock_client_instance + + api_key = "valid_api_key" + with pytest.raises(ConnectionError): + OpikConfigurator(api_key=api_key)._get_default_workspace() + + @pytest.mark.parametrize( + "exception", + [ + httpx.RequestError("Timeout", request=MagicMock()), + Exception("Unexpected error"), + ], + ) + @patch("opik.configurator.configure.httpx.Client") + def test_get_default_workspace_exceptions(self, mock_httpx_client, exception): + """ + Test that network and unexpected exceptions are raised as ConnectionError. + """ + mock_client_instance = MagicMock() + mock_client_instance.__enter__.return_value = mock_client_instance + mock_client_instance.get.side_effect = exception + mock_httpx_client.return_value = mock_client_instance + + api_key = "valid_api_key" + + with pytest.raises(ConnectionError): + OpikConfigurator(api_key=api_key)._get_default_workspace() + + +class TestUpdateConfig: + @patch("opik.configurator.configure.opik.config.OpikConfig") + @patch("opik.configurator.configure.opik.config.update_session_config") + def test_update_config_success(self, mock_update_session_config, mock_opik_config): + """ + Test successful update of the config and session. + """ + mock_config_instance = MagicMock() + mock_opik_config.return_value = mock_config_instance + + api_key = "dummy_api_key" + url = "http://example.com" + workspace = "workspace1" + + OpikConfigurator(api_key, workspace, url)._update_config() + + # Ensure config object is created and saved + mock_opik_config.assert_called_with( + api_key=api_key, + url_override=url, + workspace=workspace, + ) + mock_config_instance.save_to_file.assert_called_once() + + # Ensure session config is updated + mock_update_session_config.assert_any_call("api_key", api_key) + mock_update_session_config.assert_any_call("url_override", url) + mock_update_session_config.assert_any_call("workspace", workspace) + + @patch("opik.configurator.configure.opik.config.OpikConfig") + @patch("opik.configurator.configure.opik.config.update_session_config") + def test_update_config_raises_exception( + self, mock_update_session_config, mock_opik_config + ): + """ + Test that ConfigurationError is raised when an exception occurs during saving config file. + """ + mock_opik_config.side_effect = [None, Exception("Unexpected error")] + + api_key = "dummy_api_key" + url = "http://example.com" + workspace = "workspace1" + + with pytest.raises(ConfigurationError, match="Failed to update configuration."): + OpikConfigurator(api_key, workspace, url)._update_config() + + # Ensure save_to_file is not called due to the exception + mock_update_session_config.assert_not_called() + + @patch("opik.configurator.configure.opik.config.OpikConfig") + @patch("opik.configurator.configure.opik.config.update_session_config") + def test_update_config_session_update_failure( + self, mock_update_session_config, mock_opik_config + ): + """ + Test that ConfigurationError is raised if updating the session configuration fails. + """ + mock_config_instance = MagicMock() + mock_opik_config.return_value = mock_config_instance + mock_update_session_config.side_effect = Exception("Session update failed") + + api_key = "dummy_api_key" + url = "http://example.com" + workspace = "workspace1" + + with pytest.raises(ConfigurationError, match="Failed to update configuration."): + OpikConfigurator(api_key, workspace, url)._update_config() + + # Ensure config object is created and saved + mock_opik_config.assert_called_with( + api_key=api_key, + url_override=url, + workspace=workspace, + ) + mock_config_instance.save_to_file.assert_called_once() + + +class TestAskForUrl: + @patch("builtins.input", side_effect=["http://valid-url.com"]) + @patch( + "opik.configurator.configure.OpikConfigurator._is_instance_active", + return_value=True, + ) + def test_ask_for_url_success(self, mock_is_instance_active, mock_input): + """ + Test successful input of a valid Opik URL. + """ + config = OpikConfigurator() + config._ask_for_url() + assert config.url == "http://valid-url.com" + mock_is_instance_active.assert_called_once_with("http://valid-url.com") + + @patch("builtins.input", side_effect=["http://invalid-url.com"] * 3) + @patch( + "opik.configurator.configure.OpikConfigurator._is_instance_active", + return_value=False, + ) + def test_ask_for_url_all_retries_fail(self, mock_is_instance_active, mock_input): + """ + Test that after 3 failed attempts, a ConfigurationError is raised. + """ + with pytest.raises(ConfigurationError, match="Cannot use the URL provided"): + OpikConfigurator()._ask_for_url() + + assert mock_is_instance_active.call_count == 3 + + @patch( + "builtins.input", side_effect=["http://invalid-url.com", "http://valid-url.com"] + ) + @patch( + "opik.configurator.configure.OpikConfigurator._is_instance_active", + side_effect=[False, True], + ) + def test_ask_for_url_success_on_second_try( + self, mock_is_instance_active, mock_input + ): + """ + Test that the URL is successfully returned on the second attempt after the first failure. + """ + config = OpikConfigurator() + config._ask_for_url() + assert config.url == "http://valid-url.com" + assert mock_is_instance_active.call_count == 2 + + @patch( + "builtins.input", + side_effect=[ + "http://invalid-url.com", + "http://invalid-url-2.com", + "http://valid-url.com", + ], + ) + @patch( + "opik.configurator.configure.OpikConfigurator._is_instance_active", + side_effect=[False, False, True], + ) + def test_ask_for_url_success_on_third_try( + self, mock_is_instance_active, mock_input + ): + """ + Test that the URL is successfully returned on the third attempt after two failures. + """ + config = OpikConfigurator() + config._ask_for_url() + assert config.url == "http://valid-url.com" + assert mock_is_instance_active.call_count == 3 + + @patch("builtins.input", side_effect=["http://invalid-url.com"] * 3) + @patch( + "opik.configurator.configure.OpikConfigurator._is_instance_active", + return_value=False, + ) + @patch("opik.configurator.configure.LOGGER.error") + def test_ask_for_url_logging( + self, mock_logger_error, mock_is_instance_active, mock_input + ): + """ + Test that errors are logged when the URL is not accessible. + """ + with pytest.raises(ConfigurationError): + config = OpikConfigurator() + config._ask_for_url() + + assert mock_logger_error.call_count == 3 + mock_logger_error.assert_called_with( + f"Opik is not accessible at http://invalid-url.com. Please try again," + f" the URL should follow a format similar to {OPIK_BASE_URL_LOCAL}" + ) + + +class TestAskForApiKey: + @patch("opik.configurator.configure.is_interactive", return_value=True) + @patch("opik.configurator.configure.getpass.getpass", return_value="valid_api_key") + @patch( + "opik.configurator.configure.OpikConfigurator._is_api_key_correct", + return_value=True, + ) + def test_ask_for_api_key_success( + self, mock_is_api_key_correct, mock_getpass, mock_is_interactive + ): + """ + Test successful entry of a valid API key. + """ + config = OpikConfigurator() + config._ask_for_api_key() + assert config.api_key == "valid_api_key" + mock_is_api_key_correct.assert_called_once_with("valid_api_key") + + @patch("opik.configurator.configure.is_interactive", return_value=False) + def test_ask_for_api_key_non_interactive_mode(self, mock_is_interactive): + config = OpikConfigurator() + + with pytest.raises(ConfigurationError): + config._ask_for_api_key() + mock_is_interactive.assert_called_once() + + @patch("opik.configurator.configure.is_interactive", return_value=True) + @patch( + "opik.configurator.configure.getpass.getpass", return_value="invalid_api_key" + ) + @patch( + "opik.configurator.configure.OpikConfigurator._is_api_key_correct", + return_value=False, + ) + def test_ask_for_api_key_all_retries_fail( + self, mock_is_api_key_correct, mock_getpass, mock_is_interactive + ): + """ + Test that after 3 invalid API key attempts, a ConfigurationError is raised. + """ + with pytest.raises(ConfigurationError, match="API key is incorrect."): + OpikConfigurator()._ask_for_api_key() + + assert mock_is_api_key_correct.call_count == 3 + + @patch("opik.configurator.configure.is_interactive", return_value=True) + @patch( + "opik.configurator.configure.getpass.getpass", + side_effect=["invalid_key", "valid_key"], + ) + @patch( + "opik.configurator.configure.OpikConfigurator._is_api_key_correct", + side_effect=[False, True], + ) + def test_ask_for_api_key_success_on_second_try( + self, mock_is_api_key_correct, mock_getpass, mock_is_interactive + ): + """ + Test that the correct API key is entered on the second attempt after the first one is invalid. + """ + config = OpikConfigurator() + config._ask_for_api_key() + assert config.api_key == "valid_key" + assert mock_is_api_key_correct.call_count == 2 + + @patch("opik.configurator.configure.is_interactive", return_value=True) + @patch( + "opik.configurator.configure.getpass.getpass", + side_effect=["invalid_key1", "invalid_key2", "valid_key"], + ) + @patch( + "opik.configurator.configure.OpikConfigurator._is_api_key_correct", + side_effect=[False, False, True], + ) + def test_ask_for_api_key_success_on_third_try( + self, mock_is_api_key_correct, mock_getpass, mock_is_interactive + ): + """ + Test that the correct API key is entered on the third attempt after two invalid attempts. + """ + config = OpikConfigurator() + config._ask_for_api_key() + assert config.api_key == "valid_key" + assert mock_is_api_key_correct.call_count == 3 + + +class TestAskForWorkspace: + @patch("opik.configurator.configure.is_interactive", return_value=True) + @patch("builtins.input", return_value="valid_workspace") + @patch( + "opik.configurator.configure.OpikConfigurator._is_workspace_name_correct", + return_value=True, + ) + def test_ask_for_workspace_success( + self, mock_is_workspace_name_correct, mock_input, mock_is_interactive + ): + """ + Test successful entry of a valid workspace name. + """ + api_key = "valid_api_key" + config = OpikConfigurator(api_key=api_key) + config._ask_for_workspace() + assert config.workspace == "valid_workspace" + mock_is_workspace_name_correct.assert_called_once_with("valid_workspace") + + @patch("opik.configurator.configure.is_interactive", return_value=False) + def test_ask_for_workspace_non_interactive_mode(self, mock_is_interactive): + api_key = "valid_api_key" + config = OpikConfigurator(api_key=api_key) + + with pytest.raises(ConfigurationError): + config._ask_for_workspace() + mock_is_interactive.assert_called_once() + + @patch("opik.configurator.configure.is_interactive", return_value=True) + @patch("builtins.input", return_value="invalid_workspace") + @patch( + "opik.configurator.configure.OpikConfigurator._is_workspace_name_correct", + return_value=False, + ) + def test_ask_for_workspace_all_retries_fail( + self, mock_is_workspace_name_correct, mock_input, mock_is_interactive + ): + """ + Test that after 3 invalid workspace name attempts, a ConfigurationError is raised. + """ + api_key = "valid_api_key" + + with pytest.raises( + ConfigurationError, + match="User does not have access to the workspaces provided.", + ): + OpikConfigurator(api_key)._ask_for_workspace() + + assert mock_is_workspace_name_correct.call_count == 3 + + @patch("opik.configurator.configure.is_interactive", return_value=True) + @patch("builtins.input", side_effect=["invalid_workspace", "valid_workspace"]) + @patch( + "opik.configurator.configure.OpikConfigurator._is_workspace_name_correct", + side_effect=[False, True], + ) + def test_ask_for_workspace_success_on_second_try( + self, mock_is_workspace_name_correct, mock_input, mock_is_interactive + ): + """ + Test that the workspace name is successfully entered on the second attempt after the first one is invalid. + """ + api_key = "valid_api_key" + config = OpikConfigurator(api_key=api_key) + config._ask_for_workspace() + assert config.workspace == "valid_workspace" + assert mock_is_workspace_name_correct.call_count == 2 + + @patch("opik.configurator.configure.is_interactive", return_value=True) + @patch( + "builtins.input", + side_effect=["invalid_workspace1", "invalid_workspace2", "valid_workspace"], + ) + @patch( + "opik.configurator.configure.OpikConfigurator._is_workspace_name_correct", + side_effect=[False, False, True], + ) + def test_ask_for_workspace_success_on_third_try( + self, mock_is_workspace_name_correct, mock_input, mock_is_interactive + ): + """ + Test that the workspace name is successfully entered on the third attempt after two invalid attempts. + """ + api_key = "valid_api_key" + config = OpikConfigurator(api_key=api_key) + config._ask_for_workspace() + assert config.workspace == "valid_workspace" + assert mock_is_workspace_name_correct.call_count == 3 + + +class TestGetApiKey: + def set_api_key(self): + self.configurator.api_key = "new_api_key" + + @patch("opik.configurator.configure.OpikConfigurator._ask_for_api_key") + def test_get_api_key_force_ask(self, mock_ask_for_api_key): + """ + Test that when force=True and no API key is provided, the user is asked for an API key. + """ + mock_ask_for_api_key.side_effect = self.set_api_key + + self.configurator = OpikConfigurator(api_key=None, force=True) + needs_update = self.configurator._set_api_key() + + assert self.configurator.api_key == "new_api_key" + assert needs_update is True + mock_ask_for_api_key.assert_called_once() + + @patch("opik.configurator.configure.OpikConfigurator._ask_for_api_key") + def test_get_api_key_ask_for_missing_key(self, mock_ask_for_api_key): + """ + Test that when no API key is provided and none is present in the config, the user is asked for an API key. + """ + mock_ask_for_api_key.side_effect = self.set_api_key + + self.configurator = OpikConfigurator(api_key=None, force=False) + needs_update = self.configurator._set_api_key() + + assert self.configurator.api_key == "new_api_key" + assert needs_update is True + mock_ask_for_api_key.assert_called_once() + + @patch("opik.configurator.configure.opik.config.OpikConfig") + def test_get_api_key_use_config_key(self, mock_opik_config): + """ + Test that the API key is taken from the current config when provided and force=False. + """ + mock_config_instance = MagicMock() + mock_config_instance.api_key = "new_api_key" + mock_opik_config.return_value = mock_config_instance + + configurator = OpikConfigurator(api_key=None, force=False) + needs_update = configurator._set_api_key() + + assert configurator.api_key == "new_api_key" + assert needs_update is False + + @patch( + "opik.configurator.configure.OpikConfigurator._is_api_key_correct", + return_value=True, + ) + def test_get_api_key_provided_key(self, mock_is_api_key_correct): + """ + Test that the user-provided API key is used directly if it's passed in. + """ + configurator = OpikConfigurator( + api_key="config_api_key", url=OPIK_BASE_URL_CLOUD, force=True + ) + needs_update = configurator._set_api_key() + + mock_is_api_key_correct.assert_called_once() + + assert configurator.api_key == "config_api_key" + assert needs_update is True + + +class TestGetWorkspace: + @patch("opik.configurator.configure.opik.config.OpikConfig") + @patch( + "opik.configurator.configure.OpikConfigurator._is_workspace_name_correct", + return_value=True, + ) + def test_get_workspace_user_provided_valid_force( + self, mock_is_workspace_name_correct, mock_opik_config + ): + """ + Test that the workspace provided by the user is valid and used. + """ + mock_config_instance = MagicMock() + mock_config_instance.workspace = "existing_workspace" + mock_opik_config.return_value = mock_config_instance + + configurator = OpikConfigurator( + workspace="new_workspace", force=True, api_key="valid_api_key" + ) + + needs_update = configurator._set_workspace() + + assert configurator.workspace == "new_workspace" + assert needs_update is True + mock_is_workspace_name_correct.assert_called_once_with("new_workspace") + + @patch("opik.configurator.configure.opik.config.OpikConfig") + @patch( + "opik.configurator.configure.OpikConfigurator._is_workspace_name_correct", + return_value=True, + ) + def test_get_workspace_user_provided_valid_not_force( + self, mock_is_workspace_name_correct, mock_opik_config + ): + """ + Test that the workspace provided by the user is valid and used. + """ + mock_config_instance = MagicMock() + mock_config_instance.workspace = "existing_workspace" + mock_opik_config.return_value = mock_config_instance + + configurator = OpikConfigurator( + workspace="new_workspace", force=False, api_key="valid_api_key" + ) + + needs_update = configurator._set_workspace() + + assert configurator.workspace == "new_workspace" + assert needs_update is False + mock_is_workspace_name_correct.assert_called_once_with("new_workspace") + + @patch("opik.configurator.configure.opik.config.OpikConfig") + @patch( + "opik.configurator.configure.OpikConfigurator._is_workspace_name_correct", + return_value=False, + ) + def test_get_workspace_user_provided_invalid( + self, mock_is_workspace_name_correct, mock_opik_config + ): + """ + Test that a ConfigurationError is raised if the user-provided workspace is invalid. + """ + mock_config_instance = MagicMock() + mock_config_instance.workspace = "existing_workspace" + mock_opik_config.return_value = mock_config_instance + + configurator = OpikConfigurator( + workspace="invalid_workspace", force=False, api_key="valid_api_key" + ) + + with pytest.raises(ConfigurationError): + configurator._set_workspace() + + mock_is_workspace_name_correct.assert_called_once_with("invalid_workspace") + + @patch("opik.configurator.configure.opik.config.OpikConfig") + def test_get_workspace_use_config(self, mock_opik_config): + """ + Test that the workspace from the current config is used when no workspace is provided and not forced. + """ + current_config = OpikConfig(workspace="configured_workspace") + mock_opik_config.return_value = current_config + + configurator = OpikConfigurator( + workspace=None, force=False, api_key="valid_api_key" + ) + needs_update = configurator._set_workspace() + + assert configurator.workspace == "configured_workspace" + assert needs_update is False + + @patch("opik.configurator.configure.opik.config.OpikConfig") + @patch( + "opik.configurator.configure.OpikConfigurator._get_default_workspace", + return_value="default_workspace", + ) + @patch("opik.configurator.configure.ask_user_for_approval", return_value=True) + def test_get_workspace_accept_default( + self, mock_ask_user_for_approval, mock_get_default_workspace, mock_opik_config + ): + """ + Test that the user accepts the default workspace. + """ + current_config = OpikConfig(workspace=OPIK_WORKSPACE_DEFAULT_NAME) + mock_opik_config.return_value = current_config + + configurator = OpikConfigurator( + workspace=None, force=False, api_key="valid_api_key" + ) + needs_update = configurator._set_workspace() + + assert configurator.workspace == "default_workspace" + assert needs_update is True + mock_get_default_workspace.assert_called_once_with() + mock_ask_user_for_approval.assert_called_once_with( + 'Do you want to use "default_workspace" workspace? (Y/n)' + ) + + @patch("opik.configurator.configure.opik.config.OpikConfig") + @patch( + "opik.configurator.configure.OpikConfigurator._get_default_workspace", + return_value="default_workspace", + ) + @patch("opik.configurator.configure.ask_user_for_approval", return_value=False) + @patch("opik.configurator.configure.OpikConfigurator._ask_for_workspace") + def test_get_workspace_choose_different( + self, + mock_ask_for_workspace, + mock_ask_user_for_approval, + mock_get_default_workspace, + mock_opik_config, + ): + """ + Test that the user declines the default workspace and chooses a new one. + """ + + def set_workspace(): + configurator.workspace = "new_workspace" + + mock_ask_for_workspace.side_effect = set_workspace + + current_config = OpikConfig(workspace=OPIK_WORKSPACE_DEFAULT_NAME) + mock_opik_config.return_value = current_config + + configurator = OpikConfigurator( + workspace=None, force=False, api_key="valid_api_key" + ) + needs_update = configurator._set_workspace() + + assert configurator.workspace == "new_workspace" + assert needs_update is True + mock_get_default_workspace.assert_called_once_with() + mock_ask_user_for_approval.assert_called_once_with( + 'Do you want to use "default_workspace" workspace? (Y/n)' + ) + mock_ask_for_workspace.assert_called_once_with() + + +class TestConfigureCloud: + @patch("opik.configurator.configure.OpikConfigurator._set_api_key") + @patch("opik.configurator.configure.OpikConfigurator._set_workspace") + @patch("opik.configurator.configure.OpikConfigurator._update_config") + def test_configure_cloud_with_update( + self, mock_update_config, mock_set_workspace, mock_set_api_key + ): + """ + Test that the configuration is updated when both API key and workspace require updates. + """ + + def set_workspace(): + configurator.workspace = "valid_workspace" + return True + + def set_api_key(): + configurator.api_key = "valid_api_key" + return True + + mock_set_api_key.side_effect = set_api_key + mock_set_workspace.side_effect = set_workspace + + configurator = OpikConfigurator(api_key=None, workspace=None, force=False) + configurator._configure_cloud() + + mock_set_api_key.assert_called_once() + mock_set_workspace.assert_called_once() + mock_update_config.assert_called_once() + + assert configurator.api_key == "valid_api_key" + assert configurator.url == OPIK_BASE_URL_CLOUD + assert configurator.workspace == "valid_workspace" + + @patch("opik.configurator.configure.OpikConfigurator._set_api_key") + @patch("opik.configurator.configure.OpikConfigurator._set_workspace") + @patch("opik.configurator.configure.LOGGER.info") + @patch("opik.configurator.configure.opik.config.OpikConfig") + @patch("opik.configurator.configure.OpikConfigurator._update_config") + def test_configure_cloud_no_update_needed( + self, + mock_update_config, + mock_opik_config, + mock_logger_info, + mock_set_workspace, + mock_set_api_key, + ): + """ + Test that no configuration update happens when both API key and workspace are already set. + """ + + def set_workspace(): + configurator.workspace = "valid_workspace" + return False + + def set_api_key(): + configurator.api_key = "valid_api_key" + return False + + mock_set_api_key.side_effect = set_api_key + mock_set_workspace.side_effect = set_workspace + + # Mock the config file path to return a specific path + mock_config_instance = MagicMock() + mock_config_instance.config_file_fullpath = Path("/some/path/.opik.config") + mock_opik_config.return_value = mock_config_instance + + # Call the function + configurator = OpikConfigurator( + api_key="valid_api_key", workspace="valid_workspace", force=False + ) + configurator._configure_cloud() + + # Ensure API key and workspace were checked + mock_set_api_key.assert_called_once() + mock_set_workspace.assert_called_once() + + # Check config file wasn't overwritten, but session updated + mock_update_config.assert_called_once_with(save_to_file=False) + + # Check the logging message + mock_logger_info.assert_called_with( + "Opik is already configured. You can check the settings by viewing the config file at %s", + Path("/some/path/.opik.config"), + ) + + @patch("opik.configurator.configure.OpikConfigurator._set_api_key") + @patch("opik.configurator.configure.OpikConfigurator._set_workspace") + @patch("opik.configurator.configure.OpikConfigurator._update_config") + def test_configure_cloud_api_key_updated( + self, mock_update_config, mock_set_workspace, mock_set_api_key + ): + """ + Test that the configuration is updated when only the API key changes. + """ + + def set_workspace(): + configurator.workspace = "configured_workspace" + return False + + def set_api_key(): + configurator.api_key = "new_api_key" + return True + + mock_set_api_key.side_effect = set_api_key + mock_set_workspace.side_effect = set_workspace + + configurator = OpikConfigurator( + api_key=None, workspace="configured_workspace", force=False + ) + configurator._configure_cloud() + + mock_set_api_key.assert_called_once() + mock_set_workspace.assert_called_once() + mock_update_config.assert_called_once() + + assert configurator.api_key == "new_api_key" + assert configurator.url == OPIK_BASE_URL_CLOUD + assert configurator.workspace == "configured_workspace" + + @patch("opik.configurator.configure.OpikConfigurator._set_api_key") + @patch("opik.configurator.configure.OpikConfigurator._set_workspace") + @patch("opik.configurator.configure.OpikConfigurator._update_config") + def test_configure_cloud_workspace_updated( + self, mock_update_config, mock_set_workspace, mock_set_api_key + ): + """ + Test that the configuration is updated when only the workspace changes. + """ + + def set_workspace(): + configurator.workspace = "new_workspace" + return False + + def set_api_key(): + configurator.api_key = "valid_api_key" + return True + + mock_set_api_key.side_effect = set_api_key + mock_set_workspace.side_effect = set_workspace + + configurator = OpikConfigurator( + api_key="valid_api_key", workspace=None, force=False + ) + configurator._configure_cloud() + + mock_set_api_key.assert_called_once() + mock_set_workspace.assert_called_once() + mock_update_config.assert_called_once() + + assert configurator.api_key == "valid_api_key" + assert configurator.url == OPIK_BASE_URL_CLOUD + assert configurator.workspace == "new_workspace" + + +class TestConfigureLocal: + @patch("opik.configurator.configure.is_interactive", return_value=True) + @patch("opik.configurator.configure.OpikConfigurator._ask_for_url") + @patch( + "opik.configurator.configure.OpikConfigurator._is_instance_active", + return_value=False, + ) + @patch("opik.configurator.configure.OpikConfigurator._update_config") + def test_configure_local_asks_for_url( + self, + mock_update_config, + mock_is_instance_active, + mock_ask_for_url, + mock_is_interactive, + ): + """ + Test that the function asks for a URL if no local instance is active and no URL is provided. + """ + + def set_url(): + configurator.url = "http://user-provided-url.com" + + mock_ask_for_url.side_effect = set_url + + configurator = OpikConfigurator(url=None, force=False) + configurator._configure_local() + + mock_ask_for_url.assert_called_once() + mock_update_config.assert_called_once() + + assert configurator.api_key is None + assert configurator.url == "http://user-provided-url.com" + assert configurator.workspace == OPIK_WORKSPACE_DEFAULT_NAME + + @patch("opik.configurator.configure.is_interactive", return_value=False) + @patch("opik.configurator.configure.OpikConfigurator._ask_for_url") + @patch( + "opik.configurator.configure.OpikConfigurator._is_instance_active", + return_value=False, + ) + @patch("opik.configurator.configure.OpikConfigurator._update_config") + def test_configure_local_asks_for_url__non_interactive( + self, + mock_update_config, + mock_is_instance_active, + mock_ask_for_url, + mock_is_interactive, + ): + """ + Test that the function asks for a URL if no local instance is active and no URL is provided. + """ + + def set_url(): + configurator.url = "http://user-provided-url.com" + + mock_ask_for_url.side_effect = set_url + + configurator = OpikConfigurator(url=None, force=False) + + with pytest.raises(ConfigurationError): + configurator._configure_local() + + mock_ask_for_url.assert_not_called() + mock_update_config.assert_not_called() + + @patch("opik.configurator.configure.OpikConfigurator._ask_for_url") + @patch( + "opik.configurator.configure.OpikConfigurator._is_instance_active", + return_value=True, + ) + @patch("opik.configurator.configure.OpikConfigurator._update_config") + def test_configure_local_with_provided_url( + self, mock_update_config, mock_is_instance_active, mock_ask_for_url + ): + """ + Test that the function configures the provided URL if it is active. + """ + configurator = OpikConfigurator( + url="http://custom-local-instance.com", force=False + ) + configurator._configure_local() + + mock_ask_for_url.assert_not_called() + mock_is_instance_active.assert_called_once_with( + "http://custom-local-instance.com" + ) + mock_update_config.assert_called_once() + + assert configurator.api_key is None + assert configurator.url == "http://custom-local-instance.com" + assert configurator.workspace == OPIK_WORKSPACE_DEFAULT_NAME + + @patch("opik.configurator.configure.OpikConfigurator._ask_for_url") + @patch( + "opik.configurator.configure.OpikConfigurator._is_instance_active", + return_value=True, + ) + @patch("opik.configurator.configure.opik.config.OpikConfig") + @patch("opik.configurator.configure.LOGGER.info") + def test_configure_local_no_update_needed( + self, + mock_logger_info, + mock_opik_config, + mock_is_instance_active, + mock_ask_for_url, + ): + """ + Test that no update happens if the local instance is already configured and force=False. + """ + mock_config_instance = MagicMock() + mock_config_instance.url_override = OPIK_BASE_URL_LOCAL + mock_opik_config.return_value = mock_config_instance + + configurator = OpikConfigurator(url=None, force=False) + configurator._configure_local() + + mock_ask_for_url.assert_not_called() + mock_is_instance_active.assert_called_once_with(OPIK_BASE_URL_LOCAL) + mock_logger_info.assert_called_once_with( + f"Opik is already configured to local instance at {OPIK_BASE_URL_LOCAL}." + ) + + @patch("opik.configurator.configure.is_interactive", return_value=True) + @patch("opik.configurator.configure.ask_user_for_approval", return_value=True) + @patch( + "opik.configurator.configure.OpikConfigurator._is_instance_active", + return_value=True, + ) + @patch("opik.configurator.configure.OpikConfigurator._update_config") + def test_configure_local_uses_local_instance( + self, + mock_update_config, + mock_is_instance_active, + mock_ask_user_for_approval, + mock_is_interactive, + ): + """ + Test that the function configures the local instance when found and user approves. + """ + configurator = OpikConfigurator(url=None, force=False) + configurator._configure_local() + + mock_ask_user_for_approval.assert_called_once_with( + f"Found local Opik instance on: {OPIK_BASE_URL_LOCAL}, do you want to use it? (Y/n)" + ) + mock_update_config.assert_called_once_with() + + assert configurator.api_key is None + assert configurator.url == OPIK_BASE_URL_LOCAL + assert configurator.workspace == OPIK_WORKSPACE_DEFAULT_NAME + + @patch("opik.configurator.configure.is_interactive", return_value=False) + @patch("opik.configurator.configure.ask_user_for_approval", return_value=True) + @patch( + "opik.configurator.configure.OpikConfigurator._is_instance_active", + return_value=True, + ) + @patch("opik.configurator.configure.OpikConfigurator._update_config") + def test_configure_local_uses_local_instance__non_interactive( + self, + mock_update_config, + mock_is_instance_active, + mock_ask_user_for_approval, + mock_is_interactive, + ): + """ + Test that the function configures the local instance when found and user approves. + """ + configurator = OpikConfigurator(url=None, force=False) + with pytest.raises(ConfigurationError): + configurator._configure_local() + + mock_ask_user_for_approval.assert_not_called() + mock_update_config.assert_not_called() + + @patch("opik.configurator.configure.is_interactive", return_value=True) + @patch("opik.configurator.configure.ask_user_for_approval", return_value=False) + @patch("opik.configurator.configure.OpikConfigurator._ask_for_url") + @patch( + "opik.configurator.configure.OpikConfigurator._is_instance_active", + return_value=True, + ) + @patch("opik.configurator.configure.OpikConfigurator._update_config") + def test_configure_local_user_declines_local_instance( + self, + mock_update_config, + mock_is_instance_active, + mock_ask_for_url, + mock_ask_user_for_approval, + mock_is_interactive, + ): + """ + Test that if the user declines using the local instance, they are prompted for a URL. + """ + + def set_url(): + configurator.url = "http://user-provided-url.com" + + mock_ask_for_url.side_effect = set_url + + configurator = OpikConfigurator(url=None, force=False) + configurator._configure_local() + + mock_ask_user_for_approval.assert_called_once_with( + f"Found local Opik instance on: {OPIK_BASE_URL_LOCAL}, do you want to use it? (Y/n)" + ) + mock_ask_for_url.assert_called_once() + mock_update_config.assert_called_once() + + assert configurator.api_key is None + assert configurator.url == "http://user-provided-url.com" + assert configurator.workspace == OPIK_WORKSPACE_DEFAULT_NAME + + @patch("opik.configurator.configure.is_interactive", return_value=False) + @patch("opik.configurator.configure.ask_user_for_approval", return_value=False) + @patch("opik.configurator.configure.OpikConfigurator._ask_for_url") + @patch( + "opik.configurator.configure.OpikConfigurator._is_instance_active", + return_value=True, + ) + @patch("opik.configurator.configure.OpikConfigurator._update_config") + def test_configure_local_user_declines_local_instance__non_interactive( + self, + mock_update_config, + mock_is_instance_active, + mock_ask_for_url, + mock_ask_user_for_approval, + mock_is_interactive, + ): + """ + Test that if the user declines using the local instance, they are prompted for a URL. + """ + + def set_url(): + configurator.url = "http://user-provided-url.com" + + mock_ask_for_url.side_effect = set_url + + configurator = OpikConfigurator(url=None, force=False) + + with pytest.raises(ConfigurationError): + configurator._configure_local() + + mock_ask_for_url.assert_not_called() + mock_ask_user_for_approval.assert_not_called() + mock_update_config.assert_not_called() diff --git a/sdks/python/tests/unit/configurator/test_interactive_helpers.py b/sdks/python/tests/unit/configurator/test_interactive_helpers.py new file mode 100644 index 0000000000..915c5d9305 --- /dev/null +++ b/sdks/python/tests/unit/configurator/test_interactive_helpers.py @@ -0,0 +1,116 @@ +import sys +from unittest.mock import patch + +from opik.configurator.interactive_helpers import ask_user_for_approval, is_interactive + + +class TestIsInteractive: + @patch( + "opik.configurator.interactive_helpers._in_colab_environment", + return_value=False, + ) + @patch( + "opik.configurator.interactive_helpers._in_ipython_environment", + return_value=False, + ) + @patch( + "opik.configurator.interactive_helpers._in_jupyter_environment", + return_value=False, + ) + @patch.object(sys.stdin, "isatty", return_value=True) + def test_is_interactive__true( + self, + isatty, + _in_jupyter_environment, + _in_ipython_environment, + _in_colab_environment, + ): + assert is_interactive() is True + + @patch( + "opik.configurator.interactive_helpers._in_colab_environment", + return_value=False, + ) + @patch( + "opik.configurator.interactive_helpers._in_ipython_environment", + return_value=False, + ) + @patch( + "opik.configurator.interactive_helpers._in_jupyter_environment", + return_value=False, + ) + @patch.object(sys.stdin, "isatty", return_value=False) + def test_is_interactive__false( + self, + isatty, + _in_jupyter_environment, + _in_ipython_environment, + _in_colab_environment, + ): + assert is_interactive() is False + + +class TestAskUserForApproval: + @patch("builtins.input", return_value="Y") + def test_user_approves_with_y(self, mock_input): + """ + Test that 'Y' returns True for approval. + """ + result = ask_user_for_approval("Do you approve?") + assert result is True + + @patch("builtins.input", return_value="YES") + def test_user_approves_with_yes(self, mock_input): + """ + Test that 'YES' returns True for approval. + """ + result = ask_user_for_approval("Do you approve?") + assert result is True + + @patch("builtins.input", return_value="") + def test_user_approves_with_empty_input(self, mock_input): + """ + Test that empty input returns True for approval. + """ + result = ask_user_for_approval("Do you approve?") + assert result is True + + @patch("builtins.input", return_value="N") + def test_user_disapproves_with_n(self, mock_input): + """ + Test that 'N' returns False for disapproval. + """ + result = ask_user_for_approval("Do you disapprove?") + assert result is False + + @patch("builtins.input", return_value="NO") + def test_user_disapproves_with_no(self, mock_input): + """ + Test that 'NO' returns False for disapproval. + """ + result = ask_user_for_approval("Do you disapprove?") + assert result is False + + @patch("builtins.input", side_effect=["INVALID", "Y"]) + @patch("opik.configurator.interactive_helpers.LOGGER.error") + def test_user_enters_invalid_choice_then_approves( + self, mock_logger_error, mock_input + ): + """ + Test that invalid input triggers error logging and prompts again until valid input is entered. + """ + result = ask_user_for_approval("Do you approve?") + assert result is True + mock_logger_error.assert_called_once_with("Wrong choice. Please try again.") + + @patch("builtins.input", side_effect=["INVALID", "NO"]) + @patch("opik.configurator.interactive_helpers.LOGGER.error") + def test_user_enters_invalid_choice_then_disapproves( + self, mock_logger_error, mock_input + ): + """ + Test that invalid input triggers error logging and prompts again until valid input is entered. + """ + result = ask_user_for_approval("Do you disapprove?") + assert result is False + mock_logger_error.assert_called_once_with("Wrong choice. Please try again.") diff --git a/sdks/python/tests/unit/test_opik_configure.py b/sdks/python/tests/unit/test_opik_configure.py deleted file mode 100644 index 32bc14dd82..0000000000 --- a/sdks/python/tests/unit/test_opik_configure.py +++ /dev/null @@ -1,1089 +0,0 @@ -from pathlib import Path -from unittest.mock import MagicMock, Mock, patch - -import httpx -import pytest - -from opik.config import ( - OPIK_BASE_URL_CLOUD, - OPIK_BASE_URL_LOCAL, - OPIK_WORKSPACE_DEFAULT_NAME, - OpikConfig, -) -from opik.exceptions import ConfigurationError -from opik.opik_configure import ( - _ask_for_api_key, - _ask_for_url, - _ask_for_workspace, - _configure_cloud, - _configure_local, - _get_api_key, - _get_workspace, - _update_config, - ask_user_for_approval, - get_default_workspace, - is_api_key_correct, - is_instance_active, - is_workspace_name_correct, -) - - -@pytest.fixture(autouse=True) -def mock_env_and_file(monkeypatch): - monkeypatch.delenv("OPIK_API_KEY", raising=False) - monkeypatch.delenv("OPIK_WORKSPACE", raising=False) - monkeypatch.delenv("OPIK_URL_OVERRIDE", raising=False) - - with patch("builtins.open", side_effect=FileNotFoundError): - yield - - -class TestIsInstanceActive: - @pytest.mark.parametrize( - "status_code, expected_result", - [ - (200, True), - (404, False), - (500, False), - ], - ) - @patch("opik.opik_configure.httpx.Client") - def test_is_instance_active(self, mock_httpx_client, status_code, expected_result): - """ - Test various HTTP status code responses to check if the instance is active. - """ - mock_client_instance = MagicMock() - mock_response = Mock() - mock_response.status_code = status_code - - mock_client_instance.__enter__.return_value = mock_client_instance - mock_client_instance.__exit__.return_value = False - mock_client_instance.get.return_value = mock_response - mock_httpx_client.return_value = mock_client_instance - - url = "http://example.com" - result = is_instance_active(url) - - assert result == expected_result - - @patch("opik.opik_configure.httpx.Client") - def test_is_instance_active_timeout(self, mock_httpx_client): - """ - Test that a connection timeout results in False being returned. - """ - mock_client_instance = MagicMock() - mock_client_instance.__enter__.return_value = mock_client_instance - mock_client_instance.__exit__.return_value = False - mock_client_instance.get.side_effect = httpx.ConnectTimeout("timeout") - - mock_httpx_client.return_value = mock_client_instance - - url = "http://example.com" - result = is_instance_active(url) - - assert result is False - - @patch("opik.opik_configure.httpx.Client") - def test_is_instance_active_general_exception(self, mock_httpx_client): - """ - Test that any general exception results in False being returned. - """ - mock_client_instance = MagicMock() - mock_client_instance.__enter__.return_value = mock_client_instance - mock_client_instance.__exit__.return_value = False - mock_client_instance.get.side_effect = Exception("Unexpected error") - - mock_httpx_client.return_value = mock_client_instance - - url = "http://example.com" - result = is_instance_active(url) - - assert result is False - - -class TestIsWorkspaceNameCorrect: - @pytest.mark.parametrize( - "api_key, workspace, workspace_names, expected_result", - [ - ("valid_api_key", "correct_workspace", ["correct_workspace"], True), - ("valid_api_key", "incorrect_workspace", ["other_workspace"], False), - ("valid_api_key", "empty_workspace", [], False), - ], - ) - @patch("opik.opik_configure.httpx.Client") - def test_workspace_valid_api_key( - self, mock_httpx_client, api_key, workspace, workspace_names, expected_result - ): - """ - Test cases with valid API keys and workspace verification. - These tests simulate different workspace existence conditions. - """ - # Mock the HTTP response for valid API key cases - mock_client_instance = MagicMock() - mock_response = Mock() - mock_response.status_code = 200 - mock_response.json.return_value = {"workspaceNames": workspace_names} - - # Mock the context manager behavior - mock_client_instance.__enter__.return_value = mock_client_instance - mock_client_instance.__exit__.return_value = False - mock_client_instance.get.return_value = mock_response - mock_httpx_client.return_value = mock_client_instance - - result = is_workspace_name_correct(api_key, workspace) - assert result == expected_result - - @pytest.mark.parametrize( - "status_code, response_text", - [(500, "Internal Server Error"), (404, "Not Found"), (403, "Forbidden")], - ) - @patch("opik.opik_configure.httpx.Client") - def test_workspace_non_200_response( - self, mock_httpx_client, status_code, response_text - ): - """ - Test cases where the API responds with a non-200 status code. - These responses should raise a ConnectionError. - """ - # Mock the HTTP response for non-200 status code cases - mock_client_instance = MagicMock() - mock_response = Mock() - mock_response.status_code = status_code - mock_response.text = response_text - - mock_client_instance.__enter__.return_value = mock_client_instance - mock_client_instance.__exit__.return_value = False - mock_client_instance.get.return_value = mock_response - mock_httpx_client.return_value = mock_client_instance - - api_key = "valid_api_key" - workspace = "any_workspace" - - with pytest.raises(ConnectionError): - is_workspace_name_correct(api_key, workspace) - - @pytest.mark.parametrize( - "exception", - [ - (httpx.RequestError("Timeout", request=MagicMock())), - (Exception("Unexpected error")), - ], - ) - @patch("opik.opik_configure.httpx.Client") - def test_workspace_request_exceptions(self, mock_httpx_client, exception): - """ - Test cases where an exception is raised during the HTTP request. - These cases should raise a ConnectionError with the appropriate message. - """ - # Mock the HTTP request to raise an exception - mock_client_instance = MagicMock() - mock_client_instance.__enter__.return_value = mock_client_instance - mock_client_instance.__exit__.return_value = False - mock_client_instance.get.side_effect = exception - - mock_httpx_client.return_value = mock_client_instance - - api_key = "valid_api_key" - workspace = "any_workspace" - - # Check that the appropriate ConnectionError is raised - with pytest.raises(ConnectionError): - is_workspace_name_correct(api_key, workspace) - - -class TestIsApiKeyCorrect: - @pytest.mark.parametrize( - "status_code, expected_result", - [ - (200, True), - (401, False), - (403, False), - ], - ) - @patch("opik.opik_configure.httpx.Client") - def test_is_api_key_correct(self, mock_httpx_client, status_code, expected_result): - """ - Test valid, invalid, and forbidden API key scenarios by simulating HTTP status codes. - """ - mock_client_instance = MagicMock() - mock_response = Mock() - mock_response.status_code = status_code - - mock_client_instance.__enter__.return_value = mock_client_instance - mock_client_instance.__exit__.return_value = False - mock_client_instance.get.return_value = mock_response - mock_httpx_client.return_value = mock_client_instance - - api_key = "dummy_api_key" - result = is_api_key_correct(api_key) - - assert result == expected_result - - @pytest.mark.parametrize( - "status_code, response_text", - [(500, "Internal Server Error")], - ) - @patch("opik.opik_configure.httpx.Client") - def test_is_api_key_correct_non_200_response( - self, mock_httpx_client, status_code, response_text - ): - """ - Test that a non-200, 401, or 403 response raises a ConnectionError. - """ - mock_client_instance = MagicMock() - mock_response = Mock() - mock_response.status_code = status_code - mock_response.text = response_text - - mock_client_instance.__enter__.return_value = mock_client_instance - mock_client_instance.__exit__.return_value = False - mock_client_instance.get.return_value = mock_response - mock_httpx_client.return_value = mock_client_instance - - api_key = "dummy_api_key" - - with pytest.raises(ConnectionError): - is_api_key_correct(api_key) - - @pytest.mark.parametrize( - "exception", - [ - (httpx.RequestError("Timeout", request=MagicMock())), - (Exception("Unexpected error")), - ], - ) - @patch("opik.opik_configure.httpx.Client") - def test_is_api_key_correct_exceptions(self, mock_httpx_client, exception): - """ - Test that RequestError and general exceptions are properly raised as ConnectionError. - """ - mock_client_instance = MagicMock() - mock_client_instance.__enter__.return_value = mock_client_instance - mock_client_instance.__exit__.return_value = False - mock_client_instance.get.side_effect = exception - - mock_httpx_client.return_value = mock_client_instance - - api_key = "dummy_api_key" - - with pytest.raises(ConnectionError): - is_api_key_correct(api_key) - - -class TestGetDefaultWorkspace: - @pytest.mark.parametrize( - "status_code, response_json, expected_result", - [ - (200, {"defaultWorkspaceName": "workspace1"}, "workspace1"), - ], - ) - @patch("opik.opik_configure.httpx.Client") - def test_get_default_workspace_success( - self, mock_httpx_client, status_code, response_json, expected_result - ): - """ - Test successful retrieval of the default workspace name. - """ - mock_client_instance = MagicMock() - mock_response = Mock() - mock_response.status_code = status_code - mock_response.json.return_value = response_json - - mock_client_instance.__enter__.return_value = mock_client_instance - mock_client_instance.get.return_value = mock_response - mock_httpx_client.return_value = mock_client_instance - - api_key = "valid_api_key" - result = get_default_workspace(api_key) - assert result == expected_result - - @pytest.mark.parametrize( - "status_code, response_text", - [ - (500, "Internal Server Error"), - ], - ) - @patch("opik.opik_configure.httpx.Client") - def test_get_default_workspace_non_200_status( - self, mock_httpx_client, status_code, response_text - ): - """ - Test that non-200 status codes raise a ConnectionError. - """ - mock_client_instance = MagicMock() - mock_response = Mock() - mock_response.status_code = status_code - mock_response.text = response_text - - mock_client_instance.__enter__.return_value = mock_client_instance - mock_client_instance.get.return_value = mock_response - mock_httpx_client.return_value = mock_client_instance - - api_key = "valid_api_key" - with pytest.raises(ConnectionError): - get_default_workspace(api_key) - - @pytest.mark.parametrize( - "response_json", - [ - {}, - {"otherKey": "value"}, - None, - ], - ) - @patch("opik.opik_configure.httpx.Client") - def test_get_default_workspace_missing_key(self, mock_httpx_client, response_json): - """ - Test that missing 'defaultWorkspaceName' in the response raises a ConnectionError. - """ - mock_client_instance = MagicMock() - mock_response = Mock() - mock_response.status_code = 200 - mock_response.json.return_value = response_json - - mock_client_instance.__enter__.return_value = mock_client_instance - mock_client_instance.get.return_value = mock_response - mock_httpx_client.return_value = mock_client_instance - - api_key = "valid_api_key" - with pytest.raises(ConnectionError): - get_default_workspace(api_key) - - @pytest.mark.parametrize( - "exception", - [ - httpx.RequestError("Timeout", request=MagicMock()), - Exception("Unexpected error"), - ], - ) - @patch("opik.opik_configure.httpx.Client") - def test_get_default_workspace_exceptions(self, mock_httpx_client, exception): - """ - Test that network and unexpected exceptions are raised as ConnectionError. - """ - mock_client_instance = MagicMock() - mock_client_instance.__enter__.return_value = mock_client_instance - mock_client_instance.get.side_effect = exception - mock_httpx_client.return_value = mock_client_instance - - api_key = "valid_api_key" - - with pytest.raises(ConnectionError): - get_default_workspace(api_key) - - -class TestUpdateConfig: - @patch("opik.opik_configure.opik.config.OpikConfig") - @patch("opik.opik_configure.opik.config.update_session_config") - def test_update_config_success(self, mock_update_session_config, mock_opik_config): - """ - Test successful update of the config and session. - """ - mock_config_instance = MagicMock() - mock_opik_config.return_value = mock_config_instance - - api_key = "dummy_api_key" - url = "http://example.com" - workspace = "workspace1" - - _update_config(api_key, url, workspace) - - # Ensure config object is created and saved - mock_opik_config.assert_called_once_with( - api_key=api_key, - url_override=url, - workspace=workspace, - ) - mock_config_instance.save_to_file.assert_called_once() - - # Ensure session config is updated - mock_update_session_config.assert_any_call("api_key", api_key) - mock_update_session_config.assert_any_call("url_override", url) - mock_update_session_config.assert_any_call("workspace", workspace) - - @patch("opik.opik_configure.opik.config.OpikConfig") - @patch("opik.opik_configure.opik.config.update_session_config") - def test_update_config_raises_exception( - self, mock_update_session_config, mock_opik_config - ): - """ - Test that ConfigurationError is raised when an exception occurs. - """ - mock_opik_config.side_effect = Exception("Unexpected error") - - api_key = "dummy_api_key" - url = "http://example.com" - workspace = "workspace1" - - with pytest.raises(ConfigurationError, match="Failed to update configuration."): - _update_config(api_key, url, workspace) - - # Ensure save_to_file is not called due to the exception - mock_update_session_config.assert_not_called() - - @patch("opik.opik_configure.opik.config.OpikConfig") - @patch("opik.opik_configure.opik.config.update_session_config") - def test_update_config_session_update_failure( - self, mock_update_session_config, mock_opik_config - ): - """ - Test that ConfigurationError is raised if updating the session configuration fails. - """ - mock_config_instance = MagicMock() - mock_opik_config.return_value = mock_config_instance - mock_update_session_config.side_effect = Exception("Session update failed") - - api_key = "dummy_api_key" - url = "http://example.com" - workspace = "workspace1" - - with pytest.raises(ConfigurationError, match="Failed to update configuration."): - _update_config(api_key, url, workspace) - - # Ensure config object is created and saved - mock_opik_config.assert_called_once_with( - api_key=api_key, - url_override=url, - workspace=workspace, - ) - mock_config_instance.save_to_file.assert_called_once() - - -class TestAskForUrl: - @patch("builtins.input", side_effect=["http://valid-url.com"]) - @patch("opik.opik_configure.is_instance_active", return_value=True) - def test_ask_for_url_success(self, mock_is_instance_active, mock_input): - """ - Test successful input of a valid Opik URL. - """ - result = _ask_for_url() - assert result == "http://valid-url.com" - mock_is_instance_active.assert_called_once_with("http://valid-url.com") - - @patch("builtins.input", side_effect=["http://invalid-url.com"] * 3) - @patch("opik.opik_configure.is_instance_active", return_value=False) - def test_ask_for_url_all_retries_fail(self, mock_is_instance_active, mock_input): - """ - Test that after 3 failed attempts, a ConfigurationError is raised. - """ - with pytest.raises(ConfigurationError, match="Cannot use the URL provided"): - _ask_for_url() - - assert mock_is_instance_active.call_count == 3 - - @patch( - "builtins.input", side_effect=["http://invalid-url.com", "http://valid-url.com"] - ) - @patch("opik.opik_configure.is_instance_active", side_effect=[False, True]) - def test_ask_for_url_success_on_second_try( - self, mock_is_instance_active, mock_input - ): - """ - Test that the URL is successfully returned on the second attempt after the first failure. - """ - result = _ask_for_url() - assert result == "http://valid-url.com" - assert mock_is_instance_active.call_count == 2 - - @patch( - "builtins.input", - side_effect=[ - "http://invalid-url.com", - "http://invalid-url-2.com", - "http://valid-url.com", - ], - ) - @patch("opik.opik_configure.is_instance_active", side_effect=[False, False, True]) - def test_ask_for_url_success_on_third_try( - self, mock_is_instance_active, mock_input - ): - """ - Test that the URL is successfully returned on the third attempt after two failures. - """ - result = _ask_for_url() - assert result == "http://valid-url.com" - assert mock_is_instance_active.call_count == 3 - - @patch("builtins.input", side_effect=["http://invalid-url.com"] * 3) - @patch("opik.opik_configure.is_instance_active", return_value=False) - @patch("opik.opik_configure.LOGGER.error") - def test_ask_for_url_logging( - self, mock_logger_error, mock_is_instance_active, mock_input - ): - """ - Test that errors are logged when the URL is not accessible. - """ - with pytest.raises(ConfigurationError): - _ask_for_url() - - assert mock_logger_error.call_count == 3 - mock_logger_error.assert_called_with( - f"Opik is not accessible at http://invalid-url.com. Please try again, the URL should follow a format similar to {OPIK_BASE_URL_LOCAL}" - ) - - -class TestAskForApiKey: - @patch("opik.opik_configure.getpass.getpass", return_value="valid_api_key") - @patch("opik.opik_configure.is_api_key_correct", return_value=True) - def test_ask_for_api_key_success(self, mock_is_api_key_correct, mock_getpass): - """ - Test successful entry of a valid API key. - """ - result = _ask_for_api_key() - assert result == "valid_api_key" - mock_is_api_key_correct.assert_called_once_with("valid_api_key") - - @patch("opik.opik_configure.getpass.getpass", return_value="invalid_api_key") - @patch("opik.opik_configure.is_api_key_correct", return_value=False) - def test_ask_for_api_key_all_retries_fail( - self, mock_is_api_key_correct, mock_getpass - ): - """ - Test that after 3 invalid API key attempts, a ConfigurationError is raised. - """ - with pytest.raises(ConfigurationError, match="API key is incorrect."): - _ask_for_api_key() - - assert mock_is_api_key_correct.call_count == 3 - - @patch( - "opik.opik_configure.getpass.getpass", side_effect=["invalid_key", "valid_key"] - ) - @patch("opik.opik_configure.is_api_key_correct", side_effect=[False, True]) - def test_ask_for_api_key_success_on_second_try( - self, mock_is_api_key_correct, mock_getpass - ): - """ - Test that the correct API key is entered on the second attempt after the first one is invalid. - """ - result = _ask_for_api_key() - assert result == "valid_key" - assert mock_is_api_key_correct.call_count == 2 - - @patch( - "opik.opik_configure.getpass.getpass", - side_effect=["invalid_key1", "invalid_key2", "valid_key"], - ) - @patch("opik.opik_configure.is_api_key_correct", side_effect=[False, False, True]) - def test_ask_for_api_key_success_on_third_try( - self, mock_is_api_key_correct, mock_getpass - ): - """ - Test that the correct API key is entered on the third attempt after two invalid attempts. - """ - result = _ask_for_api_key() - assert result == "valid_key" - assert mock_is_api_key_correct.call_count == 3 - - -class TestAskForWorkspace: - @patch("builtins.input", return_value="valid_workspace") - @patch("opik.opik_configure.is_workspace_name_correct", return_value=True) - def test_ask_for_workspace_success( - self, mock_is_workspace_name_correct, mock_input - ): - """ - Test successful entry of a valid workspace name. - """ - api_key = "valid_api_key" - result = _ask_for_workspace(api_key) - assert result == "valid_workspace" - mock_is_workspace_name_correct.assert_called_once_with( - api_key, "valid_workspace" - ) - - @patch("builtins.input", return_value="invalid_workspace") - @patch("opik.opik_configure.is_workspace_name_correct", return_value=False) - def test_ask_for_workspace_all_retries_fail( - self, mock_is_workspace_name_correct, mock_input - ): - """ - Test that after 3 invalid workspace name attempts, a ConfigurationError is raised. - """ - api_key = "valid_api_key" - - with pytest.raises( - ConfigurationError, - match="User does not have access to the workspaces provided.", - ): - _ask_for_workspace(api_key) - - assert mock_is_workspace_name_correct.call_count == 3 - - @patch("builtins.input", side_effect=["invalid_workspace", "valid_workspace"]) - @patch("opik.opik_configure.is_workspace_name_correct", side_effect=[False, True]) - def test_ask_for_workspace_success_on_second_try( - self, mock_is_workspace_name_correct, mock_input - ): - """ - Test that the workspace name is successfully entered on the second attempt after the first one is invalid. - """ - api_key = "valid_api_key" - result = _ask_for_workspace(api_key) - assert result == "valid_workspace" - assert mock_is_workspace_name_correct.call_count == 2 - - @patch( - "builtins.input", - side_effect=["invalid_workspace1", "invalid_workspace2", "valid_workspace"], - ) - @patch( - "opik.opik_configure.is_workspace_name_correct", - side_effect=[False, False, True], - ) - def test_ask_for_workspace_success_on_third_try( - self, mock_is_workspace_name_correct, mock_input - ): - """ - Test that the workspace name is successfully entered on the third attempt after two invalid attempts. - """ - api_key = "valid_api_key" - result = _ask_for_workspace(api_key) - assert result == "valid_workspace" - assert mock_is_workspace_name_correct.call_count == 3 - - -class TestAskUserForApproval: - @patch("builtins.input", return_value="Y") - def test_user_approves_with_y(self, mock_input): - """ - Test that 'Y' returns True for approval. - """ - result = ask_user_for_approval("Do you approve?") - assert result is True - - @patch("builtins.input", return_value="YES") - def test_user_approves_with_yes(self, mock_input): - """ - Test that 'YES' returns True for approval. - """ - result = ask_user_for_approval("Do you approve?") - assert result is True - - @patch("builtins.input", return_value="") - def test_user_approves_with_empty_input(self, mock_input): - """ - Test that empty input returns True for approval. - """ - result = ask_user_for_approval("Do you approve?") - assert result is True - - @patch("builtins.input", return_value="N") - def test_user_disapproves_with_n(self, mock_input): - """ - Test that 'N' returns False for disapproval. - """ - result = ask_user_for_approval("Do you disapprove?") - assert result is False - - @patch("builtins.input", return_value="NO") - def test_user_disapproves_with_no(self, mock_input): - """ - Test that 'NO' returns False for disapproval. - """ - result = ask_user_for_approval("Do you disapprove?") - assert result is False - - @patch("builtins.input", side_effect=["INVALID", "Y"]) - @patch("opik.opik_configure.LOGGER.error") - def test_user_enters_invalid_choice_then_approves( - self, mock_logger_error, mock_input - ): - """ - Test that invalid input triggers error logging and prompts again until valid input is entered. - """ - result = ask_user_for_approval("Do you approve?") - assert result is True - mock_logger_error.assert_called_once_with("Wrong choice. Please try again.") - - @patch("builtins.input", side_effect=["INVALID", "NO"]) - @patch("opik.opik_configure.LOGGER.error") - def test_user_enters_invalid_choice_then_disapproves( - self, mock_logger_error, mock_input - ): - """ - Test that invalid input triggers error logging and prompts again until valid input is entered. - """ - result = ask_user_for_approval("Do you disapprove?") - assert result is False - mock_logger_error.assert_called_once_with("Wrong choice. Please try again.") - - -class TestGetApiKey: - @patch("opik.opik_configure._ask_for_api_key", return_value="new_api_key") - def test_get_api_key_force_ask(self, mock_ask_for_api_key): - """ - Test that when force=True and no API key is provided, the user is asked for an API key. - """ - current_config = OpikConfig(api_key=None) - api_key, needs_update = _get_api_key( - api_key=None, current_config=current_config, force=True - ) - - assert api_key == "new_api_key" - assert needs_update is True - mock_ask_for_api_key.assert_called_once() - - @patch("opik.opik_configure._ask_for_api_key", return_value="new_api_key") - def test_get_api_key_ask_for_missing_key(self, mock_ask_for_api_key): - """ - Test that when no API key is provided and none is present in the config, the user is asked for an API key. - """ - current_config = OpikConfig(api_key=None) - api_key, needs_update = _get_api_key( - api_key=None, current_config=current_config, force=False - ) - - assert api_key == "new_api_key" - assert needs_update is True - mock_ask_for_api_key.assert_called_once() - - def test_get_api_key_use_config_key(self): - """ - Test that the API key is taken from the current config when provided and force=False. - """ - current_config = OpikConfig(api_key="config_api_key") - api_key, needs_update = _get_api_key( - api_key=None, current_config=current_config, force=False - ) - - assert api_key == "config_api_key" - assert needs_update is False - - def test_get_api_key_provided_key(self): - """ - Test that the user-provided API key is used directly if it's passed in. - """ - current_config = OpikConfig(api_key="config_api_key") - api_key, needs_update = _get_api_key( - api_key="user_provided_api_key", current_config=current_config, force=False - ) - - assert api_key == "user_provided_api_key" - assert needs_update is False - - -class TestGetWorkspace: - @patch("opik.opik_configure.is_workspace_name_correct", return_value=True) - def test_get_workspace_user_provided_valid(self, mock_is_workspace_name_correct): - """ - Test that the workspace provided by the user is valid and used. - """ - current_config = OpikConfig(workspace="existing_workspace") - workspace, needs_update = _get_workspace( - workspace="new_workspace", - api_key="valid_api_key", - current_config=current_config, - force=False, - ) - - assert workspace == "new_workspace" - assert needs_update is True - mock_is_workspace_name_correct.assert_called_once_with( - "valid_api_key", "new_workspace" - ) - - @patch("opik.opik_configure.is_workspace_name_correct", return_value=False) - def test_get_workspace_user_provided_invalid(self, mock_is_workspace_name_correct): - """ - Test that a ConfigurationError is raised if the user-provided workspace is invalid. - """ - current_config = OpikConfig(workspace="existing_workspace") - - with pytest.raises(ConfigurationError): - _get_workspace( - workspace="invalid_workspace", - api_key="valid_api_key", - current_config=current_config, - force=False, - ) - - mock_is_workspace_name_correct.assert_called_once_with( - "valid_api_key", "invalid_workspace" - ) - - def test_get_workspace_use_config(self): - """ - Test that the workspace from the current config is used when no workspace is provided and not forced. - """ - current_config = OpikConfig(workspace="configured_workspace") - workspace, needs_update = _get_workspace( - workspace=None, - api_key="valid_api_key", - current_config=current_config, - force=False, - ) - - assert workspace == "configured_workspace" - assert needs_update is False - - @patch( - "opik.opik_configure.get_default_workspace", return_value="default_workspace" - ) - @patch("opik.opik_configure.ask_user_for_approval", return_value=True) - def test_get_workspace_accept_default( - self, mock_ask_user_for_approval, mock_get_default_workspace - ): - """ - Test that the user accepts the default workspace. - """ - current_config = OpikConfig(workspace=OPIK_WORKSPACE_DEFAULT_NAME) - workspace, needs_update = _get_workspace( - workspace=None, - api_key="valid_api_key", - current_config=current_config, - force=False, - ) - - assert workspace == "default_workspace" - assert needs_update is True - mock_get_default_workspace.assert_called_once_with("valid_api_key") - mock_ask_user_for_approval.assert_called_once_with( - 'Do you want to use "default_workspace" workspace? (Y/n)' - ) - - @patch( - "opik.opik_configure.get_default_workspace", return_value="default_workspace" - ) - @patch("opik.opik_configure.ask_user_for_approval", return_value=False) - @patch("opik.opik_configure._ask_for_workspace", return_value="new_workspace") - def test_get_workspace_choose_different( - self, - mock_ask_for_workspace, - mock_ask_user_for_approval, - mock_get_default_workspace, - ): - """ - Test that the user declines the default workspace and chooses a new one. - """ - current_config = OpikConfig(workspace=OPIK_WORKSPACE_DEFAULT_NAME) - workspace, needs_update = _get_workspace( - workspace=None, - api_key="valid_api_key", - current_config=current_config, - force=False, - ) - - assert workspace == "new_workspace" - assert needs_update is True - mock_get_default_workspace.assert_called_once_with("valid_api_key") - mock_ask_user_for_approval.assert_called_once_with( - 'Do you want to use "default_workspace" workspace? (Y/n)' - ) - mock_ask_for_workspace.assert_called_once_with(api_key="valid_api_key") - - -class TestConfigureCloud: - @patch("opik.opik_configure._get_api_key", return_value=("valid_api_key", True)) - @patch("opik.opik_configure._get_workspace", return_value=("valid_workspace", True)) - @patch("opik.opik_configure._update_config") - def test_configure_cloud_with_update( - self, mock_update_config, mock_get_workspace, mock_get_api_key - ): - """ - Test that the configuration is updated when both API key and workspace require updates. - """ - _configure_cloud(api_key=None, workspace=None, force=False) - - mock_get_api_key.assert_called_once() - mock_get_workspace.assert_called_once() - mock_update_config.assert_called_once_with( - api_key="valid_api_key", - url=OPIK_BASE_URL_CLOUD, - workspace="valid_workspace", - ) - - @patch("opik.opik_configure._get_api_key", return_value=("valid_api_key", False)) - @patch( - "opik.opik_configure._get_workspace", return_value=("valid_workspace", False) - ) - @patch("opik.opik_configure.LOGGER.info") - @patch("opik.opik_configure.opik.config.OpikConfig") - @patch("opik.opik_configure._update_config") - def test_configure_cloud_no_update_needed( - self, - mock_update_config, - mock_opik_config, - mock_logger_info, - mock_get_workspace, - mock_get_api_key, - ): - """ - Test that no configuration update happens when both API key and workspace are already set. - """ - # Mock the config file path to return a specific path - mock_config_instance = MagicMock() - mock_config_instance.config_file_fullpath = Path("/some/path/.opik.config") - mock_opik_config.return_value = mock_config_instance - - # Call the function - _configure_cloud( - api_key="valid_api_key", workspace="valid_workspace", force=False - ) - - # Ensure API key and workspace were checked - mock_get_api_key.assert_called_once() - mock_get_workspace.assert_called_once() - - # Check config file wasn't overwritten - mock_update_config.assert_not_called() - - # Check the logging message - mock_logger_info.assert_called_with( - "Opik is already configured. You can check the settings by viewing the config file at %s", - Path("/some/path/.opik.config"), - ) - - @patch("opik.opik_configure._get_api_key", return_value=("new_api_key", True)) - @patch( - "opik.opik_configure._get_workspace", - return_value=("configured_workspace", False), - ) - @patch("opik.opik_configure._update_config") - def test_configure_cloud_api_key_updated( - self, mock_update_config, mock_get_workspace, mock_get_api_key - ): - """ - Test that the configuration is updated when only the API key changes. - """ - _configure_cloud(api_key=None, workspace="configured_workspace", force=False) - - mock_get_api_key.assert_called_once() - mock_get_workspace.assert_called_once() - mock_update_config.assert_called_once_with( - api_key="new_api_key", - url=OPIK_BASE_URL_CLOUD, - workspace="configured_workspace", - ) - - @patch("opik.opik_configure._get_api_key", return_value=("valid_api_key", False)) - @patch("opik.opik_configure._get_workspace", return_value=("new_workspace", True)) - @patch("opik.opik_configure._update_config") - def test_configure_cloud_workspace_updated( - self, mock_update_config, mock_get_workspace, mock_get_api_key - ): - """ - Test that the configuration is updated when only the workspace changes. - """ - _configure_cloud(api_key="valid_api_key", workspace=None, force=False) - - mock_get_api_key.assert_called_once() - mock_get_workspace.assert_called_once() - mock_update_config.assert_called_once_with( - api_key="valid_api_key", url=OPIK_BASE_URL_CLOUD, workspace="new_workspace" - ) - - -class TestConfigureLocal: - @patch( - "opik.opik_configure._ask_for_url", return_value="http://user-provided-url.com" - ) - @patch("opik.opik_configure.is_instance_active", return_value=False) - @patch("opik.opik_configure._update_config") - def test_configure_local_asks_for_url( - self, mock_update_config, mock_is_instance_active, mock_ask_for_url - ): - """ - Test that the function asks for a URL if no local instance is active and no URL is provided. - """ - _configure_local(url=None, force=False) - - mock_ask_for_url.assert_called_once() - mock_update_config.assert_called_once_with( - api_key=None, - url="http://user-provided-url.com", - workspace=OPIK_WORKSPACE_DEFAULT_NAME, - ) - - @patch("opik.opik_configure._ask_for_url") - @patch("opik.opik_configure.is_instance_active", return_value=True) - @patch("opik.opik_configure._update_config") - def test_configure_local_with_provided_url( - self, mock_update_config, mock_is_instance_active, mock_ask_for_url - ): - """ - Test that the function configures the provided URL if it is active. - """ - _configure_local(url="http://custom-local-instance.com", force=False) - - mock_ask_for_url.assert_not_called() - mock_is_instance_active.assert_called_once_with( - "http://custom-local-instance.com" - ) - mock_update_config.assert_called_once_with( - api_key=None, - url="http://custom-local-instance.com", - workspace=OPIK_WORKSPACE_DEFAULT_NAME, - ) - - @patch("opik.opik_configure._ask_for_url") - @patch("opik.opik_configure.is_instance_active", return_value=True) - @patch("opik.opik_configure.opik.config.OpikConfig") - @patch("opik.opik_configure.LOGGER.info") - def test_configure_local_no_update_needed( - self, - mock_logger_info, - mock_opik_config, - mock_is_instance_active, - mock_ask_for_url, - ): - """ - Test that no update happens if the local instance is already configured and force=False. - """ - mock_config_instance = MagicMock() - mock_config_instance.url_override = OPIK_BASE_URL_LOCAL - mock_opik_config.return_value = mock_config_instance - - _configure_local(url=None, force=False) - - mock_ask_for_url.assert_not_called() - mock_is_instance_active.assert_called_once_with(OPIK_BASE_URL_LOCAL) - mock_logger_info.assert_called_once_with( - f"Opik is already configured to local instance at {OPIK_BASE_URL_LOCAL}." - ) - - @patch("opik.opik_configure.ask_user_for_approval", return_value=True) - @patch("opik.opik_configure.is_instance_active", return_value=True) - @patch("opik.opik_configure._update_config") - def test_configure_local_uses_local_instance( - self, mock_update_config, mock_is_instance_active, mock_ask_user_for_approval - ): - """ - Test that the function configures the local instance when found and user approves. - """ - _configure_local(url=None, force=False) - - mock_ask_user_for_approval.assert_called_once_with( - f"Found local Opik instance on: {OPIK_BASE_URL_LOCAL}, do you want to use it? (Y/n)" - ) - mock_update_config.assert_called_once_with( - api_key=None, url=OPIK_BASE_URL_LOCAL, workspace=OPIK_WORKSPACE_DEFAULT_NAME - ) - - @patch("opik.opik_configure.ask_user_for_approval", return_value=False) - @patch( - "opik.opik_configure._ask_for_url", return_value="http://user-provided-url.com" - ) - @patch("opik.opik_configure.is_instance_active", return_value=True) - @patch("opik.opik_configure._update_config") - def test_configure_local_user_declines_local_instance( - self, - mock_update_config, - mock_is_instance_active, - mock_ask_for_url, - mock_ask_user_for_approval, - ): - """ - Test that if the user declines using the local instance, they are prompted for a URL. - """ - _configure_local(url=None, force=False) - - mock_ask_user_for_approval.assert_called_once_with( - f"Found local Opik instance on: {OPIK_BASE_URL_LOCAL}, do you want to use it? (Y/n)" - ) - mock_ask_for_url.assert_called_once() - mock_update_config.assert_called_once_with( - api_key=None, - url="http://user-provided-url.com", - workspace=OPIK_WORKSPACE_DEFAULT_NAME, - )