16. ActionSelector and Contingent Execution Environment
This notebook demonstrates a custom ActionSelector and its interaction with a SimulatedExecutionEnvironment.
16.1. Setup
If needed, install unified-planning and contingent dependencies.
[ ]:
%pip install unified-planning
%pip install pysmt
!yes | pysmt-install --z3
Requirement already satisfied: unified-planning in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (1.2.0.311.dev1)
Requirement already satisfied: pyparsing in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (from unified-planning) (3.3.2)
Requirement already satisfied: networkx in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (from unified-planning) (3.6.1)
Requirement already satisfied: ConfigSpace in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (from unified-planning) (1.2.2)
Requirement already satisfied: pddl in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (from unified-planning) (0.4.5)
Requirement already satisfied: pysmt in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (from unified-planning) (0.9.6)
Requirement already satisfied: numpy in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (from ConfigSpace->unified-planning) (1.26.4)
Requirement already satisfied: scipy in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (from ConfigSpace->unified-planning) (1.17.0)
Requirement already satisfied: typing_extensions in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (from ConfigSpace->unified-planning) (4.15.0)
Requirement already satisfied: more_itertools in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (from ConfigSpace->unified-planning) (10.8.0)
Requirement already satisfied: lark<1.2.0,>=1.1.5 in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (from pddl->unified-planning) (1.1.9)
Requirement already satisfied: click<9.0.0,>=8.1.3 in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (from pddl->unified-planning) (8.3.1)
Note: you may need to restart the kernel to use updated packages.
Requirement already satisfied: pysmt in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (0.9.6)
Note: you may need to restart the kernel to use updated packages.
16.2. Imports
[2]:
from typing import Dict, Optional
from unified_planning.environment import Environment
from unified_planning.engines.engine import Engine
from unified_planning.engines.mixins.action_selector import ActionSelectorMixin
from unified_planning.model import Fluent, InstantaneousAction, Problem, ProblemKind
from unified_planning.model.contingent import (
ContingentProblem,
SensingAction,
SimulatedExecutionEnvironment,
)
16.3. Define a custom ActionSelector
An ActionSelector is responsible for selecting the next action to execute based on the current state of the environment. In this example, we will create a simple ActionSelector that always selects a specific action depending on the first fluent in the environment observation.
[3]:
class DemoActionSelector(Engine, ActionSelectorMixin):
def __init__(
self,
problem,
error_on_failed_checks: bool = True,
default_action: Optional[str] = None,
true_action: Optional[str] = None,
false_action: Optional[str] = None,
**kwargs,
):
Engine.__init__(self)
self.error_on_failed_checks = error_on_failed_checks
self._default_action = default_action
self._true_action = true_action
self._false_action = false_action
self._next_action_name: Optional[str] = None
ActionSelectorMixin.__init__(self, problem)
if self._default_action is None:
self._default_action = next(iter(problem.actions)).name
self._next_action_name = self._default_action
@property
def name(self):
return "demo-action-selector"
@staticmethod
def supported_kind() -> ProblemKind:
return ProblemKind()
@staticmethod
def supports(problem_kind: ProblemKind) -> bool:
return True
def _get_action(self):
assert self._next_action_name is not None
return self._problem.action(self._next_action_name)()
def _update(self, observation: Dict):
if (
not observation
or self._true_action is None
or self._false_action is None
):
return
observed_value = next(iter(observation.values()))
if observed_value.is_bool_constant():
self._next_action_name = (
self._true_action
if observed_value.bool_constant_value()
else self._false_action
)
16.4. Minimal ActionSelector Flow
In this section, we demonstrate a minimal flow of using an ActionSelector. We create a simple problem with a single fluent which can be toggled by an action. The ActionSelector will always choose to flip the fluent value.
[4]:
env = Environment()
problem = Problem("minimal_action_selector_problem", env)
flag = Fluent("flag", environment=env)
problem.add_fluent(flag, default_initial_value=False)
set_true = InstantaneousAction("set_true", _env=env)
set_true.add_effect(flag, True)
set_false = InstantaneousAction("set_false", _env=env)
set_false.add_effect(flag, False)
problem.add_actions([set_true, set_false])
env.factory.add_engine("demo-action-selector", __name__, "DemoActionSelector")
with env.factory.ActionSelector(
problem,
name="demo-action-selector",
params={
"default_action": "set_true",
"true_action": "set_false",
"false_action": "set_true",
},
) as selector:
first_action = selector.get_action()
selector.update({flag(): env.expression_manager.TRUE()})
second_action = selector.get_action()
assert first_action.action.name == "set_true"
assert second_action.action.name == "set_false"
print(first_action, second_action)
set_true set_false
16.5. End-to-end contingent closed loop
In this section, we demonstrate an end-to-end flow of using a SimulatedExecutionEnvironment with the same ActionSelector. We create a parcel delivery problem where the agent must deliver a parcel, but it is initially unknown whether the parcel is a box or a bag. The ActionSelector will always choose first sense whether it is a box, and then choose the appropriate delivery action based on the observation.
[6]:
env2 = Environment()
problem2 = ContingentProblem("parcel_delivery", environment=env2)
is_box = Fluent("is_box", environment=env2)
delivered = Fluent("delivered", environment=env2)
problem2.add_fluent(is_box, default_initial_value=False)
problem2.add_fluent(delivered, default_initial_value=False)
problem2.add_unknown_initial_constraint(is_box)
sense_box = SensingAction("sense_box", _env=env2)
sense_box.add_observed_fluent(is_box())
pick_box = InstantaneousAction("pick_box", _env=env2)
pick_box.add_precondition(is_box())
pick_box.add_effect(delivered, True)
pick_bag = InstantaneousAction("pick_bag", _env=env2)
pick_bag.add_precondition(env2.expression_manager.Not(is_box()))
pick_bag.add_effect(delivered, True)
problem2.add_actions([sense_box, pick_box, pick_bag])
problem2.add_goal(delivered())
env2.factory.add_engine("demo-action-selector", __name__, "DemoActionSelector")
execution_env = SimulatedExecutionEnvironment(problem2, max_constraints=1)
with env2.factory.ActionSelector(
problem2,
name="demo-action-selector",
params={
"default_action": "sense_box",
"true_action": "pick_box",
"false_action": "pick_bag",
},
) as selector:
for _ in range(2):
action_instance = selector.get_action()
print("Selected action:", action_instance)
observation = execution_env.apply(action_instance)
print("Observation:", observation)
selector.update(observation)
if execution_env.is_goal_reached():
break
assert execution_env.is_goal_reached()
print("Goal reached:", execution_env.is_goal_reached())
Selected action: sense_box
Observation: {is_box: true}
Selected action: pick_box
Observation: {}
Goal reached: True