Skip to content

Commit

Permalink
Merge pull request #37 from kscalelabs/sim-protos
Browse files Browse the repository at this point in the history
Protobufs for kos-sim
  • Loading branch information
codekansas authored Jan 16, 2025
2 parents ce3b5ad + 899ffd5 commit 44dc678
Show file tree
Hide file tree
Showing 11 changed files with 349 additions and 11 deletions.
1 change: 1 addition & 0 deletions kos-py/pykos/Cargo.toml
5 changes: 2 additions & 3 deletions kos-py/pykos/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""KOS Python client."""

__version__ = "0.3.0"

from pykos.client import KOS
__version__ = "0.4.1"

from . import services
from .client import KOS
2 changes: 2 additions & 0 deletions kos-py/pykos/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pykos.services.actuator import ActuatorServiceClient
from pykos.services.imu import IMUServiceClient
from pykos.services.process_manager import ProcessManagerServiceClient
from pykos.services.sim import SimServiceClient


class KOS:
Expand All @@ -25,6 +26,7 @@ def __init__(self, ip: str = "localhost", port: int = 50051) -> None:
self.imu = IMUServiceClient(self.channel)
self.actuator = ActuatorServiceClient(self.channel)
self.process_manager = ProcessManagerServiceClient(self.channel)
self.sim = SimServiceClient(self.channel)

def close(self) -> None:
"""Close the gRPC channel."""
Expand Down
30 changes: 30 additions & 0 deletions kos-py/pykos/services/actuator.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ def get_calibration_status(self, actuator_id: int) -> str | None:
def command_actuators(self, commands: list[ActuatorCommand]) -> actuator_pb2.CommandActuatorsResponse:
"""Command multiple actuators at once.
Example:
>>> command_actuators([
... {"actuator_id": 1, "position": 90.0, "velocity": 100.0, "torque": 1.0},
... {"actuator_id": 2, "position": 180.0},
... ])
Args:
commands: List of dictionaries containing actuator commands.
Each dict should have 'actuator_id' and optionally 'position',
Expand All @@ -100,6 +106,27 @@ def command_actuators(self, commands: list[ActuatorCommand]) -> actuator_pb2.Com
def configure_actuator(self, **kwargs: Unpack[ConfigureActuatorRequest]) -> common_pb2.ActionResult:
"""Configure an actuator's parameters.
Example:
>>> configure_actuator(
... actuator_id=1,
... kp=1.0,
... kd=0.1,
... ki=0.01,
... max_torque=100.0,
... protective_torque=None,
... protection_time=None,
... torque_enabled=True,
... new_actuator_id=None,
... zero_position=True
... )
>>> configure_actuator(
... actuator_id=2,
... kp=1.0,
... kd=0.1,
... torque_enabled=True,
... )
Args:
actuator_id: ID of the actuator to configure
**kwargs: Configuration parameters that may include:
Expand All @@ -115,6 +142,9 @@ def configure_actuator(self, **kwargs: Unpack[ConfigureActuatorRequest]) -> comm
def get_actuators_state(self, actuator_ids: list[int] | None = None) -> actuator_pb2.GetActuatorsStateResponse:
"""Get the state of multiple actuators.
Example:
>>> get_actuators_state([1, 2])
Args:
actuator_ids: List of actuator IDs to query. If None, gets state of all actuators.
Expand Down
8 changes: 8 additions & 0 deletions kos-py/pykos/services/imu.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ def get_quaternion(self) -> imu_pb2.QuaternionResponse:
def zero(self, duration: float = 1.0, **kwargs: Unpack[ZeroIMURequest]) -> common_pb2.ActionResponse:
"""Zero the IMU.
Example:
>>> zero(duration=1.0,
... max_retries=3,
... max_angular_error=1.0,
... max_velocity=1.0,
... max_acceleration=1.0
... )
Args:
duration: Duration in seconds for zeroing operation
**kwargs: Additional zeroing parameters that may include:
Expand Down
121 changes: 121 additions & 0 deletions kos-py/pykos/services/sim.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
"""Sim service client."""

from typing import NotRequired, TypedDict, Unpack

import grpc
from google.protobuf.empty_pb2 import Empty

from kos_protos import common_pb2, sim_pb2, sim_pb2_grpc


class DefaultPosition(TypedDict):
qpos: list[float]


class ResetRequest(TypedDict):
initial_state: NotRequired[DefaultPosition]
randomize: NotRequired[bool]


class StepRequest(TypedDict):
num_steps: int
step_size: NotRequired[float]


class SimulationParameters(TypedDict):
time_scale: NotRequired[float]
gravity: NotRequired[float]
initial_state: NotRequired[DefaultPosition]


class SimServiceClient:
def __init__(self, channel: grpc.Channel) -> None:
self.stub = sim_pb2_grpc.SimulationServiceStub(channel)

def reset(self, **kwargs: Unpack[ResetRequest]) -> common_pb2.ActionResponse:
"""Reset the simulation to its initial state.
Args:
**kwargs: Reset parameters that may include:
initial_state: DefaultPosition to reset to
randomize: Whether to randomize the initial state
Example:
>>> client.reset(
... initial_state={"qpos": [0.0, 0.0, 0.0]},
... randomize=True
... )
Returns:
ActionResponse indicating success/failure
"""
initial_state = None
if "initial_state" in kwargs:
pos = kwargs["initial_state"]
initial_state = sim_pb2.DefaultPosition(qpos=pos["qpos"])

request = sim_pb2.ResetRequest(initial_state=initial_state, randomize=kwargs.get("randomize"))
return self.stub.Reset(request)

def set_paused(self, paused: bool) -> common_pb2.ActionResponse:
"""Pause or unpause the simulation.
Args:
paused: True to pause, False to unpause
Returns:
ActionResponse indicating success/failure
"""
request = sim_pb2.SetPausedRequest(paused=paused)
return self.stub.SetPaused(request)

def step(self, num_steps: int, step_size: float | None = None) -> common_pb2.ActionResponse:
"""Step the simulation forward.
Args:
num_steps: Number of simulation steps to take
step_size: Optional time per step in seconds
Returns:
ActionResponse indicating success/failure
"""
request = sim_pb2.StepRequest(num_steps=num_steps, step_size=step_size)
return self.stub.Step(request)

def set_parameters(self, **kwargs: Unpack[SimulationParameters]) -> common_pb2.ActionResponse:
"""Set simulation parameters.
Example:
>>> client.set_parameters(
... time_scale=1.0,
... gravity=9.81,
... initial_state={"qpos": [0.0, 0.0, 0.0]}
... )
Args:
**kwargs: Parameters that may include:
time_scale: Simulation time scale
gravity: Gravity constant
initial_state: Default position state
Returns:
ActionResponse indicating success/failure
"""
initial_state = None
if "initial_state" in kwargs:
pos = kwargs["initial_state"]
initial_state = sim_pb2.DefaultPosition(qpos=pos["qpos"])

params = sim_pb2.SimulationParameters(
time_scale=kwargs.get("time_scale"), gravity=kwargs.get("gravity"), initial_state=initial_state
)
request = sim_pb2.SetParametersRequest(parameters=params)
return self.stub.SetParameters(request)

def get_parameters(self) -> sim_pb2.GetParametersResponse:
"""Get current simulation parameters.
Returns:
GetParametersResponse containing current parameters and any error
"""
return self.stub.GetParameters(Empty())
1 change: 1 addition & 0 deletions kos-py/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ module = [
"google.*",
"kos.*",
"kos_protos.*",
"version",
]

ignore_missing_imports = true
Expand Down
29 changes: 21 additions & 8 deletions kos-py/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import re
import subprocess
from pathlib import Path
from typing import List

from setuptools import setup
Expand All @@ -13,13 +14,25 @@


class GenerateProtosMixin:
"""Mixin class to generate protos."""
"""Mixin class to generate protos and prepare build files."""

def generate_protos(self) -> None:
"""Generate proto files if Makefile exists."""
if os.path.exists("Makefile"):
subprocess.check_call(["make", "generate-proto"])

def copy_workspace_files(self) -> None:
"""Copy necessary workspace files for version handling."""
# Copy workspace Cargo.toml into the package directory
parent_cargo = Path(__file__).parent.parent / "Cargo.toml"
if parent_cargo.exists():
import shutil

target_dir = Path(__file__).parent / "pykos"
shutil.copy(parent_cargo, target_dir / "Cargo.toml")
else:
print("Warning: Could not find workspace Cargo.toml")


class BuildPyCommand(build_py, GenerateProtosMixin):
"""Custom build command to generate protos before building."""
Expand All @@ -43,6 +56,12 @@ def run(self) -> None:
long_description: str = f.read()


with open("pykos/__init__.py", "r", encoding="utf-8") as fh:
version_re = re.search(r"^__version__ = \"([^\"]*)\"", fh.read(), re.MULTILINE)
assert version_re is not None, "Could not find version in pykos/__init__.py"
version: str = version_re.group(1)


with open("pykos/requirements.txt", "r", encoding="utf-8") as f:
requirements: List[str] = f.read().splitlines()

Expand All @@ -51,12 +70,6 @@ def run(self) -> None:
requirements_dev: List[str] = f.read().splitlines()


with open("pykos/__init__.py", "r", encoding="utf-8") as fh:
version_re = re.search(r"^__version__ = \"([^\"]*)\"", fh.read(), re.MULTILINE)
assert version_re is not None, "Could not find version in pykos/__init__.py"
version: str = version_re.group(1)


setup(
name="pykos",
version=version,
Expand All @@ -70,7 +83,7 @@ def run(self) -> None:
extras_require={"dev": requirements_dev},
packages=["pykos", "pykos.services", "kos_protos"],
package_data={
"pykos": ["py.typed"],
"pykos": ["py.typed", "Cargo.toml"],
"kos_protos": ["py.typed"],
},
include_package_data=True,
Expand Down
1 change: 1 addition & 0 deletions kos/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ fn main() {
"kos/common.proto",
"kos/actuator.proto",
"kos/imu.proto",
"kos/sim.proto",
"kos/inference.proto",
"kos/process_manager.proto",
"kos/system.proto",
Expand Down
73 changes: 73 additions & 0 deletions kos/proto/kos/sim.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
syntax = "proto3";

package kos.sim;

import "google/protobuf/empty.proto";
import "kos/common.proto";

option go_package = "kos/sim;sim";
option java_package = "com.kos.sim";
option csharp_namespace = "KOS.Sim";

// Service to control the simulation environment.
service SimulationService {
// Resets the simulation to its initial state.
rpc Reset(ResetRequest) returns (kos.common.ActionResponse);

// Pauses/unpauses the simulation.
rpc SetPaused(SetPausedRequest) returns (kos.common.ActionResponse);

// Steps the simulation forward by a specified amount.
rpc Step(StepRequest) returns (kos.common.ActionResponse);

// Sets various simulation parameters.
rpc SetParameters(SetParametersRequest) returns (kos.common.ActionResponse);

// Gets the current simulation parameters.
rpc GetParameters(google.protobuf.Empty) returns (GetParametersResponse);
}

// Default position for the simulation (initial state)
message DefaultPosition {
repeated float qpos = 1;
}

// Request to reset the simulation to initial state
message ResetRequest {
// If provided, reset to this specific state, otherwise use default
optional DefaultPosition initial_state = 1;
// If true, randomize the initial state within pre-set bounds
optional bool randomize = 2;
}

// Request to pause or resume the simulation
message SetPausedRequest {
bool paused = 1;
}

// Request to step the simulation forward
message StepRequest {
// Number of simulation steps to take
uint32 num_steps = 1;
// Time per step in seconds
optional float step_size = 2;
}

message SetParametersRequest {
SimulationParameters parameters = 1;
}

message GetParametersResponse {
SimulationParameters parameters = 1;
kos.common.Error error = 2; // Error details if any
}

// Controllable parameters for the simulation
message SimulationParameters {
// Time scale for the simulation
optional float time_scale = 1;
// Strength of gravity for the simulation
optional float gravity = 2;
// Initial state for the simulation
optional DefaultPosition initial_state = 3;
}
Loading

0 comments on commit 44dc678

Please sign in to comment.