planner.core.graph_worker#

Module Contents#

planner.core.graph_worker.pareto_distribution(length: int, exponent: float) torch.FloatTensor#
Parameters:
  • length (int) –

  • exponent (float) –

Return type:

torch.FloatTensor

class planner.core.graph_worker.GraphWorker(plant: jacta.planner.dynamics.simulator_plant.SimulatorPlant, graph: jacta.planner.core.graph.Graph, action_sampler: jacta.planner.core.action_sampler.ActionSampler, logger: jacta.planner.core.logger.Logger, params: jacta.planner.core.parameter_container.ParameterContainer, callback: Callable | None = None, callback_period: int | None = None)#
Parameters:
  • plant (jacta.planner.dynamics.simulator_plant.SimulatorPlant) –

  • graph (jacta.planner.core.graph.Graph) –

  • action_sampler (jacta.planner.core.action_sampler.ActionSampler) –

  • logger (jacta.planner.core.logger.Logger) –

  • params (jacta.planner.core.parameter_container.ParameterContainer) –

  • callback (Optional[Callable]) –

  • callback_period (Optional[int]) –

reset() None#
Return type:

None

node_selection(search_indices: torch.IntTensor) torch.IntTensor#

Selects a collection of nodes. Nodes a ranked either by reward or scaled distance to goal. Then nodes are selected according to the Pareto distribution.

Parameters:

search_indices (torch.IntTensor) – the indices of the searches to select nodes for

Return type:

torch.IntTensor

get_start_actions(node_ids: torch.IntTensor) torch.FloatTensor#
Parameters:

node_ids (torch.IntTensor) –

Return type:

torch.FloatTensor

get_end_actions(node_ids: torch.IntTensor, relative_actions: torch.FloatTensor, action_type: jacta.planner.core.types.ActionType | None) torch.FloatTensor#
Parameters:
  • node_ids (torch.IntTensor) –

  • relative_actions (torch.FloatTensor) –

  • action_type (Optional[jacta.planner.core.types.ActionType]) –

Return type:

torch.FloatTensor

node_extension(node_ids: torch.IntTensor, relative_actions: torch.FloatTensor, num_action_steps: int, action_type: jacta.planner.core.types.ActionType | None = None) Tuple[torch.IntTensor, float, bool]#

Chooses a node to extend to based on the current node and action sampler.

Parameters:
  • node_ids (torch.IntTensor) – the id sof the nodes to extend from with the actions

  • actions – control vectors of size (nu,)

  • num_action_steps (int) – the number of steps. Must be the same for all extensions to perform parallel rollout

  • relative_actions (torch.FloatTensor) –

  • action_type (Optional[jacta.planner.core.types.ActionType]) –

Return type:

Tuple[torch.IntTensor, float, bool]

node_pruning(paths_ids: torch.IntTensor) torch.IntTensor#

Finds the best node in path_ids and removes all nodes after the best node

Parameters:

paths_ids (torch.IntTensor) –

Return type:

torch.IntTensor

node_replacement(node_ids: torch.IntTensor, paths_ids: torch.IntTensor, best_indices: torch.IntTensor) Tuple[int, bool]#

Tries to replace the path from predecessor_node to node with a direct_node from predecessor_node

Parameters:
  • node_ids (torch.IntTensor) –

  • paths_ids (torch.IntTensor) –

  • best_indices (torch.IntTensor) –

Return type:

Tuple[int, bool]

percentage_range(start: int, stop: int) range#
Parameters:
  • start (int) –

  • stop (int) –

Return type:

range

get_progress_info(iteration: int, num_steps: int, print_percentage: bool = False, verbose: bool = False) torch.FloatTensor#
Parameters:
  • iteration (int) –

  • num_steps (int) –

  • print_percentage (bool) –

  • verbose (bool) –

Return type:

torch.FloatTensor

callback_and_progress_check(iteration: int, num_steps: int, change_goal: bool = False, verbose: bool = False) torch.BoolTensor#

Calls the search callback. Returns True if goal reached

Parameters:
  • iteration (int) –

  • num_steps (int) –

  • change_goal (bool) –

  • verbose (bool) –

Return type:

torch.BoolTensor

