[FEAT] Add jobs to an open batch (#108)
* [FEAT] Add jobs to a batch

* [PERF] Make test faster by mocking time.sleep

* [DOC] Update changelog and readme

* [FEAT] Wait for all jobs to be done instead of batch status
MatthieuMoreau0 authored Feb 16, 2024
1 parent 89447a0 commit 8f89899
Showing 12 changed files with 373 additions and 97 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,18 @@

All notable changes to this project will be documented in this file.


## [0.5.0] - 2024-02-05

### Added

- Added feature to create an "open" batch.
- To create an open batch, set the `complete` argument to `False` in the `create_batch` method of the SDK
- To add jobs to an open batch, use the `add_jobs` method
- Updated documentation to add examples to create open batches.
- The `wait` argument now waits for all the jobs to be terminated instead of waiting for the batch to be terminated.


## [0.4.3] - 2023-12-08

### Added
24 changes: 23 additions & 1 deletion README.md
@@ -86,6 +86,7 @@ See `Auth0TokenProvider` implementation for an example.
The package main component is a python object called `SDK` which can be used to create a `Batch`.

A `Batch` is a group of jobs with the same sequence that will run on the same QPU. For each job of a given batch, you must set a value for each variable, if any, defined in your sequence.
Once the QPU starts running a batch, only the jobs from that batch will be executed until they all end up in a termination status (`DONE`, `ERROR`, `CANCELED`).
The batch sequence can be generated using [Pulser](https://github.com/pasqal-io/Pulser). See their [documentation](https://pulser.readthedocs.io/en/stable/) for more information on how to install the library and create your own sequence.

@@ -102,12 +103,33 @@ job1 = {"runs": 20, "variables": {"omega_max": 6}}
job2 = {"runs": 50, "variables": {"omega_max": 10.5}}
```

Then, send the batch of jobs to the QPU and wait for the results:
Batches can either be "open" or "closed" (also called "complete").
An open batch may be used to schedule variational algorithms, where the parameters of the next jobs are derived from the results of the previous ones, without losing access to the QPU.


You can create a batch of jobs using the `create_batch` method of the SDK.
By default, this will create a closed batch, so all jobs should be passed as arguments right away.
You may set the `wait` argument to `True` to wait for all the jobs to end up in a termination status before proceeding to the next statement.

```python
# Create a closed batch with 2 jobs and wait for its termination
batch = sdk.create_batch(serialized_sequence, [job1,job2], wait=True)
```

To create an open batch, set the `complete` argument to `False`; you can then add jobs to your batch.
Don't forget to mark your batch as closed/complete when you are done adding new jobs to it.

```python
# Create an open batch with 1 job
batch = sdk.create_batch(serialized_sequence, [job1], complete=False)
# Add some jobs to it and wait for the jobs to be terminated
job3 = {"runs": 50, "variables": {"omega_max": 10.5}}
batch.add_jobs([job2, job3], wait=True)
# When you have sent all the jobs to your batch, don't forget to mark it as complete
# Otherwise your batch will be timed out by the scheduler
batch.declare_complete()
```
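The open-batch workflow above lends itself to variational loops. Below is a minimal sketch of that pattern using stub classes in place of the real `pasqal_cloud` objects (the stubs' internals are assumptions for illustration; only the method names `create_batch`, `add_jobs`, and `declare_complete` mirror the documented API):

```python
# Sketch of a variational loop over an open batch. StubSDK/StubBatch are
# stand-ins for the real pasqal_cloud objects; only the method names
# mirror the documented API.
class StubBatch:
    def __init__(self, jobs, complete):
        self.ordered_jobs = list(jobs)
        self.complete = complete

    def add_jobs(self, jobs, wait=False):
        # The real method submits jobs to the QPU; here we just record them.
        self.ordered_jobs.extend(jobs)

    def declare_complete(self):
        self.complete = True


class StubSDK:
    def create_batch(self, sequence, jobs, complete=True, wait=False):
        return StubBatch(jobs, complete)


sdk = StubSDK()
# Start an open batch with an initial guess for the variational parameter.
omega = 6.0
batch = sdk.create_batch(
    "serialized_sequence",
    [{"runs": 20, "variables": {"omega_max": omega}}],
    complete=False,
)
for _ in range(3):
    # Derive the next parameter from the previous results (toy update rule).
    omega += 0.5
    batch.add_jobs([{"runs": 20, "variables": {"omega_max": omega}}], wait=True)
# Close the batch so the scheduler does not time it out.
batch.declare_complete()
```

The key point is that the batch stays open between iterations, so each `add_jobs` call reuses the same QPU reservation.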

You can also choose to run your batch on an emulator using the optional argument `emulator`.
To use a basic single-threaded QPU emulator that can handle up to 10 qubits, specify the "EMU_FREE" emulator:

29 changes: 18 additions & 11 deletions pasqal_cloud/__init__.py
@@ -38,7 +38,7 @@
WorkloadCreationError,
WorkloadFetchingError,
)
from pasqal_cloud.job import Job
from pasqal_cloud.job import CreateJob, Job
from pasqal_cloud.workload import Workload


@@ -114,25 +114,29 @@ def _get_batch(
def create_batch(
self,
serialized_sequence: str,
jobs: List[Dict[str, Any]],
jobs: List[CreateJob],
complete: bool = True,
emulator: Optional[EmulatorType] = None,
configuration: Optional[BaseConfig] = None,
wait: bool = False,
fetch_results: bool = False,
) -> Batch:
"""Create a new batch and send it to the API.
For Iroise MVP, the batch must contain at least one job and will be declared as
complete immediately.
Args:
serialized_sequence: Serialized pulser sequence.
jobs: List of jobs to be added to the batch at creation.
complete: True (default), if all jobs are sent at creation.
If set to False, jobs can be added using the `Batch.add_jobs` method.
Once all the jobs are sent, use the `Batch.declare_complete` method.
Otherwise, the batch will be timed out if all jobs have already
been terminated and no new jobs are sent.
emulator: The type of emulator to use,
If set to None, the device_type will be set to the one
stored in the serialized sequence
configuration: A dictionary with extra configuration for the emulators
that accept it.
wait: Whether to wait for the batch to be done and fetch results
wait: Whether to block on this statement until all the submitted jobs are terminated
fetch_results (deprecated): Whether to wait for the batch to
be done and fetch results
@@ -151,11 +155,13 @@ def create_batch(
DeprecationWarning,
stacklevel=2,
)
wait = wait or fetch_results

req = {
"sequence_builder": serialized_sequence,
"webhook": self.webhook,
"jobs": jobs,
"complete": complete,
}

# the emulator field is only added in the case
@@ -173,14 +179,15 @@
except HTTPError as e:
raise BatchCreationError(e) from e

batch_id = batch_rsp["id"]
if wait or fetch_results:
while batch_rsp["status"] in ["PENDING", "RUNNING"]:
time.sleep(RESULT_POLLING_INTERVAL)
batch_rsp = self._get_batch(batch_id)

batch = Batch(**batch_rsp, _client=self._client)

if wait:
while any(job.status in {"PENDING", "RUNNING"} for job in batch.ordered_jobs):
time.sleep(RESULT_POLLING_INTERVAL)
batch.refresh()

self.batches[batch.id] = batch
return batch

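The new wait logic above polls the statuses of the individual jobs rather than the batch status. A self-contained sketch of that loop (function and parameter names are mine, not the SDK's), with the sleep function injectable so tests can mock it, as the "[PERF] Make test faster by mocking time.sleep" commit message suggests:

```python
import time

RESULT_POLLING_INTERVAL = 2  # seconds


def wait_for_jobs(fetch_statuses, sleep=time.sleep):
    """Block until no job reports PENDING or RUNNING.

    fetch_statuses: callable returning the current list of job statuses.
    sleep: injectable so tests can avoid real delays (mock time.sleep).
    """
    statuses = fetch_statuses()
    while any(s in {"PENDING", "RUNNING"} for s in statuses):
        sleep(RESULT_POLLING_INTERVAL)
        statuses = fetch_statuses()
    return statuses


# Simulate jobs finishing one poll at a time.
timeline = iter([["RUNNING", "PENDING"], ["DONE", "RUNNING"], ["DONE", "DONE"]])
result = wait_for_jobs(lambda: next(timeline), sleep=lambda _: None)
```

Injecting `sleep` keeps the polling interval out of the test's critical path without patching the `time` module globally.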
2 changes: 1 addition & 1 deletion pasqal_cloud/_version.py
@@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = "0.4.3"
__version__ = "0.5.0"
88 changes: 42 additions & 46 deletions pasqal_cloud/batch.py
@@ -15,7 +15,7 @@
JobCreationError,
JobFetchingError,
)
from pasqal_cloud.job import Job
from pasqal_cloud.job import CreateJob, Job

RESULT_POLLING_INTERVAL = 2 # seconds

@@ -120,45 +120,36 @@ def _load_configuration(
conf_class = EmuFreeConfig
return conf_class.from_dict(configuration)

def add_job(
def add_jobs(
self,
runs: int = 100,
variables: Optional[Dict[str, Any]] = None,
jobs: List[CreateJob],
wait: bool = False,
) -> Job:
"""Add and send a new job for this batch.
) -> None:
"""Add some jobs to batch for execution on PCS and returns the updated batch.
The batch should not be `complete` otherwise the API will return an error.
The new jobs are appended to the `ordered_jobs` list attribute.
Args:
runs: number of times the job is run on the QPU.
variables (optional): values for variables if sequence is parametrized.
wait: Whether to wait for the job to be done.
jobs: List of jobs to be added to the batch.
wait: If True, blocks until all jobs in the batch are done.
Returns:
- Job: the created job.
"""
job_data: Dict[str, Any] = {"runs": runs, "batch_id": self.id}
if variables:
job_data["variables"] = variables
try:
job_rsp = self._client._send_job(job_data)
batch_rsp = self._client._add_jobs(self.id, jobs)
except HTTPError as e:
raise JobCreationError(e) from e
job = Job(**job_rsp, _client=self._client)
self.ordered_jobs.append(job)
self._update_from_api_response(batch_rsp)

if wait:
while job.status in ["PENDING", "RUNNING"]:
while any(job.status in {"PENDING", "RUNNING"} for job in self.ordered_jobs):
time.sleep(RESULT_POLLING_INTERVAL)
try:
job_rsp = self._client._get_job(job.id)
except HTTPError as e:
raise JobFetchingError(e) from e
job = Job(**job_rsp)
return job

def declare_complete(
self, wait: bool = False, fetch_results: bool = False
) -> Dict[str, Any]:
"""Declare to PCS that the batch is complete.
self.refresh()

def declare_complete(self, wait: bool = False, fetch_results: bool = False) -> None:
"""Declare to PCS that the batch is complete and returns an updated batch instance.
Args:
wait: Whether to wait for the batch to be done and fetch results.
@@ -173,27 +164,32 @@ def declare_complete(
batch_rsp = self._client._complete_batch(self.id)
except HTTPError as e:
raise BatchSetCompleteError(e) from e
self.complete = True
self._update_from_api_response(batch_rsp)
if wait or fetch_results:
while batch_rsp["status"] in ["PENDING", "RUNNING"]:
while any(job.status in {"PENDING", "RUNNING"} for job in self.ordered_jobs):
time.sleep(RESULT_POLLING_INTERVAL)
try:
batch_rsp = self._client._get_batch(
self.id,
)
except HTTPError as e:
raise BatchFetchingError(e) from e
self.ordered_jobs = [
Job(**job, _client=self._client) for job in batch_rsp["jobs"]
]

return batch_rsp

def cancel(self) -> Dict[str, Any]:
self.refresh()

def cancel(self) -> None:
"""Cancel the current batch on the PCS."""
try:
batch_rsp = self._client._cancel_batch(self.id)
except HTTPError as e:
raise BatchCancellingError(e) from e
self.status = batch_rsp.get("status", "CANCELED")
return batch_rsp
self._update_from_api_response(batch_rsp)

def refresh(self) -> None:
"""Fetch the batch from the API and update it in place."""
try:
batch_rsp = self._client._get_batch(self.id)
except HTTPError as e:
raise BatchFetchingError(e) from e
self._update_from_api_response(batch_rsp)

def _update_from_api_response(self, data: Dict[str, Any]) -> None:
"""Update the instance in place with the response body of the batch API"""
updated_batch = Batch(**data, _client=self._client)
for field, value in updated_batch:
setattr(self, field, value)
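The `_update_from_api_response` helper above copies every field from a freshly parsed model onto `self`, so existing references to the batch see the new state. The same pattern with a plain class instead of the pydantic `Batch` model (field iteration via `vars` is my substitution; pydantic v1 models iterate as `(name, value)` pairs directly):

```python
class ApiResource:
    """Plain-Python stand-in for the pydantic Batch model."""

    def __init__(self, status="PENDING", complete=False):
        self.status = status
        self.complete = complete

    def _update_from_api_response(self, data):
        # Parse the response into a fresh instance, then copy each field
        # onto self so existing references observe the updated state.
        updated = ApiResource(**data)
        for field, value in vars(updated).items():
            setattr(self, field, value)


resource = ApiResource()
resource._update_from_api_response({"status": "DONE", "complete": True})
```

Building a fresh instance first means the API payload is validated as a whole before any attribute on the live object changes.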
13 changes: 9 additions & 4 deletions pasqal_cloud/client.py
@@ -14,7 +14,7 @@
from __future__ import annotations

from getpass import getpass
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Mapping, Optional, Sequence, Union

import requests
from requests.auth import AuthBase
@@ -97,7 +97,10 @@ def _credential_login(
return token_provider

def _request(
self, method: str, url: str, payload: Optional[Dict[str, Any]] = None
self,
method: str,
url: str,
payload: Optional[Union[Mapping, Sequence[Mapping]]] = None,
) -> JSendPayload:
rsp = requests.request(
method,
@@ -139,9 +142,11 @@ def _cancel_batch(self, batch_id: str) -> Dict[str, Any]:
)["data"]
return batch

def _send_job(self, job_data: Dict[str, Any]) -> Dict[str, Any]:
def _add_jobs(
self, batch_id: str, jobs_data: Sequence[Mapping[str, Any]]
) -> Dict[str, Any]:
response: Dict[str, Any] = self._request(
"POST", f"{self.endpoints.core}/api/v1/jobs", job_data
"POST", f"{self.endpoints.core}/api/v1/batches/{batch_id}/jobs", jobs_data
)["data"]
return response

7 changes: 6 additions & 1 deletion pasqal_cloud/job.py
@@ -1,4 +1,4 @@
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, TypedDict, Union

from pydantic import BaseModel, Extra
from requests import HTTPError
@@ -61,3 +61,8 @@ def cancel(self) -> Dict[str, Any]:
raise JobCancellingError(e) from e
self.status = job_rsp.get("status", "CANCELED")
return job_rsp


class CreateJob(TypedDict, total=False):
runs: int
variables: Union[Dict[str, Any], None]
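The new `CreateJob` typed dict gives static type checkers a shape for the job payloads shown in the README; at runtime it is an ordinary `dict`. A small sketch (the class definition is copied from the diff; the example payloads are mine):

```python
from typing import Any, Dict, TypedDict, Union


class CreateJob(TypedDict, total=False):
    runs: int
    variables: Union[Dict[str, Any], None]


# total=False makes both keys optional, so partial payloads type-check too.
job: CreateJob = {"runs": 50, "variables": {"omega_max": 10.5}}
minimal: CreateJob = {"runs": 100}
```

Because `TypedDict` adds no runtime behavior, existing call sites that pass plain dicts keep working unchanged.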
@@ -11,7 +11,26 @@
"status": "PENDING",
"updated_at": "2021-11-10T15:27:44.110274",
"user_id": "EQZj1ZQE",
"webhook": "10.0.1.5"
"webhook": "10.0.1.5",
"jobs": [
{
"batch_id": "00000000-0000-0000-0000-000000000001",
"id": "00000000-0000-0000-0000-000000022010",
"project_id": "00000000-0000-0000-0000-000000022111",
"runs": 50,
"status": "RUNNING",
"created_at": "2021-11-10T15:27:06.698066",
"errors": [],
"result": null,
"full_result": null,
"updated_at": "2021-11-10T15:27:06.698066",
"variables": {
"Omega_max": 14.4,
"last_target": "q1",
"ts": [200, 500]
}
}
]
},
"message": "OK.",
"status": "success"