TasksΒΆ

The Task class allows you to define custom environments and rewards. Creating a new task is easy.

from dataclasses import dataclass
from judo.tasks import Task, TaskConfig

@dataclass
class MyTaskConfig(TaskConfig):
    my_param1: float = 1.0
    my_param2: int = 2

class MyTask(Task[MyTaskConfig]):
    def __init__(self, model_path: Path | str, sim_model_path: Path | str | None = None) -> None:
        super().__init__(model_path, sim_model_path=sim_model_path)
        # rest of __init__...

    def reward(
        self,
        states: np.ndarray,
        sensors: np.ndarray,
        controls: np.ndarray,
        config: MyTaskConfig,
        system_metadata: dict[str, Any] | None = None,
    ) -> np.ndarray:
        """Abstract reward function for task.

        Args:
            states: The rolled out states (after the initial condition).
                Shape=(num_rollouts, T, nq + nv).
            sensors: The rolled out sensors readings.
                Shape=(num_rollouts, T, total_num_sensor_dims).
            controls: The rolled out controls. Shape=(num_rollouts, T, nu).
            config: The current task config (passed in from the top-level controller).
            system_metadata: Any additional metadata from the system that is useful for
                computing the reward. For example, in the cube rotation task, the system
                could pass in new goal cube orientations to the controller here.

        Returns:
            rewards: The reward for each rollout. Shape=(num_rollouts,).
        """

If the system is our SimulationNode object, then there are two copies of the Task in the system and the controller respectively. The SimulationNode is responsible for stepping the mujoco simulation, while the Controller is responsible for rolling out the task. We expose functions for modifying the task before and after each of these steps. Additionally, we also allow a task-specific optimizer warm start, which is useful for tasks that require some initial setup before the optimization loop starts. The interface for these functions is as follows:

class MyTask(Task[MyTaskConfig]):
    def pre_rollout(self, curr_state: np.ndarray, config: MyTaskConfig) -> None:
        """Pre-rollout behavior for task (does nothing by default).

        Args:
            curr_state: Current state of the task. Shape=(nq + nv,).
        """

    def post_rollout(
        self,
        states: np.ndarray,
        sensors: np.ndarray,
        controls: np.ndarray,
        config: MyTaskConfig,
        system_metadata: dict[str, Any] | None = None,
    ) -> None:
        """Post-rollout behavior for task (does nothing by default).

        Same inputs as in reward function.
        """

    def pre_sim_step(self) -> None:
        """Pre-simulation step behavior for task."""

    def post_sim_step(self) -> None:
        """Post-simulation step behavior for task."""

    def optimizer_warm_start(self) -> np.ndarray:
        """Returns a warm start for the optimizer."""
        return np.zeros(self.nu)  # default is zeros

Lastly, we also provide the get_sim_metadata function, which allows the task instance in the SimulationNode to pass metadata to the task instance in the Controller.

class MyTask(Task[MyTaskConfig]):
    def get_sim_metadata(self) -> dict[str, Any]:
        """Get metadata from the simulation node to pass to the controller."""
        return {"my_metadata": self.my_value}