class planner.core.graph_worker.SingleGoalWorker(plant: jacta.planner.dynamics.simulator_plant.SimulatorPlant, graph: jacta.planner.core.graph.Graph, action_sampler: jacta.planner.core.action_sampler.ActionSampler, logger: jacta.planner.core.logger.Logger, params: jacta.planner.core.parameter_container.ParameterContainer, callback: Callable | None = None, callback_period: int | None = None)#

Bases: GraphWorker

Parameters:
  • plant (jacta.planner.dynamics.simulator_plant.SimulatorPlant) –

  • graph (jacta.planner.core.graph.Graph) –

  • action_sampler (jacta.planner.core.action_sampler.ActionSampler) –

  • logger (jacta.planner.core.logger.Logger) –

  • params (jacta.planner.core.parameter_container.ParameterContainer) –

  • callback (Optional[Callable]) –

  • callback_period (Optional[int]) –

work(verbose: bool = False) bool#

Tries to find a path to a single goal.

Parameters:

verbose (bool) –

Return type:

bool

class planner.core.graph_worker.ParallelGoalsWorker(*args: Tuple, **kwargs: dict)#

Bases: GraphWorker

Parameters:
  • args (Tuple) –

  • kwargs (dict) –

try_to_reallocate_workers(worker_reset_mask: torch.BoolTensor) None#
Parameters:

worker_reset_mask (torch.BoolTensor) –

Return type:

None

update_extension_lengths(search_reset_mask: torch.BoolTensor) None#
Parameters:

search_reset_mask (torch.BoolTensor) –

Return type:

None

reset_finished_workers() None#
Return type:

None

update_pareto_parameters(node_ids: torch.IntTensor, new_node_ids: torch.IntTensor) None#
Parameters:
  • node_ids (torch.IntTensor) –

  • new_node_ids (torch.IntTensor) –

Return type:

None

work(verbose: bool = False) bool#

Tries to find a path to a single goal.

Parameters:

verbose (bool) –

Return type:

bool

class planner.core.graph_worker.CommonGoalWorkerInterface(*args: Tuple, **kwargs: dict)#
Parameters:
  • args (Tuple) –

  • kwargs (dict) –

class planner.core.graph_worker.RelatedGoalWorker(*args: Tuple, **kwargs: dict)#

Bases: CommonGoalWorkerInterface

Parameters:
  • args (Tuple) –

  • kwargs (dict) –

work(verbose: bool = False) bool#

Tries to find paths to goals sampled around the actual goal.

Parameters:

verbose (bool) –

Return type:

bool

class planner.core.graph_worker.ExplorerWorker(*args: Tuple, **kwargs: dict)#

Bases: CommonGoalWorkerInterface

Parameters:
  • args (Tuple) –

  • kwargs (dict) –

work(verbose: bool = False) bool#

Tries to find paths to randomly sampled goals

Parameters:

verbose (bool) –

Return type:

bool

class planner.core.graph_worker.RolloutWorker(plant: jacta.planner.dynamics.simulator_plant.SimulatorPlant, graph: jacta.planner.core.graph.Graph, action_sampler: jacta.planner.core.action_sampler.ActionSampler, logger: jacta.planner.core.logger.Logger, params: jacta.planner.core.parameter_container.ParameterContainer, callback: Callable | None = None, callback_period: int | None = None)#

Bases: GraphWorker

Parameters:
  • plant (jacta.planner.dynamics.simulator_plant.SimulatorPlant) –

  • graph (jacta.planner.core.graph.Graph) –

  • action_sampler (jacta.planner.core.action_sampler.ActionSampler) –

  • logger (jacta.planner.core.logger.Logger) –

  • params (jacta.planner.core.parameter_container.ParameterContainer) –

  • callback (Optional[Callable]) –

  • callback_period (Optional[int]) –

work(verbose: bool = False) bool#

Always extends the last node.

Parameters:

verbose (bool) –

Return type:

bool

planner.core.graph_worker.inspect_action_type(graph_worker: GraphWorker, action_type: jacta.planner.core.types.ActionType, node_ids: torch.IntTensor | None = None, num_action_steps: int = 100) torch.FloatTensor#

Inspection tool for a specific action type. This rollout the dynamics of the system assuming that we always select the same action_type.

Parameters:
  • graph_worker (GraphWorker) –

  • action_type (jacta.planner.core.types.ActionType) –

  • node_ids (torch.IntTensor | None) –

  • num_action_steps (int) –

Return type:

torch.FloatTensor