planner.core.graph_worker
Module Contents#
- planner.core.graph_worker.pareto_distribution(length: int, exponent: float) torch.FloatTensor #
- Parameters:
length (int) –
exponent (float) –
- Return type:
torch.FloatTensor
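A minimal usage sketch (assuming the returned tensor holds one non-negative selection weight per entry, which is how node_selection below appears to use it; the import prefix follows the fully qualified jacta names used elsewhere on this page):

import torch
from jacta.planner.core.graph_worker import pareto_distribution

# Weights for 100 candidate nodes; a larger exponent is assumed to concentrate
# probability mass on the highest-ranked entries.
weights = pareto_distribution(length=100, exponent=2.0)
probabilities = weights / weights.sum()
picked = torch.multinomial(probabilities, num_samples=10, replacement=False)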
- class planner.core.graph_worker.GraphWorker(plant: jacta.planner.dynamics.simulator_plant.SimulatorPlant, graph: jacta.planner.core.graph.Graph, action_sampler: jacta.planner.core.action_sampler.ActionSampler, logger: jacta.planner.core.logger.Logger, params: jacta.planner.core.parameter_container.ParameterContainer, callback: Callable | None = None, callback_period: int | None = None)#
- Parameters:
plant (jacta.planner.dynamics.simulator_plant.SimulatorPlant) –
graph (jacta.planner.core.graph.Graph) –
action_sampler (jacta.planner.core.action_sampler.ActionSampler) –
logger (jacta.planner.core.logger.Logger) –
params (jacta.planner.core.parameter_container.ParameterContainer) –
callback (Optional[Callable]) –
callback_period (Optional[int]) –
- reset() None #
- Return type:
None
- node_selection(search_indices: torch.IntTensor) torch.IntTensor #
Selects a collection of nodes. Nodes are ranked either by reward or by scaled distance to the goal; nodes are then selected according to the Pareto distribution.
- Parameters:
search_indices (torch.IntTensor) – the indices of the searches to select nodes for
- Return type:
torch.IntTensor
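A hedged usage sketch (worker refers to an already constructed GraphWorker; the contents of search_indices, taken here to index the active parallel searches, are an assumption):

import torch

# Select one node per active search; ranking and Pareto-weighted sampling
# happen inside node_selection.
search_indices = torch.arange(4, dtype=torch.int32)
node_ids = worker.node_selection(search_indices)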
- get_start_actions(node_ids: torch.IntTensor) torch.FloatTensor #
- Parameters:
node_ids (torch.IntTensor) –
- Return type:
torch.FloatTensor
- get_end_actions(node_ids: torch.IntTensor, relative_actions: torch.FloatTensor, action_type: jacta.planner.core.types.ActionType | None) torch.FloatTensor #
- Parameters:
node_ids (torch.IntTensor) –
relative_actions (torch.FloatTensor) –
action_type (Optional[jacta.planner.core.types.ActionType]) –
- Return type:
torch.FloatTensor
- node_extension(node_ids: torch.IntTensor, relative_actions: torch.FloatTensor, num_action_steps: int, action_type: jacta.planner.core.types.ActionType | None = None) Tuple[torch.IntTensor, float, bool] #
Chooses a node to extend to based on the current node and action sampler.
- Parameters:
node_ids (torch.IntTensor) – the ids of the nodes to extend from with the actions
relative_actions (torch.FloatTensor) – control vectors of size (nu,)
num_action_steps (int) – the number of steps; must be the same for all extensions to perform a parallel rollout
action_type (Optional[jacta.planner.core.types.ActionType]) –
- Return type:
Tuple[torch.IntTensor, float, bool]
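A hedged sketch of one extension step (worker is an existing GraphWorker; the relative-action sampling shown here is illustrative, not the planner's actual ActionSampler, and the names of the second and third return values are placeholders for the documented float and bool):

import torch

search_indices = torch.arange(4, dtype=torch.int32)
node_ids = worker.node_selection(search_indices)

# Illustrative relative actions; in practice these come from the ActionSampler
# passed to the worker. nu is a hypothetical action dimension.
nu = 7
relative_actions = 0.1 * torch.randn(len(node_ids), nu)

new_node_ids, value, done = worker.node_extension(
    node_ids, relative_actions, num_action_steps=10
)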
- node_pruning(paths_ids: torch.IntTensor) torch.IntTensor #
Finds the best node in paths_ids and removes all nodes after it.
- Parameters:
paths_ids (torch.IntTensor) –
- Return type:
torch.IntTensor
- node_replacement(node_ids: torch.IntTensor, paths_ids: torch.IntTensor, best_indices: torch.IntTensor) Tuple[int, bool] #
Tries to replace the path from predecessor_node to node with a direct_node from predecessor_node.
- Parameters:
node_ids (torch.IntTensor) –
paths_ids (torch.IntTensor) –
best_indices (torch.IntTensor) –
- Return type:
Tuple[int, bool]
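A hedged sketch of the prune-then-replace pattern suggested by the two methods above (paths_ids and node_ids are assumed to come from earlier extension steps, and the pruning result is assumed to match the best_indices argument expected by node_replacement; the names of the int and bool return values are placeholders):

# Prune each path back to its best node; the returned tensor is assumed
# to hold the indices of those best nodes.
best_indices = worker.node_pruning(paths_ids)

# Attempt a shortcut from each predecessor node straight to the best node.
num_replaced, replaced_any = worker.node_replacement(node_ids, paths_ids, best_indices)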
- percentage_range(start: int, stop: int) range #
- Parameters:
start (int) –
stop (int) –
- Return type:
range
- get_progress_info(iteration: int, num_steps: int, print_percentage: bool = False, verbose: bool = False) torch.FloatTensor #
- Parameters:
iteration (int) –
num_steps (int) –
print_percentage (bool) –
verbose (bool) –
- Return type:
torch.FloatTensor
- callback_and_progress_check(iteration: int, num_steps: int, change_goal: bool = False, verbose: bool = False) torch.BoolTensor #
Calls the search callback. Returns True if the goal has been reached.
- Parameters:
iteration (int) –
num_steps (int) –
change_goal (bool) –
verbose (bool) –
- Return type:
torch.BoolTensor
- class planner.core.graph_worker.SingleGoalWorker(plant: jacta.planner.dynamics.simulator_plant.SimulatorPlant, graph: jacta.planner.core.graph.Graph, action_sampler: jacta.planner.core.action_sampler.ActionSampler, logger: jacta.planner.core.logger.Logger, params: jacta.planner.core.parameter_container.ParameterContainer, callback: Callable | None = None, callback_period: int | None = None)#
Bases:
GraphWorker
- Parameters:
plant (jacta.planner.dynamics.simulator_plant.SimulatorPlant) –
graph (jacta.planner.core.graph.Graph) –
action_sampler (jacta.planner.core.action_sampler.ActionSampler) –
logger (jacta.planner.core.logger.Logger) –
params (jacta.planner.core.parameter_container.ParameterContainer) –
callback (Optional[Callable]) –
callback_period (Optional[int]) –
- work(verbose: bool = False) bool #
Tries to find a path to a single goal.
- Parameters:
verbose (bool) –
- Return type:
bool
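A hedged end-to-end sketch (plant, graph, action_sampler, logger, and params are assumed to be constructed elsewhere from the corresponding jacta.planner classes; the import path assumes the jacta package prefix used by the type references on this page):

from jacta.planner.core.graph_worker import SingleGoalWorker

worker = SingleGoalWorker(
    plant=plant,
    graph=graph,
    action_sampler=action_sampler,
    logger=logger,
    params=params,
    callback=None,
    callback_period=None,
)
success = worker.work(verbose=True)  # True if a path to the goal was found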
- class planner.core.graph_worker.ParallelGoalsWorker(*args: Tuple, **kwargs: dict)#
Bases:
GraphWorker
- Parameters:
args (Tuple) –
kwargs (dict) –
- try_to_reallocate_workers(worker_reset_mask: torch.BoolTensor) None #
- Parameters:
worker_reset_mask (torch.BoolTensor) –
- Return type:
None
- update_extension_lengths(search_reset_mask: torch.BoolTensor) None #
- Parameters:
search_reset_mask (torch.BoolTensor) –
- Return type:
None
- reset_finished_workers() None #
- Return type:
None
- update_pareto_parameters(node_ids: torch.IntTensor, new_node_ids: torch.IntTensor) None #
- Parameters:
node_ids (torch.IntTensor) –
new_node_ids (torch.IntTensor) –
- Return type:
None
- work(verbose: bool = False) bool #
Tries to find a path to a single goal.
- Parameters:
verbose (bool) –
- Return type:
bool
- class planner.core.graph_worker.CommonGoalWorkerInterface(*args: Tuple, **kwargs: dict)#
- Parameters:
args (Tuple) –
kwargs (dict) –
- class planner.core.graph_worker.RelatedGoalWorker(*args: Tuple, **kwargs: dict)#
Bases:
CommonGoalWorkerInterface
- Parameters:
args (Tuple) –
kwargs (dict) –
- work(verbose: bool = False) bool #
Tries to find paths to goals sampled around the actual goal.
- Parameters:
verbose (bool) –
- Return type:
bool
- class planner.core.graph_worker.ExplorerWorker(*args: Tuple, **kwargs: dict)#
Bases:
CommonGoalWorkerInterface
- Parameters:
args (Tuple) –
kwargs (dict) –
- work(verbose: bool = False) bool #
Tries to find paths to randomly sampled goals.
- Parameters:
verbose (bool) –
- Return type:
bool
- class planner.core.graph_worker.RolloutWorker(plant: jacta.planner.dynamics.simulator_plant.SimulatorPlant, graph: jacta.planner.core.graph.Graph, action_sampler: jacta.planner.core.action_sampler.ActionSampler, logger: jacta.planner.core.logger.Logger, params: jacta.planner.core.parameter_container.ParameterContainer, callback: Callable | None = None, callback_period: int | None = None)#
Bases:
GraphWorker
- Parameters:
plant (jacta.planner.dynamics.simulator_plant.SimulatorPlant) –
graph (jacta.planner.core.graph.Graph) –
action_sampler (jacta.planner.core.action_sampler.ActionSampler) –
logger (jacta.planner.core.logger.Logger) –
params (jacta.planner.core.parameter_container.ParameterContainer) –
callback (Optional[Callable]) –
callback_period (Optional[int]) –
- work(verbose: bool = False) bool #
Always extends the last node.
- Parameters:
verbose (bool) –
- Return type:
bool
- planner.core.graph_worker.inspect_action_type(graph_worker: GraphWorker, action_type: jacta.planner.core.types.ActionType, node_ids: torch.IntTensor | None = None, num_action_steps: int = 100) torch.FloatTensor #
Inspection tool for a specific action type. This rolls out the dynamics of the system, assuming the same action_type is always selected.
- Parameters:
graph_worker (GraphWorker) –
action_type (jacta.planner.core.types.ActionType) –
node_ids (torch.IntTensor | None) –
num_action_steps (int) –
- Return type:
torch.FloatTensor
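A hedged usage sketch (worker is an existing GraphWorker; the specific ActionType member shown is hypothetical, so consult jacta.planner.core.types for the available values):

from jacta.planner.core.graph_worker import inspect_action_type
from jacta.planner.core.types import ActionType

# Roll out 100 steps from the default nodes, always applying the same
# (hypothetical) action type, and inspect the resulting trajectory.
trajectory = inspect_action_type(
    graph_worker=worker,
    action_type=ActionType.RANGED,  # hypothetical member name
    node_ids=None,
    num_action_steps=100,
)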