diff --git a/search/search_index.json b/search/search_index.json index a3a33ec..44c03d5 100644 --- a/search/search_index.json +++ b/search/search_index.json @@ -1 +1 @@ -{"config":{"lang":["en"],"separator":"[\\s\\-]+","pipeline":["stopWordFilter"]},"docs":[{"location":"","title":"Overview","text":"
ipybox
is a lightweight, stateful and secure Python code execution sandbox built with IPython and Docker. Designed for AI agents that interact with their environment through code execution, it is also well-suited for general-purpose code execution. Fully open-source and free to use, ipybox is distributed under the Apache 2.0 license.
This project is in early beta, with active development of new features ongoing.
"},{"location":"installation/","title":"Installation","text":"pip install ipybox\n
"},{"location":"installation/#docker-image","title":"Docker image","text":"Before you can use ipybox
, you need to build a Docker image. This image contains the required dependencies for executing Python code in stateful and isolated sessions.
Note
Building an ipybox
Docker image requires Docker to be installed on your system. Containers created from that image will run with the same user and group IDs as the user who built the image, ensuring proper file permissions on mounted host directories.
To build an ipybox
Docker image with default settings and no extra dependencies:
python -m ipybox build\n
This creates a Docker image tagged as gradion-ai/ipybox
with base Python dependencies required for the code execution environment.
To create a custom ipybox
Docker image with additional dependencies for your application, create a dependencies file (e.g., dependencies.txt
) following. For example:
pandas = \"^2.2\"\nscikit-learn = \"^1.5\"\nmatplotlib = \"^3.9\"\n
To build an image with custom tag and dependencies:
python -m ipybox build -t my-box:v1 -d path/to/dependencies.txt\n
The dependencies file should list Python packages in Poetry dependency specification format. These will be installed in addition to the base dependencies required for the execution environment. The execution container also supports installing dependencies at runtime.
"},{"location":"usage/","title":"Usage","text":"The two main classes of the ipybox
package are ExecutionContainer
and ExecutionClient
.
Note
Runnable scripts of the source code on this page are available in the examples directory.
"},{"location":"usage/#basic-usage","title":"Basic usage","text":"For executing code in ipybox
you first need to create a Docker container from an ipybox
Docker image and then an IPython kernel running in that container. This is done with the ExecutionContainer
and the ExecutionClient
context managers.
from ipybox import ExecutionClient, ExecutionContainer\n\n\nasync with ExecutionContainer(tag=\"gradion-ai/ipybox\") as container: # (1)!\n async with ExecutionClient(port=container.port) as client: # (2)!\n result = await client.execute(\"print('Hello, world!')\") # (3)!\n print(f\"Output: {result.text}\") # (4)!\n
Hello, world!
The default image used by ExecutionContainer
is gradion-ai/ipybox
. You can specify a custom image with the tag
argument like in ExecutionContainer(tag=\"my-box:v1\")
, for example.
Note
Instead of letting the ExecutionContainer
context manager handle the lifecycle of the container, you can also manually run and kill the container.
Code execution within the same client
context is stateful i.e. you can reference variables from previous executions. Code executions in different client contexts are isolated from each other:
async with ExecutionContainer() as container:\n async with ExecutionClient(port=container.port) as client_1: # (1)!\n result = await client_1.execute(\"x = 1\") # (2)!\n assert result.text is None\n result = await client_1.execute(\"print(x)\") # (3)!\n assert result.text == \"1\"\n\n async with ExecutionClient(port=container.port) as client_2: # (4)!\n try:\n await client_2.execute(\"print(x)\") # (5)!\n except ExecutionError as e:\n assert e.args[0] == \"NameError: name 'x' is not defined\"\n
client_2
contextThe ExecutionClient
supports streaming output as it's generated during code execution:
async with ExecutionContainer() as container:\n async with ExecutionClient(port=container.port) as client:\n code = \"\"\"\n import time\n for i in range(5):\n print(f\"Processing step {i}\")\n time.sleep(1)\n \"\"\" # (1)!\n\n execution = await client.submit(code) # (2)!\n print(\"Streaming output:\")\n async for chunk in execution.stream(): # (3)!\n print(f\"Received output: {chunk.strip()}\") # (4)!\n\n result = await execution.result() # (5)!\n print(\"\\nAggregated output:\")\n print(result.text) # (6)!\n
Received output: Processing step 0\nReceived output: Processing step 1\nReceived output: Processing step 2\nReceived output: Processing step 3\nReceived output: Processing step 4\n
Aggregated output:\nProcessing step 0\nProcessing step 1\nProcessing step 2\nProcessing step 3\nProcessing step 4\n
The stream()
method accepts an optional timeout
argument (defaults to 120
seconds). In case of timeout, the execution is automatically terminated by interrupting the kernel.
async with ExecutionContainer() as container:\n async with ExecutionClient(port=container.port) as client:\n execution = await client.submit(\"!pip install einops\") # (1)!\n async for chunk in execution.stream(): # (2)!\n print(chunk, end=\"\", flush=True)\n\n result = await client.execute(\"\"\"\n import einops\n print(einops.__version__)\n \"\"\") # (3)!\n print(f\"Output: {result.text}\") # (4)!\n
einops
package using pipCollecting einops\nDownloading einops-0.8.0-py3-none-any.whl (10.0 kB)\nInstalling collected packages: einops\nSuccessfully installed einops-0.8.0\n
Output: 0.8.0
You can also install and use a package within a single execution. There's no need to have two separate executions as done in the example above.
"},{"location":"usage/#creating-and-returning-plots","title":"Creating and returning plots","text":"Plots created with matplotlib
or other libraries are returned as PIL images. Images are not part of the output stream, but are available as images
list in the result
object.
async with ExecutionContainer() as container:\n async with ExecutionClient(port=container.port) as client:\n execution = await client.submit(\"\"\"\n !pip install matplotlib\n\n import matplotlib.pyplot as plt\n import numpy as np\n\n x = np.linspace(0, 10, 100)\n plt.figure(figsize=(8, 6))\n plt.plot(x, np.sin(x))\n plt.title('Sine Wave')\n plt.show()\n\n print(\"Plot generation complete!\")\n \"\"\") # (1)!\n\n async for chunk in execution.stream(): # (2)!\n print(chunk, end=\"\", flush=True)\n\n result = await execution.result()\n result.images[0].save(\"sine.png\") # (3)!\n
matplotlib
and generate a plotprint
statement)Bind mounts allow executed code to read and write files on the host machine.
await aiofiles.os.makedirs(\"data\", exist_ok=True)\nawait aiofiles.os.makedirs(\"output\", exist_ok=True)\n\nbinds = { # (1)!\n \"./data\": \"data\", # (2)!\n \"./output\": \"output\", # (3)!\n}\n\nasync with aiofiles.open(\"data/input.txt\", \"w\") as f:\n await f.write(\"hello world\")\n\nasync with ExecutionContainer(binds=binds) as container:\n async with ExecutionClient(port=container.port) as client:\n await client.execute(\"\"\"\n with open('data/input.txt') as f:\n data = f.read()\n\n processed = data.upper()\n\n with open('output/result.txt', 'w') as f:\n f.write(processed)\n \"\"\") # (4)!\n\nasync with aiofiles.open(\"output/result.txt\", \"r\") as f: # (5)!\n result = await f.read()\n assert result == \"HELLO WORLD\"\n
data
directory, convert to uppercase and write to mounted output
directoryEnvironment variables can be set on the container for passing secrets or configuration data, for example.
# Define environment variables for the container\nenv = {\"API_KEY\": \"secret-key-123\", \"DEBUG\": \"1\"} # (1)!\n\nasync with ExecutionContainer(env=env) as container:\n async with ExecutionClient(port=container.port) as client:\n result = await client.execute(\"\"\"\n import os\n\n api_key = os.environ['API_KEY']\n print(f\"Using API key: {api_key}\")\n\n debug = bool(int(os.environ.get('DEBUG', '0')))\n if debug:\n print(\"Debug mode enabled\")\n \"\"\") # (2)!\n print(result.text) # (3)!\n
Using API key: secret-key-123\nDebug mode enabled\n
Instead of using ExecutionContainer
as a context manager, you can also manually run()
and kill()
the container. This is useful for running the container on a separate host listening to a user-defined host port (e.g. 7777
in the example below).
# do some work ...\n
ExecutionContainer
instance using a fixed port.ExecutionClient(port: int, host: str = 'localhost', heartbeat_interval: float = 10)\n
A context manager for executing code in an IPython kernel.
Parameters:
Name Type Description Defaulthost
str
Hostname where the code execution container is running
'localhost'
port
int
Host port of the code execution container
requiredheartbeat_interval
float
Interval in seconds between heartbeat pings. Defaults to 10.
10
Example from ipybox import ExecutionClient, ExecutionContainer\n\nbinds = {\"/host/path\": \"example/path\"}\nenv = {\"API_KEY\": \"secret\"}\n\nasync with ExecutionContainer(binds=binds, env=env) as container:\n async with ExecutionClient(host=\"localhost\", port=container.port) as client:\n result = await client.execute(\"print('Hello, world!')\")\n print(result.text)\n
Hello, world!
Source code inipybox/executor.py
def __init__(self, port: int, host: str = \"localhost\", heartbeat_interval: float = 10):\n self.port = port\n self.host = host\n\n self._heartbeat_interval = heartbeat_interval\n self._heartbeat_callback = None\n\n self._kernel_id = None\n self._ws: WebSocketClientConnection\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.kernel_id","title":"kernel_id property
","text":"kernel_id\n
The ID of the running IPython kernel.
Raises:
Type DescriptionValueError
If not connected to a kernel
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.connect","title":"connectasync
","text":"connect(retries: int = 10, retry_interval: float = 1.0)\n
Creates and connects to an IPython kernel.
Parameters:
Name Type Description Defaultretries
int
Number of connection attempts. Defaults to 10.
10
retry_interval
float
Delay between retries in seconds. Defaults to 1.0.
1.0
Raises:
Type DescriptionConnectionError
If connection cannot be established after all retries
Source code inipybox/executor.py
async def connect(self, retries: int = 10, retry_interval: float = 1.0):\n \"\"\"Creates and connects to an IPython kernel.\n\n Args:\n retries: Number of connection attempts. Defaults to 10.\n retry_interval: Delay between retries in seconds. Defaults to 1.0.\n\n Raises:\n ConnectionError: If connection cannot be established after all retries\n \"\"\"\n for _ in range(retries):\n try:\n self._kernel_id = await self._create_kernel()\n break\n except Exception:\n await asyncio.sleep(retry_interval)\n else:\n raise ConnectionError(\"Failed to create kernel\")\n\n self._ws = await websocket_connect(HTTPRequest(url=self.kernel_ws_url))\n logger.info(\"Connected to kernel\")\n\n self.heartbeat_callback = PeriodicCallback(self._ping_kernel, self._heartbeat_interval * 1000)\n self.heartbeat_callback.start()\n logger.info(f\"Started heartbeat (interval = {self._heartbeat_interval}s)\")\n\n await self._init_kernel()\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.disconnect","title":"disconnect async
","text":"disconnect()\n
Closes the connection to the kernel and cleans up resources.
Source code inipybox/executor.py
async def disconnect(self):\n \"\"\"Closes the connection to the kernel and cleans up resources.\"\"\"\n self.heartbeat_callback.stop()\n self._ws.close()\n async with aiohttp.ClientSession() as session:\n async with session.delete(self.kernel_http_url):\n pass\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.execute","title":"execute async
","text":"execute(code: str, timeout: float = 120) -> ExecutionResult\n
Executes code and returns the result.
Parameters:
Name Type Description Defaultcode
str
Code to execute
requiredtimeout
float
Maximum execution time in seconds. Defaults to 120.
120
Returns:
Type DescriptionExecutionResult
ExecutionResult object
Raises:
Type DescriptionExecutionError
If code execution raised an error
TimeoutError
If execution exceeds timeout duration
Source code inipybox/executor.py
async def execute(self, code: str, timeout: float = 120) -> ExecutionResult:\n \"\"\"Executes code and returns the result.\n\n Args:\n code: Code to execute\n timeout: Maximum execution time in seconds. Defaults to 120.\n\n Returns:\n ExecutionResult object\n\n Raises:\n ExecutionError: If code execution raised an error\n asyncio.TimeoutError: If execution exceeds timeout duration\n \"\"\"\n execution = await self.submit(code)\n return await execution.result(timeout=timeout)\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.submit","title":"submit async
","text":"submit(code: str) -> Execution\n
Submits code for execution and returns an Execution object to track it.
Parameters:
Name Type Description Defaultcode
str
Python code to execute
requiredReturns:
Type DescriptionExecution
An Execution object to track the submitted code execution
Source code inipybox/executor.py
async def submit(self, code: str) -> Execution:\n \"\"\"Submits code for execution and returns an Execution object to track it.\n\n Args:\n code: Python code to execute\n\n Returns:\n An Execution object to track the submitted code execution\n \"\"\"\n req_id = uuid4().hex\n req = {\n \"header\": {\n \"username\": \"\",\n \"version\": \"5.0\",\n \"session\": \"\",\n \"msg_id\": req_id,\n \"msg_type\": \"execute_request\",\n },\n \"parent_header\": {},\n \"channel\": \"shell\",\n \"content\": {\n \"code\": code,\n \"silent\": False,\n \"store_history\": False,\n \"user_expressions\": {},\n \"allow_stdin\": False,\n },\n \"metadata\": {},\n \"buffers\": {},\n }\n\n await self._send_request(req)\n return Execution(client=self, req_id=req_id)\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionResult","title":"ExecutionResult dataclass
","text":"ExecutionResult(text: str | None, images: list[Image])\n
The result of a code execution.
Parameters:
Name Type Description Defaulttext
str | None
Output text generated during execution
requiredimages
list[Image]
List of images generated during execution
required"},{"location":"api/execution_client/#ipybox.executor.Execution","title":"Execution","text":"Execution(client: ExecutionClient, req_id: str)\n
Represents a code execution in an IPython kernel.
Parameters:
Name Type Description Defaultclient
ExecutionClient
The client instance that created this execution
requiredreq_id
str
Unique identifier for the execution request
required Source code inipybox/executor.py
def __init__(self, client: \"ExecutionClient\", req_id: str):\n self.client = client\n self.req_id = req_id\n\n self._chunks: list[str] = []\n self._images: list[Image.Image] = []\n\n self._stream_consumed: bool = False\n
"},{"location":"api/execution_client/#ipybox.executor.Execution.result","title":"result async
","text":"result(timeout: float = 120) -> ExecutionResult\n
Waits for execution to complete and returns the final result.
If a timeout is reached, the kernel is interrupted.
Parameters:
Name Type Description Defaulttimeout
float
Maximum time to wait in seconds. Defaults to 120.
120
Returns:
Type DescriptionExecutionResult
ExecutionResult object
Raises:
Type DescriptionTimeoutError
If execution exceeds timeout duration
Source code inipybox/executor.py
async def result(self, timeout: float = 120) -> ExecutionResult:\n \"\"\"Waits for execution to complete and returns the final result.\n\n If a timeout is reached, the kernel is interrupted.\n\n Args:\n timeout: Maximum time to wait in seconds. Defaults to 120.\n\n Returns:\n ExecutionResult object\n\n Raises:\n asyncio.TimeoutError: If execution exceeds timeout duration\n \"\"\"\n if not self._stream_consumed:\n async for _ in self.stream(timeout=timeout):\n pass\n\n return ExecutionResult(\n text=\"\".join(self._chunks).strip() if self._chunks else None,\n images=self._images,\n )\n
"},{"location":"api/execution_client/#ipybox.executor.Execution.stream","title":"stream async
","text":"stream(timeout: float = 120) -> AsyncIterator[str]\n
Streams the execution output text as it becomes available.
Parameters:
Name Type Description Defaulttimeout
float
Maximum time to wait in seconds. Defaults to 120.
120
Yields:
Type DescriptionAsyncIterator[str]
Output text chunks as they arrive
Raises:
Type DescriptionTimeoutError
If execution exceeds timeout duration
Source code inipybox/executor.py
async def stream(self, timeout: float = 120) -> AsyncIterator[str]:\n \"\"\"Streams the execution output text as it becomes available.\n\n Args:\n timeout: Maximum time to wait in seconds. Defaults to 120.\n\n Yields:\n Output text chunks as they arrive\n\n Raises:\n asyncio.TimeoutError: If execution exceeds timeout duration\n \"\"\"\n try:\n async with asyncio.timeout(timeout):\n async for elem in self._stream():\n match elem:\n case str():\n self._chunks.append(elem)\n yield elem\n case Image.Image():\n self._images.append(elem)\n except asyncio.TimeoutError:\n await self.client._interrupt_kernel()\n await asyncio.sleep(0.2) # TODO: make configurable\n raise\n finally:\n self._stream_consumed = True\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionError","title":"ExecutionError","text":"ExecutionError(message: str, trace: str | None = None)\n
Bases: Exception
Exception raised when code execution in the IPython kernel fails.
Parameters:
Name Type Description Defaultmessage
str
Error message
requiredtrace
str | None
Stack trace string representation
None
Source code in ipybox/executor.py
def __init__(self, message: str, trace: str | None = None):\n super().__init__(message)\n self.trace = trace\n
"},{"location":"api/execution_client/#ipybox.executor.ConnectionError","title":"ConnectionError","text":" Bases: Exception
Exception raised when connection to an IPython kernel fails.
"},{"location":"api/execution_container/","title":"ExecutionContainer","text":"A context manager for managing the lifecycle of a Docker container used for code execution.
It handles the creation, port mapping, volume binding, and cleanup of the container.
Parameters:
Name Type Description Defaulttag
str
Tag of the Docker image to use (defaults to gradion-ai/ipybox)
DEFAULT_TAG
binds
dict[str, str] | None
Mapping of host paths to container paths for volume mounting. Host paths may be relative or absolute. Container paths must be relative and are created as subdirectories of /home/appuser
in the container.
None
env
dict[str, str] | None
Environment variables to set in the container
None
port
int | None
Host port to map to the container's executor port. If not provided, a random port will be allocated.
None
Attributes:
Name Type Descriptionport
int
Host port mapped to the container's executor port. This port is dynamically allocated when the container is started.
Examplefrom ipybox import ExecutionClient, ExecutionContainer\n\nbinds = {\"/host/path\": \"example/path\"}\nenv = {\"API_KEY\": \"secret\"}\n\nasync with ExecutionContainer(binds=binds, env=env) as container:\n async with ExecutionClient(host=\"localhost\", port=container.port) as client:\n result = await client.execute(\"print('Hello, world!')\")\n print(result.text)\n
Hello, world!
Source code inipybox/container.py
def __init__(\n self,\n tag: str = DEFAULT_TAG,\n binds: dict[str, str] | None = None,\n env: dict[str, str] | None = None,\n port: int | None = None,\n):\n self.tag = tag\n self.binds = binds or {}\n self.env = env or {}\n\n self._docker = None\n self._container = None\n self._port = port\n
"},{"location":"api/execution_container/#ipybox.container.ExecutionContainer.port","title":"port property
","text":"port: int\n
The host port mapped to the container's executor port.
This port is dynamically allocated when the container is started unless explicitly provided.
Raises:
Type DescriptionRuntimeError
If the container is not running
"},{"location":"api/execution_container/#ipybox.container.ExecutionContainer.kill","title":"killasync
","text":"kill()\n
Kill and remove the Docker container.
Source code inipybox/container.py
async def kill(self):\n \"\"\"\n Kill and remove the Docker container.\n \"\"\"\n if self._container:\n await self._container.kill()\n\n if self._docker:\n await self._docker.close()\n
"},{"location":"api/execution_container/#ipybox.container.ExecutionContainer.run","title":"run async
","text":"run()\n
Create and start the Docker container.
Source code inipybox/container.py
async def run(self):\n \"\"\"\n Create and start the Docker container.\n \"\"\"\n self._docker = Docker()\n self._container = await self._run()\n
"}]}
\ No newline at end of file
+{"config":{"lang":["en"],"separator":"[\\s\\-]+","pipeline":["stopWordFilter"]},"docs":[{"location":"","title":"Overview","text":"ipybox
is a lightweight, stateful and secure Python code execution sandbox built with IPython and Docker. Designed for AI agents that interact with their environment through code execution, it is also well-suited for general-purpose code execution. Fully open-source and free to use, ipybox is distributed under the Apache 2.0 license.
This project is in early beta, with active development of new features ongoing.
"},{"location":"installation/","title":"Installation","text":"pip install ipybox\n
"},{"location":"installation/#docker-image","title":"Docker image","text":"Before you can use ipybox
, you need to build a Docker image. This image contains the required dependencies for executing Python code in stateful and isolated sessions.
Note
Building an ipybox
Docker image requires Docker to be installed on your system. Containers created from that image will run with the same user and group IDs as the user who built the image, ensuring proper file permissions on mounted host directories.
To build an ipybox
Docker image with default settings and no extra dependencies:
python -m ipybox build\n
This creates a Docker image tagged as gradion-ai/ipybox
with base Python dependencies required for the code execution environment.
To create a custom ipybox
Docker image with additional dependencies for your application, create a dependencies file (e.g., dependencies.txt
) following. For example:
pandas = \"^2.2\"\nscikit-learn = \"^1.5\"\nmatplotlib = \"^3.9\"\n
To build an image with custom tag and dependencies:
python -m ipybox build -t my-box:v1 -d path/to/dependencies.txt\n
The dependencies file should list Python packages in Poetry dependency specification format. These will be installed in addition to the base dependencies required for the execution environment. The execution container also supports installing dependencies at runtime.
"},{"location":"usage/","title":"Usage","text":"The two main classes of the ipybox
package are ExecutionContainer
and ExecutionClient
.
Note
Runnable scripts of the source code on this page are available in the examples directory.
"},{"location":"usage/#basic-usage","title":"Basic usage","text":"For executing code in ipybox
you first need to create a Docker container from an ipybox
Docker image and then an IPython kernel running in that container. This is done with the ExecutionContainer
and the ExecutionClient
context managers.
from ipybox import ExecutionClient, ExecutionContainer\n\n\nasync with ExecutionContainer(tag=\"gradion-ai/ipybox\") as container: # (1)!\n async with ExecutionClient(port=container.port) as client: # (2)!\n result = await client.execute(\"print('Hello, world!')\") # (3)!\n print(f\"Output: {result.text}\") # (4)!\n
Hello, world!
The default image used by ExecutionContainer
is gradion-ai/ipybox
. You can specify a custom image with the tag
argument like in ExecutionContainer(tag=\"my-box:v1\")
, for example.
Note
Instead of letting the ExecutionContainer
context manager handle the lifecycle of the container, you can also manually run and kill the container.
Code execution within the same client
context is stateful i.e. you can reference variables from previous executions. Code executions in different client contexts are isolated from each other:
async with ExecutionContainer() as container:\n async with ExecutionClient(port=container.port) as client_1: # (1)!\n result = await client_1.execute(\"x = 1\") # (2)!\n assert result.text is None\n result = await client_1.execute(\"print(x)\") # (3)!\n assert result.text == \"1\"\n\n async with ExecutionClient(port=container.port) as client_2: # (4)!\n try:\n await client_2.execute(\"print(x)\") # (5)!\n except ExecutionError as e:\n assert e.args[0] == \"NameError: name 'x' is not defined\"\n
client_2
contextThe ExecutionClient
supports streaming output as it's generated during code execution:
async with ExecutionContainer() as container:\n async with ExecutionClient(port=container.port) as client:\n code = \"\"\"\n import time\n for i in range(5):\n print(f\"Processing step {i}\")\n time.sleep(1)\n \"\"\" # (1)!\n\n execution = await client.submit(code) # (2)!\n print(\"Streaming output:\")\n async for chunk in execution.stream(): # (3)!\n print(f\"Received output: {chunk.strip()}\") # (4)!\n\n result = await execution.result() # (5)!\n print(\"\\nAggregated output:\")\n print(result.text) # (6)!\n
Received output: Processing step 0\nReceived output: Processing step 1\nReceived output: Processing step 2\nReceived output: Processing step 3\nReceived output: Processing step 4\n
Aggregated output:\nProcessing step 0\nProcessing step 1\nProcessing step 2\nProcessing step 3\nProcessing step 4\n
The stream()
method accepts an optional timeout
argument (defaults to 120
seconds). In case of timeout, the execution is automatically terminated by interrupting the kernel.
async with ExecutionContainer() as container:\n async with ExecutionClient(port=container.port) as client:\n execution = await client.submit(\"!pip install einops\") # (1)!\n async for chunk in execution.stream(): # (2)!\n print(chunk, end=\"\", flush=True)\n\n result = await client.execute(\"\"\"\n import einops\n print(einops.__version__)\n \"\"\") # (3)!\n print(f\"Output: {result.text}\") # (4)!\n
einops
package using pipCollecting einops\nDownloading einops-0.8.0-py3-none-any.whl (10.0 kB)\nInstalling collected packages: einops\nSuccessfully installed einops-0.8.0\n
Output: 0.8.0
You can also install and use a package within a single execution. There's no need to have two separate executions as done in the example above.
"},{"location":"usage/#creating-and-returning-plots","title":"Creating and returning plots","text":"Plots created with matplotlib
or other libraries are returned as PIL images. Images are not part of the output stream, but are available as images
list in the result
object.
async with ExecutionContainer() as container:\n async with ExecutionClient(port=container.port) as client:\n execution = await client.submit(\"\"\"\n !pip install matplotlib\n\n import matplotlib.pyplot as plt\n import numpy as np\n\n x = np.linspace(0, 10, 100)\n plt.figure(figsize=(8, 6))\n plt.plot(x, np.sin(x))\n plt.title('Sine Wave')\n plt.show()\n\n print(\"Plot generation complete!\")\n \"\"\") # (1)!\n\n async for chunk in execution.stream(): # (2)!\n print(chunk, end=\"\", flush=True)\n\n result = await execution.result()\n result.images[0].save(\"sine.png\") # (3)!\n
matplotlib
and generate a plotprint
statement)Bind mounts allow executed code to read and write files on the host machine.
await aiofiles.os.makedirs(\"data\", exist_ok=True)\nawait aiofiles.os.makedirs(\"output\", exist_ok=True)\n\nbinds = { # (1)!\n \"./data\": \"data\", # (2)!\n \"./output\": \"output\", # (3)!\n}\n\nasync with aiofiles.open(\"data/input.txt\", \"w\") as f:\n await f.write(\"hello world\")\n\nasync with ExecutionContainer(binds=binds) as container:\n async with ExecutionClient(port=container.port) as client:\n await client.execute(\"\"\"\n with open('data/input.txt') as f:\n data = f.read()\n\n processed = data.upper()\n\n with open('output/result.txt', 'w') as f:\n f.write(processed)\n \"\"\") # (4)!\n\nasync with aiofiles.open(\"output/result.txt\", \"r\") as f: # (5)!\n result = await f.read()\n assert result == \"HELLO WORLD\"\n
data
directory, convert to uppercase and write to mounted output
directoryEnvironment variables can be set on the container for passing secrets or configuration data, for example.
# Define environment variables for the container\nenv = {\"API_KEY\": \"secret-key-123\", \"DEBUG\": \"1\"} # (1)!\n\nasync with ExecutionContainer(env=env) as container:\n async with ExecutionClient(port=container.port) as client:\n result = await client.execute(\"\"\"\n import os\n\n api_key = os.environ['API_KEY']\n print(f\"Using API key: {api_key}\")\n\n debug = bool(int(os.environ.get('DEBUG', '0')))\n if debug:\n print(\"Debug mode enabled\")\n \"\"\") # (2)!\n print(result.text) # (3)!\n
Using API key: secret-key-123\nDebug mode enabled\n
Instead of using ExecutionContainer
as a context manager, you can also manually run()
and kill()
the container. This is useful for running the container on a separate host listening to a user-defined host port (e.g. 7777
in the example below).
container = ExecutionContainer(port=7777) # (1)!\nawait container.run() # (2)!\nassert container.port == 7777\n\n# do some work ...\n\nawait container.kill() # (3)!\n
ExecutionContainer
instance using a fixed port.ExecutionClient(port: int, host: str = 'localhost', heartbeat_interval: float = 10)\n
A context manager for executing code in an IPython kernel.
Parameters:
Name Type Description Defaulthost
str
Hostname where the code execution container is running
'localhost'
port
int
Host port of the code execution container
requiredheartbeat_interval
float
Interval in seconds between heartbeat pings. Defaults to 10.
10
Example from ipybox import ExecutionClient, ExecutionContainer\n\nbinds = {\"/host/path\": \"example/path\"}\nenv = {\"API_KEY\": \"secret\"}\n\nasync with ExecutionContainer(binds=binds, env=env) as container:\n async with ExecutionClient(host=\"localhost\", port=container.port) as client:\n result = await client.execute(\"print('Hello, world!')\")\n print(result.text)\n
Hello, world!
Source code inipybox/executor.py
def __init__(self, port: int, host: str = \"localhost\", heartbeat_interval: float = 10):\n self.port = port\n self.host = host\n\n self._heartbeat_interval = heartbeat_interval\n self._heartbeat_callback = None\n\n self._kernel_id = None\n self._ws: WebSocketClientConnection\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.kernel_id","title":"kernel_id property
","text":"kernel_id\n
The ID of the running IPython kernel.
Raises:
Type DescriptionValueError
If not connected to a kernel
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.connect","title":"connectasync
","text":"connect(retries: int = 10, retry_interval: float = 1.0)\n
Creates and connects to an IPython kernel.
Parameters:
Name Type Description Defaultretries
int
Number of connection attempts. Defaults to 10.
10
retry_interval
float
Delay between retries in seconds. Defaults to 1.0.
1.0
Raises:
Type DescriptionConnectionError
If connection cannot be established after all retries
Source code inipybox/executor.py
async def connect(self, retries: int = 10, retry_interval: float = 1.0):\n \"\"\"Creates and connects to an IPython kernel.\n\n Args:\n retries: Number of connection attempts. Defaults to 10.\n retry_interval: Delay between retries in seconds. Defaults to 1.0.\n\n Raises:\n ConnectionError: If connection cannot be established after all retries\n \"\"\"\n for _ in range(retries):\n try:\n self._kernel_id = await self._create_kernel()\n break\n except Exception:\n await asyncio.sleep(retry_interval)\n else:\n raise ConnectionError(\"Failed to create kernel\")\n\n self._ws = await websocket_connect(HTTPRequest(url=self.kernel_ws_url))\n logger.info(\"Connected to kernel\")\n\n self.heartbeat_callback = PeriodicCallback(self._ping_kernel, self._heartbeat_interval * 1000)\n self.heartbeat_callback.start()\n logger.info(f\"Started heartbeat (interval = {self._heartbeat_interval}s)\")\n\n await self._init_kernel()\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.disconnect","title":"disconnect async
","text":"disconnect()\n
Closes the connection to the kernel and cleans up resources.
Source code inipybox/executor.py
async def disconnect(self):\n \"\"\"Closes the connection to the kernel and cleans up resources.\"\"\"\n self.heartbeat_callback.stop()\n self._ws.close()\n async with aiohttp.ClientSession() as session:\n async with session.delete(self.kernel_http_url):\n pass\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.execute","title":"execute async
","text":"execute(code: str, timeout: float = 120) -> ExecutionResult\n
Executes code and returns the result.
Parameters:
Name Type Description Defaultcode
str
Code to execute
requiredtimeout
float
Maximum execution time in seconds. Defaults to 120.
120
Returns:
Type DescriptionExecutionResult
ExecutionResult object
Raises:
Type DescriptionExecutionError
If code execution raised an error
TimeoutError
If execution exceeds timeout duration
Source code inipybox/executor.py
async def execute(self, code: str, timeout: float = 120) -> ExecutionResult:\n \"\"\"Executes code and returns the result.\n\n Args:\n code: Code to execute\n timeout: Maximum execution time in seconds. Defaults to 120.\n\n Returns:\n ExecutionResult object\n\n Raises:\n ExecutionError: If code execution raised an error\n asyncio.TimeoutError: If execution exceeds timeout duration\n \"\"\"\n execution = await self.submit(code)\n return await execution.result(timeout=timeout)\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.submit","title":"submit async
","text":"submit(code: str) -> Execution\n
Submits code for execution and returns an Execution object to track it.
Parameters:
Name Type Description Defaultcode
str
Python code to execute
requiredReturns:
Type DescriptionExecution
An Execution object to track the submitted code execution
Source code inipybox/executor.py
async def submit(self, code: str) -> Execution:\n \"\"\"Submits code for execution and returns an Execution object to track it.\n\n Args:\n code: Python code to execute\n\n Returns:\n An Execution object to track the submitted code execution\n \"\"\"\n req_id = uuid4().hex\n req = {\n \"header\": {\n \"username\": \"\",\n \"version\": \"5.0\",\n \"session\": \"\",\n \"msg_id\": req_id,\n \"msg_type\": \"execute_request\",\n },\n \"parent_header\": {},\n \"channel\": \"shell\",\n \"content\": {\n \"code\": code,\n \"silent\": False,\n \"store_history\": False,\n \"user_expressions\": {},\n \"allow_stdin\": False,\n },\n \"metadata\": {},\n \"buffers\": {},\n }\n\n await self._send_request(req)\n return Execution(client=self, req_id=req_id)\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionResult","title":"ExecutionResult dataclass
","text":"ExecutionResult(text: str | None, images: list[Image])\n
The result of a code execution.
Parameters:
Name Type Description Defaulttext
str | None
Output text generated during execution
requiredimages
list[Image]
List of images generated during execution
required"},{"location":"api/execution_client/#ipybox.executor.Execution","title":"Execution","text":"Execution(client: ExecutionClient, req_id: str)\n
Represents a code execution in an IPython kernel.
Parameters:
Name Type Description Defaultclient
ExecutionClient
The client instance that created this execution
requiredreq_id
str
Unique identifier for the execution request
required Source code inipybox/executor.py
def __init__(self, client: \"ExecutionClient\", req_id: str):\n self.client = client\n self.req_id = req_id\n\n self._chunks: list[str] = []\n self._images: list[Image.Image] = []\n\n self._stream_consumed: bool = False\n
"},{"location":"api/execution_client/#ipybox.executor.Execution.result","title":"result async
","text":"result(timeout: float = 120) -> ExecutionResult\n
Waits for execution to complete and returns the final result.
If a timeout is reached, the kernel is interrupted.
Parameters:
Name Type Description Defaulttimeout
float
Maximum time to wait in seconds. Defaults to 120.
120
Returns:
Type DescriptionExecutionResult
ExecutionResult object
Raises:
Type DescriptionTimeoutError
If execution exceeds timeout duration
Source code inipybox/executor.py
async def result(self, timeout: float = 120) -> ExecutionResult:\n \"\"\"Waits for execution to complete and returns the final result.\n\n If a timeout is reached, the kernel is interrupted.\n\n Args:\n timeout: Maximum time to wait in seconds. Defaults to 120.\n\n Returns:\n ExecutionResult object\n\n Raises:\n asyncio.TimeoutError: If execution exceeds timeout duration\n \"\"\"\n if not self._stream_consumed:\n async for _ in self.stream(timeout=timeout):\n pass\n\n return ExecutionResult(\n text=\"\".join(self._chunks).strip() if self._chunks else None,\n images=self._images,\n )\n
"},{"location":"api/execution_client/#ipybox.executor.Execution.stream","title":"stream async
","text":"stream(timeout: float = 120) -> AsyncIterator[str]\n
Streams the execution output text as it becomes available.
Parameters:
Name Type Description Defaulttimeout
float
Maximum time to wait in seconds. Defaults to 120.
120
Yields:
Type DescriptionAsyncIterator[str]
Output text chunks as they arrive
Raises:
Type DescriptionTimeoutError
If execution exceeds timeout duration
Source code inipybox/executor.py
async def stream(self, timeout: float = 120) -> AsyncIterator[str]:\n \"\"\"Streams the execution output text as it becomes available.\n\n Args:\n timeout: Maximum time to wait in seconds. Defaults to 120.\n\n Yields:\n Output text chunks as they arrive\n\n Raises:\n asyncio.TimeoutError: If execution exceeds timeout duration\n \"\"\"\n try:\n async with asyncio.timeout(timeout):\n async for elem in self._stream():\n match elem:\n case str():\n self._chunks.append(elem)\n yield elem\n case Image.Image():\n self._images.append(elem)\n except asyncio.TimeoutError:\n await self.client._interrupt_kernel()\n await asyncio.sleep(0.2) # TODO: make configurable\n raise\n finally:\n self._stream_consumed = True\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionError","title":"ExecutionError","text":"ExecutionError(message: str, trace: str | None = None)\n
Bases: Exception
Exception raised when code execution in the IPython kernel fails.
Parameters:
Name Type Description Defaultmessage
str
Error message
requiredtrace
str | None
Stack trace string representation
None
Source code in ipybox/executor.py
def __init__(self, message: str, trace: str | None = None):\n super().__init__(message)\n self.trace = trace\n
"},{"location":"api/execution_client/#ipybox.executor.ConnectionError","title":"ConnectionError","text":" Bases: Exception
Exception raised when connection to an IPython kernel fails.
"},{"location":"api/execution_container/","title":"ExecutionContainer","text":"A context manager for managing the lifecycle of a Docker container used for code execution.
It handles the creation, port mapping, volume binding, and cleanup of the container.
Parameters:
Name Type Description Defaulttag
str
Tag of the Docker image to use (defaults to gradion-ai/ipybox)
DEFAULT_TAG
binds
dict[str, str] | None
Mapping of host paths to container paths for volume mounting. Host paths may be relative or absolute. Container paths must be relative and are created as subdirectories of /home/appuser
in the container.
None
env
dict[str, str] | None
Environment variables to set in the container
None
port
int | None
Host port to map to the container's executor port. If not provided, a random port will be allocated.
None
Attributes:
Name Type Descriptionport
int
Host port mapped to the container's executor port. This port is dynamically allocated when the container is started.
Examplefrom ipybox import ExecutionClient, ExecutionContainer\n\nbinds = {\"/host/path\": \"example/path\"}\nenv = {\"API_KEY\": \"secret\"}\n\nasync with ExecutionContainer(binds=binds, env=env) as container:\n async with ExecutionClient(host=\"localhost\", port=container.port) as client:\n result = await client.execute(\"print('Hello, world!')\")\n print(result.text)\n
Hello, world!
Source code inipybox/container.py
def __init__(\n self,\n tag: str = DEFAULT_TAG,\n binds: dict[str, str] | None = None,\n env: dict[str, str] | None = None,\n port: int | None = None,\n):\n self.tag = tag\n self.binds = binds or {}\n self.env = env or {}\n\n self._docker = None\n self._container = None\n self._port = port\n
"},{"location":"api/execution_container/#ipybox.container.ExecutionContainer.port","title":"port property
","text":"port: int\n
The host port mapped to the container's executor port.
This port is dynamically allocated when the container is started unless explicitly provided.
Raises:
Type DescriptionRuntimeError
If the container is not running
"},{"location":"api/execution_container/#ipybox.container.ExecutionContainer.kill","title":"killasync
","text":"kill()\n
Kill and remove the Docker container.
Source code inipybox/container.py
async def kill(self):\n \"\"\"\n Kill and remove the Docker container.\n \"\"\"\n if self._container:\n await self._container.kill()\n\n if self._docker:\n await self._docker.close()\n
"},{"location":"api/execution_container/#ipybox.container.ExecutionContainer.run","title":"run async
","text":"run()\n
Create and start the Docker container.
Source code inipybox/container.py
async def run(self):\n \"\"\"\n Create and start the Docker container.\n \"\"\"\n self._docker = Docker()\n self._container = await self._run()\n
"}]}
\ No newline at end of file
diff --git a/usage/index.html b/usage/index.html
index ec2b87d..da558ea 100644
--- a/usage/index.html
+++ b/usage/index.html
@@ -950,7 +950,13 @@ Instead of using ExecutionContainer
as a context manager, you can also manually run()
and kill()
the container. This is useful for running the container on a separate host listening to a user-defined host port (e.g. 7777
in the example below).