From a6a7370282711d6d212249c71e4344439d18bdea Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Fri, 20 Dec 2024 08:06:19 +0100 Subject: [PATCH] Deployed 86fe79c with MkDocs version: 1.6.1 --- search/search_index.json | 2 +- usage/index.html | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/search/search_index.json b/search/search_index.json index a3a33ec..44c03d5 100644 --- a/search/search_index.json +++ b/search/search_index.json @@ -1 +1 @@ -{"config":{"lang":["en"],"separator":"[\\s\\-]+","pipeline":["stopWordFilter"]},"docs":[{"location":"","title":"Overview","text":"

ipybox is a lightweight, stateful and secure Python code execution sandbox built with IPython and Docker. Designed for AI agents that interact with their environment through code execution, it is also well-suited for general-purpose code execution. Fully open-source and free to use, ipybox is distributed under the Apache 2.0 license.

"},{"location":"#features","title":"Features","text":"

This project is in early beta, with active development of new features ongoing.

"},{"location":"installation/","title":"Installation","text":"
pip install ipybox\n
"},{"location":"installation/#docker-image","title":"Docker image","text":"

Before you can use ipybox, you need to build a Docker image. This image contains the required dependencies for executing Python code in stateful and isolated sessions.

Note

Building an ipybox Docker image requires Docker to be installed on your system. Containers created from that image will run with the same user and group IDs as the user who built the image, ensuring proper file permissions on mounted host directories.

"},{"location":"installation/#default-build","title":"Default build","text":"

To build an ipybox Docker image with default settings and no extra dependencies:

python -m ipybox build\n

This creates a Docker image tagged as gradion-ai/ipybox with base Python dependencies required for the code execution environment.

"},{"location":"installation/#custom-build","title":"Custom build","text":"

To create a custom ipybox Docker image with additional dependencies for your application, create a dependencies file (e.g., dependencies.txt) following. For example:

dependencies.txt
pandas = \"^2.2\"\nscikit-learn = \"^1.5\"\nmatplotlib = \"^3.9\"\n

To build an image with custom tag and dependencies:

python -m ipybox build -t my-box:v1 -d path/to/dependencies.txt\n

The dependencies file should list Python packages in Poetry dependency specification format. These will be installed in addition to the base dependencies required for the execution environment. The execution container also supports installing dependencies at runtime.

"},{"location":"usage/","title":"Usage","text":"

The two main classes of the ipybox package are ExecutionContainer and ExecutionClient.

Note

Runnable scripts of the source code on this page are available in the examples directory.

"},{"location":"usage/#basic-usage","title":"Basic usage","text":"

For executing code in ipybox you first need to create a Docker container from an ipybox Docker image and then an IPython kernel running in that container. This is done with the ExecutionContainer and the ExecutionClient context managers.

from ipybox import ExecutionClient, ExecutionContainer\n\n\nasync with ExecutionContainer(tag=\"gradion-ai/ipybox\") as container:  # (1)!\n    async with ExecutionClient(port=container.port) as client:  # (2)!\n        result = await client.execute(\"print('Hello, world!')\")  # (3)!\n        print(f\"Output: {result.text}\")  # (4)!\n
  1. Create and start a container for code execution
  2. Create and connect to an IPython kernel
  3. Execute Python code and await the result
  4. Output: Hello, world!

The default image used by ExecutionContainer is gradion-ai/ipybox. You can specify a custom image with the tag argument like in ExecutionContainer(tag=\"my-box:v1\"), for example.

Note

Instead of letting the ExecutionContainer context manager handle the lifecycle of the container, you can also manually run and kill the container.

"},{"location":"usage/#state-management","title":"State management","text":"

Code execution within the same client context is stateful i.e. you can reference variables from previous executions. Code executions in different client contexts are isolated from each other:

async with ExecutionContainer() as container:\n    async with ExecutionClient(port=container.port) as client_1:  # (1)!\n        result = await client_1.execute(\"x = 1\")  # (2)!\n        assert result.text is None\n        result = await client_1.execute(\"print(x)\")  # (3)!\n        assert result.text == \"1\"\n\n    async with ExecutionClient(port=container.port) as client_2:  # (4)!\n        try:\n            await client_2.execute(\"print(x)\")  # (5)!\n        except ExecutionError as e:\n            assert e.args[0] == \"NameError: name 'x' is not defined\"\n
  1. First client context
  2. Execute code that defines variable x
  3. Reference variable x defined in previous execution
  4. Second client context
  5. Variable x is not defined in client_2 context
"},{"location":"usage/#output-streaming","title":"Output streaming","text":"

The ExecutionClient supports streaming output as it's generated during code execution:

async with ExecutionContainer() as container:\n    async with ExecutionClient(port=container.port) as client:\n        code = \"\"\"\n        import time\n        for i in range(5):\n            print(f\"Processing step {i}\")\n            time.sleep(1)\n        \"\"\"  # (1)!\n\n        execution = await client.submit(code)  # (2)!\n        print(\"Streaming output:\")\n        async for chunk in execution.stream():  # (3)!\n            print(f\"Received output: {chunk.strip()}\")  # (4)!\n\n        result = await execution.result()  # (5)!\n        print(\"\\nAggregated output:\")\n        print(result.text)  # (6)!\n
  1. Code that produces gradual output
  2. Submit the code for execution
  3. Stream the output
  4. Prints one line per second:
    Received output: Processing step 0\nReceived output: Processing step 1\nReceived output: Processing step 2\nReceived output: Processing step 3\nReceived output: Processing step 4\n
  5. Get the aggregated output as a single result
  6. Prints the aggregated output:
    Aggregated output:\nProcessing step 0\nProcessing step 1\nProcessing step 2\nProcessing step 3\nProcessing step 4\n

The stream() method accepts an optional timeout argument (defaults to 120 seconds). In case of timeout, the execution is automatically terminated by interrupting the kernel.

"},{"location":"usage/#installing-dependencies-at-runtime","title":"Installing dependencies at runtime","text":"
async with ExecutionContainer() as container:\n    async with ExecutionClient(port=container.port) as client:\n        execution = await client.submit(\"!pip install einops\")  # (1)!\n        async for chunk in execution.stream():  # (2)!\n            print(chunk, end=\"\", flush=True)\n\n        result = await client.execute(\"\"\"\n            import einops\n            print(einops.__version__)\n        \"\"\")  # (3)!\n        print(f\"Output: {result.text}\")  # (4)!\n
  1. Install the einops package using pip
  2. Stream the installation progress. Something like
    Collecting einops\nDownloading einops-0.8.0-py3-none-any.whl (10.0 kB)\nInstalling collected packages: einops\nSuccessfully installed einops-0.8.0\n
  3. Import and use the installed package
  4. Prints Output: 0.8.0

You can also install and use a package within a single execution. There's no need to have two separate executions as done in the example above.

"},{"location":"usage/#creating-and-returning-plots","title":"Creating and returning plots","text":"

Plots created with matplotlib or other libraries are returned as PIL images. Images are not part of the output stream, but are available as images list in the result object.

async with ExecutionContainer() as container:\n    async with ExecutionClient(port=container.port) as client:\n        execution = await client.submit(\"\"\"\n            !pip install matplotlib\n\n            import matplotlib.pyplot as plt\n            import numpy as np\n\n            x = np.linspace(0, 10, 100)\n            plt.figure(figsize=(8, 6))\n            plt.plot(x, np.sin(x))\n            plt.title('Sine Wave')\n            plt.show()\n\n            print(\"Plot generation complete!\")\n            \"\"\")  # (1)!\n\n        async for chunk in execution.stream():  # (2)!\n            print(chunk, end=\"\", flush=True)\n\n        result = await execution.result()\n        result.images[0].save(\"sine.png\")  # (3)!\n
  1. Install matplotlib and generate a plot
  2. Stream output text (installation progress and print statement)
  3. Get attached image from execution result and save it as sine.png
"},{"location":"usage/#bind-mounts","title":"Bind mounts","text":"

Bind mounts allow executed code to read and write files on the host machine.

await aiofiles.os.makedirs(\"data\", exist_ok=True)\nawait aiofiles.os.makedirs(\"output\", exist_ok=True)\n\nbinds = {  # (1)!\n    \"./data\": \"data\",  # (2)!\n    \"./output\": \"output\",  # (3)!\n}\n\nasync with aiofiles.open(\"data/input.txt\", \"w\") as f:\n    await f.write(\"hello world\")\n\nasync with ExecutionContainer(binds=binds) as container:\n    async with ExecutionClient(port=container.port) as client:\n        await client.execute(\"\"\"\n            with open('data/input.txt') as f:\n                data = f.read()\n\n            processed = data.upper()\n\n            with open('output/result.txt', 'w') as f:\n                f.write(processed)\n        \"\"\")  # (4)!\n\nasync with aiofiles.open(\"output/result.txt\", \"r\") as f:  # (5)!\n    result = await f.read()\n    assert result == \"HELLO WORLD\"\n
  1. Map host paths to container paths.
  2. For reading files from host.
  3. For writing files to host.
  4. Read from mounted data directory, convert to uppercase and write to mounted output directory
  5. Verify the results on host
"},{"location":"usage/#environment-variables","title":"Environment variables","text":"

Environment variables can be set on the container for passing secrets or configuration data, for example.

# Define environment variables for the container\nenv = {\"API_KEY\": \"secret-key-123\", \"DEBUG\": \"1\"}  # (1)!\n\nasync with ExecutionContainer(env=env) as container:\n    async with ExecutionClient(port=container.port) as client:\n        result = await client.execute(\"\"\"\n            import os\n\n            api_key = os.environ['API_KEY']\n            print(f\"Using API key: {api_key}\")\n\n            debug = bool(int(os.environ.get('DEBUG', '0')))\n            if debug:\n                print(\"Debug mode enabled\")\n        \"\"\")  # (2)!\n        print(result.text)  # (3)!\n
  1. Define environment variables for the container
  2. Access environment variables in executed code
  3. Prints
    Using API key: secret-key-123\nDebug mode enabled\n
"},{"location":"usage/#manual-container-lifecycle-management","title":"Manual container lifecycle management","text":"

Instead of using ExecutionContainer as a context manager, you can also manually run() and kill() the container. This is useful for running the container on a separate host listening to a user-defined host port (e.g. 7777 in the example below).

# do some work ...\n
  1. Create an ExecutionContainer instance using a fixed port.
  2. Run the container (detached).
  3. Cleanup.
"},{"location":"api/execution_client/","title":"ExecutionClient","text":""},{"location":"api/execution_client/#ipybox.executor.ExecutionClient","title":"ExecutionClient","text":"
ExecutionClient(port: int, host: str = 'localhost', heartbeat_interval: float = 10)\n

A context manager for executing code in an IPython kernel.

Parameters:

Name Type Description Default host str

Hostname where the code execution container is running

'localhost' port int

Host port of the code execution container

required heartbeat_interval float

Interval in seconds between heartbeat pings. Defaults to 10.

10 Example
from ipybox import ExecutionClient, ExecutionContainer\n\nbinds = {\"/host/path\": \"example/path\"}\nenv = {\"API_KEY\": \"secret\"}\n\nasync with ExecutionContainer(binds=binds, env=env) as container:\n    async with ExecutionClient(host=\"localhost\", port=container.port) as client:\n        result = await client.execute(\"print('Hello, world!')\")\n        print(result.text)\n

Hello, world!

Source code in ipybox/executor.py
def __init__(self, port: int, host: str = \"localhost\", heartbeat_interval: float = 10):\n    self.port = port\n    self.host = host\n\n    self._heartbeat_interval = heartbeat_interval\n    self._heartbeat_callback = None\n\n    self._kernel_id = None\n    self._ws: WebSocketClientConnection\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.kernel_id","title":"kernel_id property","text":"
kernel_id\n

The ID of the running IPython kernel.

Raises:

Type Description ValueError

If not connected to a kernel

"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.connect","title":"connect async","text":"
connect(retries: int = 10, retry_interval: float = 1.0)\n

Creates and connects to an IPython kernel.

Parameters:

Name Type Description Default retries int

Number of connection attempts. Defaults to 10.

10 retry_interval float

Delay between retries in seconds. Defaults to 1.0.

1.0

Raises:

Type Description ConnectionError

If connection cannot be established after all retries

Source code in ipybox/executor.py
async def connect(self, retries: int = 10, retry_interval: float = 1.0):\n    \"\"\"Creates and connects to an IPython kernel.\n\n    Args:\n        retries: Number of connection attempts. Defaults to 10.\n        retry_interval: Delay between retries in seconds. Defaults to 1.0.\n\n    Raises:\n        ConnectionError: If connection cannot be established after all retries\n    \"\"\"\n    for _ in range(retries):\n        try:\n            self._kernel_id = await self._create_kernel()\n            break\n        except Exception:\n            await asyncio.sleep(retry_interval)\n    else:\n        raise ConnectionError(\"Failed to create kernel\")\n\n    self._ws = await websocket_connect(HTTPRequest(url=self.kernel_ws_url))\n    logger.info(\"Connected to kernel\")\n\n    self.heartbeat_callback = PeriodicCallback(self._ping_kernel, self._heartbeat_interval * 1000)\n    self.heartbeat_callback.start()\n    logger.info(f\"Started heartbeat (interval = {self._heartbeat_interval}s)\")\n\n    await self._init_kernel()\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.disconnect","title":"disconnect async","text":"
disconnect()\n

Closes the connection to the kernel and cleans up resources.

Source code in ipybox/executor.py
async def disconnect(self):\n    \"\"\"Closes the connection to the kernel and cleans up resources.\"\"\"\n    self.heartbeat_callback.stop()\n    self._ws.close()\n    async with aiohttp.ClientSession() as session:\n        async with session.delete(self.kernel_http_url):\n            pass\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.execute","title":"execute async","text":"
execute(code: str, timeout: float = 120) -> ExecutionResult\n

Executes code and returns the result.

Parameters:

Name Type Description Default code str

Code to execute

required timeout float

Maximum execution time in seconds. Defaults to 120.

120

Returns:

Type Description ExecutionResult

ExecutionResult object

Raises:

Type Description ExecutionError

If code execution raised an error

TimeoutError

If execution exceeds timeout duration

Source code in ipybox/executor.py
async def execute(self, code: str, timeout: float = 120) -> ExecutionResult:\n    \"\"\"Executes code and returns the result.\n\n    Args:\n        code: Code to execute\n        timeout: Maximum execution time in seconds. Defaults to 120.\n\n    Returns:\n        ExecutionResult object\n\n    Raises:\n        ExecutionError: If code execution raised an error\n        asyncio.TimeoutError: If execution exceeds timeout duration\n    \"\"\"\n    execution = await self.submit(code)\n    return await execution.result(timeout=timeout)\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.submit","title":"submit async","text":"
submit(code: str) -> Execution\n

Submits code for execution and returns an Execution object to track it.

Parameters:

Name Type Description Default code str

Python code to execute

required

Returns:

Type Description Execution

An Execution object to track the submitted code execution

Source code in ipybox/executor.py
async def submit(self, code: str) -> Execution:\n    \"\"\"Submits code for execution and returns an Execution object to track it.\n\n    Args:\n        code: Python code to execute\n\n    Returns:\n        An Execution object to track the submitted code execution\n    \"\"\"\n    req_id = uuid4().hex\n    req = {\n        \"header\": {\n            \"username\": \"\",\n            \"version\": \"5.0\",\n            \"session\": \"\",\n            \"msg_id\": req_id,\n            \"msg_type\": \"execute_request\",\n        },\n        \"parent_header\": {},\n        \"channel\": \"shell\",\n        \"content\": {\n            \"code\": code,\n            \"silent\": False,\n            \"store_history\": False,\n            \"user_expressions\": {},\n            \"allow_stdin\": False,\n        },\n        \"metadata\": {},\n        \"buffers\": {},\n    }\n\n    await self._send_request(req)\n    return Execution(client=self, req_id=req_id)\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionResult","title":"ExecutionResult dataclass","text":"
ExecutionResult(text: str | None, images: list[Image])\n

The result of a code execution.

Parameters:

Name Type Description Default text str | None

Output text generated during execution

required images list[Image]

List of images generated during execution

required"},{"location":"api/execution_client/#ipybox.executor.Execution","title":"Execution","text":"
Execution(client: ExecutionClient, req_id: str)\n

Represents a code execution in an IPython kernel.

Parameters:

Name Type Description Default client ExecutionClient

The client instance that created this execution

required req_id str

Unique identifier for the execution request

required Source code in ipybox/executor.py
def __init__(self, client: \"ExecutionClient\", req_id: str):\n    self.client = client\n    self.req_id = req_id\n\n    self._chunks: list[str] = []\n    self._images: list[Image.Image] = []\n\n    self._stream_consumed: bool = False\n
"},{"location":"api/execution_client/#ipybox.executor.Execution.result","title":"result async","text":"
result(timeout: float = 120) -> ExecutionResult\n

Waits for execution to complete and returns the final result.

If a timeout is reached, the kernel is interrupted.

Parameters:

Name Type Description Default timeout float

Maximum time to wait in seconds. Defaults to 120.

120

Returns:

Type Description ExecutionResult

ExecutionResult object

Raises:

Type Description TimeoutError

If execution exceeds timeout duration

Source code in ipybox/executor.py
async def result(self, timeout: float = 120) -> ExecutionResult:\n    \"\"\"Waits for execution to complete and returns the final result.\n\n    If a timeout is reached, the kernel is interrupted.\n\n    Args:\n        timeout: Maximum time to wait in seconds. Defaults to 120.\n\n    Returns:\n        ExecutionResult object\n\n    Raises:\n        asyncio.TimeoutError: If execution exceeds timeout duration\n    \"\"\"\n    if not self._stream_consumed:\n        async for _ in self.stream(timeout=timeout):\n            pass\n\n    return ExecutionResult(\n        text=\"\".join(self._chunks).strip() if self._chunks else None,\n        images=self._images,\n    )\n
"},{"location":"api/execution_client/#ipybox.executor.Execution.stream","title":"stream async","text":"
stream(timeout: float = 120) -> AsyncIterator[str]\n

Streams the execution output text as it becomes available.

Parameters:

Name Type Description Default timeout float

Maximum time to wait in seconds. Defaults to 120.

120

Yields:

Type Description AsyncIterator[str]

Output text chunks as they arrive

Raises:

Type Description TimeoutError

If execution exceeds timeout duration

Source code in ipybox/executor.py
async def stream(self, timeout: float = 120) -> AsyncIterator[str]:\n    \"\"\"Streams the execution output text as it becomes available.\n\n    Args:\n        timeout: Maximum time to wait in seconds. Defaults to 120.\n\n    Yields:\n        Output text chunks as they arrive\n\n    Raises:\n        asyncio.TimeoutError: If execution exceeds timeout duration\n    \"\"\"\n    try:\n        async with asyncio.timeout(timeout):\n            async for elem in self._stream():\n                match elem:\n                    case str():\n                        self._chunks.append(elem)\n                        yield elem\n                    case Image.Image():\n                        self._images.append(elem)\n    except asyncio.TimeoutError:\n        await self.client._interrupt_kernel()\n        await asyncio.sleep(0.2)  # TODO: make configurable\n        raise\n    finally:\n        self._stream_consumed = True\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionError","title":"ExecutionError","text":"
ExecutionError(message: str, trace: str | None = None)\n

Bases: Exception

Exception raised when code execution in the IPython kernel fails.

Parameters:

Name Type Description Default message str

Error message

required trace str | None

Stack trace string representation

None Source code in ipybox/executor.py
def __init__(self, message: str, trace: str | None = None):\n    super().__init__(message)\n    self.trace = trace\n
"},{"location":"api/execution_client/#ipybox.executor.ConnectionError","title":"ConnectionError","text":"

Bases: Exception

Exception raised when connection to an IPython kernel fails.

"},{"location":"api/execution_container/","title":"ExecutionContainer","text":"

A context manager for managing the lifecycle of a Docker container used for code execution.

It handles the creation, port mapping, volume binding, and cleanup of the container.

Parameters:

Name Type Description Default tag str

Tag of the Docker image to use (defaults to gradion-ai/ipybox)

DEFAULT_TAG binds dict[str, str] | None

Mapping of host paths to container paths for volume mounting. Host paths may be relative or absolute. Container paths must be relative and are created as subdirectories of /home/appuser in the container.

None env dict[str, str] | None

Environment variables to set in the container

None port int | None

Host port to map to the container's executor port. If not provided, a random port will be allocated.

None

Attributes:

Name Type Description port int

Host port mapped to the container's executor port. This port is dynamically allocated when the container is started.

Example
from ipybox import ExecutionClient, ExecutionContainer\n\nbinds = {\"/host/path\": \"example/path\"}\nenv = {\"API_KEY\": \"secret\"}\n\nasync with ExecutionContainer(binds=binds, env=env) as container:\n    async with ExecutionClient(host=\"localhost\", port=container.port) as client:\n        result = await client.execute(\"print('Hello, world!')\")\n        print(result.text)\n

Hello, world!

Source code in ipybox/container.py
def __init__(\n    self,\n    tag: str = DEFAULT_TAG,\n    binds: dict[str, str] | None = None,\n    env: dict[str, str] | None = None,\n    port: int | None = None,\n):\n    self.tag = tag\n    self.binds = binds or {}\n    self.env = env or {}\n\n    self._docker = None\n    self._container = None\n    self._port = port\n
"},{"location":"api/execution_container/#ipybox.container.ExecutionContainer.port","title":"port property","text":"
port: int\n

The host port mapped to the container's executor port.

This port is dynamically allocated when the container is started unless explicitly provided.

Raises:

Type Description RuntimeError

If the container is not running

"},{"location":"api/execution_container/#ipybox.container.ExecutionContainer.kill","title":"kill async","text":"
kill()\n

Kill and remove the Docker container.

Source code in ipybox/container.py
async def kill(self):\n    \"\"\"\n    Kill and remove the Docker container.\n    \"\"\"\n    if self._container:\n        await self._container.kill()\n\n    if self._docker:\n        await self._docker.close()\n
"},{"location":"api/execution_container/#ipybox.container.ExecutionContainer.run","title":"run async","text":"
run()\n

Create and start the Docker container.

Source code in ipybox/container.py
async def run(self):\n    \"\"\"\n    Create and start the Docker container.\n    \"\"\"\n    self._docker = Docker()\n    self._container = await self._run()\n
"}]} \ No newline at end of file +{"config":{"lang":["en"],"separator":"[\\s\\-]+","pipeline":["stopWordFilter"]},"docs":[{"location":"","title":"Overview","text":"

ipybox is a lightweight, stateful and secure Python code execution sandbox built with IPython and Docker. Designed for AI agents that interact with their environment through code execution, it is also well-suited for general-purpose code execution. Fully open-source and free to use, ipybox is distributed under the Apache 2.0 license.

"},{"location":"#features","title":"Features","text":"

This project is in early beta, with active development of new features ongoing.

"},{"location":"installation/","title":"Installation","text":"
pip install ipybox\n
"},{"location":"installation/#docker-image","title":"Docker image","text":"

Before you can use ipybox, you need to build a Docker image. This image contains the required dependencies for executing Python code in stateful and isolated sessions.

Note

Building an ipybox Docker image requires Docker to be installed on your system. Containers created from that image will run with the same user and group IDs as the user who built the image, ensuring proper file permissions on mounted host directories.

"},{"location":"installation/#default-build","title":"Default build","text":"

To build an ipybox Docker image with default settings and no extra dependencies:

python -m ipybox build\n

This creates a Docker image tagged as gradion-ai/ipybox with base Python dependencies required for the code execution environment.

"},{"location":"installation/#custom-build","title":"Custom build","text":"

To create a custom ipybox Docker image with additional dependencies for your application, create a dependencies file (e.g., dependencies.txt) following. For example:

dependencies.txt
pandas = \"^2.2\"\nscikit-learn = \"^1.5\"\nmatplotlib = \"^3.9\"\n

To build an image with custom tag and dependencies:

python -m ipybox build -t my-box:v1 -d path/to/dependencies.txt\n

The dependencies file should list Python packages in Poetry dependency specification format. These will be installed in addition to the base dependencies required for the execution environment. The execution container also supports installing dependencies at runtime.

"},{"location":"usage/","title":"Usage","text":"

The two main classes of the ipybox package are ExecutionContainer and ExecutionClient.

Note

Runnable scripts of the source code on this page are available in the examples directory.

"},{"location":"usage/#basic-usage","title":"Basic usage","text":"

For executing code in ipybox you first need to create a Docker container from an ipybox Docker image and then an IPython kernel running in that container. This is done with the ExecutionContainer and the ExecutionClient context managers.

from ipybox import ExecutionClient, ExecutionContainer\n\n\nasync with ExecutionContainer(tag=\"gradion-ai/ipybox\") as container:  # (1)!\n    async with ExecutionClient(port=container.port) as client:  # (2)!\n        result = await client.execute(\"print('Hello, world!')\")  # (3)!\n        print(f\"Output: {result.text}\")  # (4)!\n
  1. Create and start a container for code execution
  2. Create and connect to an IPython kernel
  3. Execute Python code and await the result
  4. Output: Hello, world!

The default image used by ExecutionContainer is gradion-ai/ipybox. You can specify a custom image with the tag argument like in ExecutionContainer(tag=\"my-box:v1\"), for example.

Note

Instead of letting the ExecutionContainer context manager handle the lifecycle of the container, you can also manually run and kill the container.

"},{"location":"usage/#state-management","title":"State management","text":"

Code execution within the same client context is stateful i.e. you can reference variables from previous executions. Code executions in different client contexts are isolated from each other:

async with ExecutionContainer() as container:\n    async with ExecutionClient(port=container.port) as client_1:  # (1)!\n        result = await client_1.execute(\"x = 1\")  # (2)!\n        assert result.text is None\n        result = await client_1.execute(\"print(x)\")  # (3)!\n        assert result.text == \"1\"\n\n    async with ExecutionClient(port=container.port) as client_2:  # (4)!\n        try:\n            await client_2.execute(\"print(x)\")  # (5)!\n        except ExecutionError as e:\n            assert e.args[0] == \"NameError: name 'x' is not defined\"\n
  1. First client context
  2. Execute code that defines variable x
  3. Reference variable x defined in previous execution
  4. Second client context
  5. Variable x is not defined in client_2 context
"},{"location":"usage/#output-streaming","title":"Output streaming","text":"

The ExecutionClient supports streaming output as it's generated during code execution:

async with ExecutionContainer() as container:\n    async with ExecutionClient(port=container.port) as client:\n        code = \"\"\"\n        import time\n        for i in range(5):\n            print(f\"Processing step {i}\")\n            time.sleep(1)\n        \"\"\"  # (1)!\n\n        execution = await client.submit(code)  # (2)!\n        print(\"Streaming output:\")\n        async for chunk in execution.stream():  # (3)!\n            print(f\"Received output: {chunk.strip()}\")  # (4)!\n\n        result = await execution.result()  # (5)!\n        print(\"\\nAggregated output:\")\n        print(result.text)  # (6)!\n
  1. Code that produces gradual output
  2. Submit the code for execution
  3. Stream the output
  4. Prints one line per second:
    Received output: Processing step 0\nReceived output: Processing step 1\nReceived output: Processing step 2\nReceived output: Processing step 3\nReceived output: Processing step 4\n
  5. Get the aggregated output as a single result
  6. Prints the aggregated output:
    Aggregated output:\nProcessing step 0\nProcessing step 1\nProcessing step 2\nProcessing step 3\nProcessing step 4\n

The stream() method accepts an optional timeout argument (defaults to 120 seconds). In case of timeout, the execution is automatically terminated by interrupting the kernel.

"},{"location":"usage/#installing-dependencies-at-runtime","title":"Installing dependencies at runtime","text":"
async with ExecutionContainer() as container:\n    async with ExecutionClient(port=container.port) as client:\n        execution = await client.submit(\"!pip install einops\")  # (1)!\n        async for chunk in execution.stream():  # (2)!\n            print(chunk, end=\"\", flush=True)\n\n        result = await client.execute(\"\"\"\n            import einops\n            print(einops.__version__)\n        \"\"\")  # (3)!\n        print(f\"Output: {result.text}\")  # (4)!\n
  1. Install the einops package using pip
  2. Stream the installation progress. Something like
    Collecting einops\nDownloading einops-0.8.0-py3-none-any.whl (10.0 kB)\nInstalling collected packages: einops\nSuccessfully installed einops-0.8.0\n
  3. Import and use the installed package
  4. Prints Output: 0.8.0

You can also install and use a package within a single execution. There's no need to have two separate executions as done in the example above.

"},{"location":"usage/#creating-and-returning-plots","title":"Creating and returning plots","text":"

Plots created with matplotlib or other libraries are returned as PIL images. Images are not part of the output stream, but are available as images list in the result object.

async with ExecutionContainer() as container:\n    async with ExecutionClient(port=container.port) as client:\n        execution = await client.submit(\"\"\"\n            !pip install matplotlib\n\n            import matplotlib.pyplot as plt\n            import numpy as np\n\n            x = np.linspace(0, 10, 100)\n            plt.figure(figsize=(8, 6))\n            plt.plot(x, np.sin(x))\n            plt.title('Sine Wave')\n            plt.show()\n\n            print(\"Plot generation complete!\")\n            \"\"\")  # (1)!\n\n        async for chunk in execution.stream():  # (2)!\n            print(chunk, end=\"\", flush=True)\n\n        result = await execution.result()\n        result.images[0].save(\"sine.png\")  # (3)!\n
  1. Install matplotlib and generate a plot
  2. Stream output text (installation progress and print statement)
  3. Get attached image from execution result and save it as sine.png
"},{"location":"usage/#bind-mounts","title":"Bind mounts","text":"

Bind mounts allow executed code to read and write files on the host machine.

await aiofiles.os.makedirs(\"data\", exist_ok=True)\nawait aiofiles.os.makedirs(\"output\", exist_ok=True)\n\nbinds = {  # (1)!\n    \"./data\": \"data\",  # (2)!\n    \"./output\": \"output\",  # (3)!\n}\n\nasync with aiofiles.open(\"data/input.txt\", \"w\") as f:\n    await f.write(\"hello world\")\n\nasync with ExecutionContainer(binds=binds) as container:\n    async with ExecutionClient(port=container.port) as client:\n        await client.execute(\"\"\"\n            with open('data/input.txt') as f:\n                data = f.read()\n\n            processed = data.upper()\n\n            with open('output/result.txt', 'w') as f:\n                f.write(processed)\n        \"\"\")  # (4)!\n\nasync with aiofiles.open(\"output/result.txt\", \"r\") as f:  # (5)!\n    result = await f.read()\n    assert result == \"HELLO WORLD\"\n
  1. Map host paths to container paths.
  2. For reading files from host.
  3. For writing files to host.
  4. Read from mounted data directory, convert to uppercase and write to mounted output directory
  5. Verify the results on host
"},{"location":"usage/#environment-variables","title":"Environment variables","text":"

Environment variables can be set on the container for passing secrets or configuration data, for example.

# Define environment variables for the container\nenv = {\"API_KEY\": \"secret-key-123\", \"DEBUG\": \"1\"}  # (1)!\n\nasync with ExecutionContainer(env=env) as container:\n    async with ExecutionClient(port=container.port) as client:\n        result = await client.execute(\"\"\"\n            import os\n\n            api_key = os.environ['API_KEY']\n            print(f\"Using API key: {api_key}\")\n\n            debug = bool(int(os.environ.get('DEBUG', '0')))\n            if debug:\n                print(\"Debug mode enabled\")\n        \"\"\")  # (2)!\n        print(result.text)  # (3)!\n
  1. Define environment variables for the container
  2. Access environment variables in executed code
  3. Prints
    Using API key: secret-key-123\nDebug mode enabled\n
"},{"location":"usage/#manual-container-lifecycle-management","title":"Manual container lifecycle management","text":"

Instead of using ExecutionContainer as a context manager, you can also manually run() and kill() the container. This is useful for running the container on a separate host listening to a user-defined host port (e.g. 7777 in the example below).

container = ExecutionContainer(port=7777)  # (1)!\nawait container.run()  # (2)!\nassert container.port == 7777\n\n# do some work ...\n\nawait container.kill()  # (3)!\n
  1. Create an ExecutionContainer instance using a fixed port.
  2. Run the container (detached).
  3. Cleanup.
"},{"location":"api/execution_client/","title":"ExecutionClient","text":""},{"location":"api/execution_client/#ipybox.executor.ExecutionClient","title":"ExecutionClient","text":"
ExecutionClient(port: int, host: str = 'localhost', heartbeat_interval: float = 10)\n

A context manager for executing code in an IPython kernel.

Parameters:

Name Type Description Default host str

Hostname where the code execution container is running

'localhost' port int

Host port of the code execution container

required heartbeat_interval float

Interval in seconds between heartbeat pings. Defaults to 10.

10 Example
from ipybox import ExecutionClient, ExecutionContainer\n\nbinds = {\"/host/path\": \"example/path\"}\nenv = {\"API_KEY\": \"secret\"}\n\nasync with ExecutionContainer(binds=binds, env=env) as container:\n    async with ExecutionClient(host=\"localhost\", port=container.port) as client:\n        result = await client.execute(\"print('Hello, world!')\")\n        print(result.text)\n

Hello, world!

Source code in ipybox/executor.py
def __init__(self, port: int, host: str = \"localhost\", heartbeat_interval: float = 10):\n    self.port = port\n    self.host = host\n\n    self._heartbeat_interval = heartbeat_interval\n    self._heartbeat_callback = None\n\n    self._kernel_id = None\n    self._ws: WebSocketClientConnection\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.kernel_id","title":"kernel_id property","text":"
kernel_id\n

The ID of the running IPython kernel.

Raises:

Type Description ValueError

If not connected to a kernel

"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.connect","title":"connect async","text":"
connect(retries: int = 10, retry_interval: float = 1.0)\n

Creates and connects to an IPython kernel.

Parameters:

Name Type Description Default retries int

Number of connection attempts. Defaults to 10.

10 retry_interval float

Delay between retries in seconds. Defaults to 1.0.

1.0

Raises:

Type Description ConnectionError

If connection cannot be established after all retries

Source code in ipybox/executor.py
async def connect(self, retries: int = 10, retry_interval: float = 1.0):\n    \"\"\"Creates and connects to an IPython kernel.\n\n    Args:\n        retries: Number of connection attempts. Defaults to 10.\n        retry_interval: Delay between retries in seconds. Defaults to 1.0.\n\n    Raises:\n        ConnectionError: If connection cannot be established after all retries\n    \"\"\"\n    for _ in range(retries):\n        try:\n            self._kernel_id = await self._create_kernel()\n            break\n        except Exception:\n            await asyncio.sleep(retry_interval)\n    else:\n        raise ConnectionError(\"Failed to create kernel\")\n\n    self._ws = await websocket_connect(HTTPRequest(url=self.kernel_ws_url))\n    logger.info(\"Connected to kernel\")\n\n    self.heartbeat_callback = PeriodicCallback(self._ping_kernel, self._heartbeat_interval * 1000)\n    self.heartbeat_callback.start()\n    logger.info(f\"Started heartbeat (interval = {self._heartbeat_interval}s)\")\n\n    await self._init_kernel()\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.disconnect","title":"disconnect async","text":"
disconnect()\n

Closes the connection to the kernel and cleans up resources.

Source code in ipybox/executor.py
async def disconnect(self):\n    \"\"\"Closes the connection to the kernel and cleans up resources.\"\"\"\n    self.heartbeat_callback.stop()\n    self._ws.close()\n    async with aiohttp.ClientSession() as session:\n        async with session.delete(self.kernel_http_url):\n            pass\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.execute","title":"execute async","text":"
execute(code: str, timeout: float = 120) -> ExecutionResult\n

Executes code and returns the result.

Parameters:

Name Type Description Default code str

Code to execute

required timeout float

Maximum execution time in seconds. Defaults to 120.

120

Returns:

Type Description ExecutionResult

ExecutionResult object

Raises:

Type Description ExecutionError

If code execution raised an error

TimeoutError

If execution exceeds timeout duration

Source code in ipybox/executor.py
async def execute(self, code: str, timeout: float = 120) -> ExecutionResult:\n    \"\"\"Executes code and returns the result.\n\n    Args:\n        code: Code to execute\n        timeout: Maximum execution time in seconds. Defaults to 120.\n\n    Returns:\n        ExecutionResult object\n\n    Raises:\n        ExecutionError: If code execution raised an error\n        asyncio.TimeoutError: If execution exceeds timeout duration\n    \"\"\"\n    execution = await self.submit(code)\n    return await execution.result(timeout=timeout)\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.submit","title":"submit async","text":"
submit(code: str) -> Execution\n

Submits code for execution and returns an Execution object to track it.

Parameters:

Name Type Description Default code str

Python code to execute

required

Returns:

Type Description Execution

An Execution object to track the submitted code execution

Source code in ipybox/executor.py
async def submit(self, code: str) -> Execution:\n    \"\"\"Submits code for execution and returns an Execution object to track it.\n\n    Args:\n        code: Python code to execute\n\n    Returns:\n        An Execution object to track the submitted code execution\n    \"\"\"\n    req_id = uuid4().hex\n    req = {\n        \"header\": {\n            \"username\": \"\",\n            \"version\": \"5.0\",\n            \"session\": \"\",\n            \"msg_id\": req_id,\n            \"msg_type\": \"execute_request\",\n        },\n        \"parent_header\": {},\n        \"channel\": \"shell\",\n        \"content\": {\n            \"code\": code,\n            \"silent\": False,\n            \"store_history\": False,\n            \"user_expressions\": {},\n            \"allow_stdin\": False,\n        },\n        \"metadata\": {},\n        \"buffers\": {},\n    }\n\n    await self._send_request(req)\n    return Execution(client=self, req_id=req_id)\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionResult","title":"ExecutionResult dataclass","text":"
ExecutionResult(text: str | None, images: list[Image])\n

The result of a code execution.

Parameters:

Name Type Description Default text str | None

Output text generated during execution

required images list[Image]

List of images generated during execution

required"},{"location":"api/execution_client/#ipybox.executor.Execution","title":"Execution","text":"
Execution(client: ExecutionClient, req_id: str)\n

Represents a code execution in an IPython kernel.

Parameters:

Name Type Description Default client ExecutionClient

The client instance that created this execution

required req_id str

Unique identifier for the execution request

required Source code in ipybox/executor.py
def __init__(self, client: \"ExecutionClient\", req_id: str):\n    self.client = client\n    self.req_id = req_id\n\n    self._chunks: list[str] = []\n    self._images: list[Image.Image] = []\n\n    self._stream_consumed: bool = False\n
"},{"location":"api/execution_client/#ipybox.executor.Execution.result","title":"result async","text":"
result(timeout: float = 120) -> ExecutionResult\n

Waits for execution to complete and returns the final result.

If a timeout is reached, the kernel is interrupted.

Parameters:

Name Type Description Default timeout float

Maximum time to wait in seconds. Defaults to 120.

120

Returns:

Type Description ExecutionResult

ExecutionResult object

Raises:

Type Description TimeoutError

If execution exceeds timeout duration

Source code in ipybox/executor.py
async def result(self, timeout: float = 120) -> ExecutionResult:\n    \"\"\"Waits for execution to complete and returns the final result.\n\n    If a timeout is reached, the kernel is interrupted.\n\n    Args:\n        timeout: Maximum time to wait in seconds. Defaults to 120.\n\n    Returns:\n        ExecutionResult object\n\n    Raises:\n        asyncio.TimeoutError: If execution exceeds timeout duration\n    \"\"\"\n    if not self._stream_consumed:\n        async for _ in self.stream(timeout=timeout):\n            pass\n\n    return ExecutionResult(\n        text=\"\".join(self._chunks).strip() if self._chunks else None,\n        images=self._images,\n    )\n
"},{"location":"api/execution_client/#ipybox.executor.Execution.stream","title":"stream async","text":"
stream(timeout: float = 120) -> AsyncIterator[str]\n

Streams the execution output text as it becomes available.

Parameters:

Name Type Description Default timeout float

Maximum time to wait in seconds. Defaults to 120.

120

Yields:

Type Description AsyncIterator[str]

Output text chunks as they arrive

Raises:

Type Description TimeoutError

If execution exceeds timeout duration

Source code in ipybox/executor.py
async def stream(self, timeout: float = 120) -> AsyncIterator[str]:\n    \"\"\"Streams the execution output text as it becomes available.\n\n    Args:\n        timeout: Maximum time to wait in seconds. Defaults to 120.\n\n    Yields:\n        Output text chunks as they arrive\n\n    Raises:\n        asyncio.TimeoutError: If execution exceeds timeout duration\n    \"\"\"\n    try:\n        async with asyncio.timeout(timeout):\n            async for elem in self._stream():\n                match elem:\n                    case str():\n                        self._chunks.append(elem)\n                        yield elem\n                    case Image.Image():\n                        self._images.append(elem)\n    except asyncio.TimeoutError:\n        await self.client._interrupt_kernel()\n        await asyncio.sleep(0.2)  # TODO: make configurable\n        raise\n    finally:\n        self._stream_consumed = True\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionError","title":"ExecutionError","text":"
ExecutionError(message: str, trace: str | None = None)\n

Bases: Exception

Exception raised when code execution in the IPython kernel fails.

Parameters:

Name Type Description Default message str

Error message

required trace str | None

Stack trace string representation

None Source code in ipybox/executor.py
def __init__(self, message: str, trace: str | None = None):\n    super().__init__(message)\n    self.trace = trace\n
"},{"location":"api/execution_client/#ipybox.executor.ConnectionError","title":"ConnectionError","text":"

Bases: Exception

Exception raised when connection to an IPython kernel fails.

"},{"location":"api/execution_container/","title":"ExecutionContainer","text":"

A context manager for managing the lifecycle of a Docker container used for code execution.

It handles the creation, port mapping, volume binding, and cleanup of the container.

Parameters:

Name Type Description Default tag str

Tag of the Docker image to use (defaults to gradion-ai/ipybox)

DEFAULT_TAG binds dict[str, str] | None

Mapping of host paths to container paths for volume mounting. Host paths may be relative or absolute. Container paths must be relative and are created as subdirectories of /home/appuser in the container.

None env dict[str, str] | None

Environment variables to set in the container

None port int | None

Host port to map to the container's executor port. If not provided, a random port will be allocated.

None

Attributes:

Name Type Description port int

Host port mapped to the container's executor port. This port is dynamically allocated when the container is started.

Example
from ipybox import ExecutionClient, ExecutionContainer\n\nbinds = {\"/host/path\": \"example/path\"}\nenv = {\"API_KEY\": \"secret\"}\n\nasync with ExecutionContainer(binds=binds, env=env) as container:\n    async with ExecutionClient(host=\"localhost\", port=container.port) as client:\n        result = await client.execute(\"print('Hello, world!')\")\n        print(result.text)\n

Hello, world!

Source code in ipybox/container.py
def __init__(\n    self,\n    tag: str = DEFAULT_TAG,\n    binds: dict[str, str] | None = None,\n    env: dict[str, str] | None = None,\n    port: int | None = None,\n):\n    self.tag = tag\n    self.binds = binds or {}\n    self.env = env or {}\n\n    self._docker = None\n    self._container = None\n    self._port = port\n
"},{"location":"api/execution_container/#ipybox.container.ExecutionContainer.port","title":"port property","text":"
port: int\n

The host port mapped to the container's executor port.

This port is dynamically allocated when the container is started unless explicitly provided.

Raises:

Type Description RuntimeError

If the container is not running

"},{"location":"api/execution_container/#ipybox.container.ExecutionContainer.kill","title":"kill async","text":"
kill()\n

Kill and remove the Docker container.

Source code in ipybox/container.py
async def kill(self):\n    \"\"\"\n    Kill and remove the Docker container.\n    \"\"\"\n    if self._container:\n        await self._container.kill()\n\n    if self._docker:\n        await self._docker.close()\n
"},{"location":"api/execution_container/#ipybox.container.ExecutionContainer.run","title":"run async","text":"
run()\n

Create and start the Docker container.

Source code in ipybox/container.py
async def run(self):\n    \"\"\"\n    Create and start the Docker container.\n    \"\"\"\n    self._docker = Docker()\n    self._container = await self._run()\n
"}]} \ No newline at end of file diff --git a/usage/index.html b/usage/index.html index ec2b87d..da558ea 100644 --- a/usage/index.html +++ b/usage/index.html @@ -950,7 +950,13 @@

Environment variables

Manual container lifecycle management

Instead of using ExecutionContainer as a context manager, you can also manually run() and kill() the container. This is useful for running the container on a separate host listening to a user-defined host port (e.g. 7777 in the example below).

-
# do some work ...
+
container = ExecutionContainer(port=7777)  # (1)!
+await container.run()  # (2)!
+assert container.port == 7777
+
+# do some work ...
+
+await container.kill()  # (3)!
 
  1. Create an ExecutionContainer instance using a fixed port.