16. ActionSelector and Contingent Execution Environment

This notebook demonstrates a custom ActionSelector and its interaction with a SimulatedExecutionEnvironment.


16.1. Setup

If needed, install unified-planning and the dependencies of the contingent example (pysmt with the Z3 solver).

[ ]:
%pip install unified-planning
%pip install pysmt
!yes | pysmt-install --z3
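
Before running the install cell, it can help to check which packages are already importable. The following is a minimal sketch using only the standard library; the helper name `missing_modules` is hypothetical and not part of unified-planning, and the import names `pysmt` and `z3` are assumed to correspond to the packages installed above.

```python
from importlib.util import find_spec
from typing import Iterable, List


def missing_modules(names: Iterable[str]) -> List[str]:
    """Return the top-level module names that cannot be found in this environment."""
    return [name for name in names if find_spec(name) is None]


# Import names assumed for the packages installed in the setup cell.
missing = missing_modules(["unified_planning", "pysmt", "z3"])
print("Missing modules:", missing or "none")
```

If the list is empty, the install cell can be skipped.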

16.2. Imports

[2]:
from typing import Dict, Optional

from unified_planning.environment import Environment
from unified_planning.engines.engine import Engine
from unified_planning.engines.mixins.action_selector import ActionSelectorMixin
from unified_planning.model import Fluent, InstantaneousAction, Problem, ProblemKind
from unified_planning.model.contingent import (
    ContingentProblem,
    SensingAction,
    SimulatedExecutionEnvironment,
)

16.3. Define a custom ActionSelector

An ActionSelector is responsible for selecting the next action to execute based on the observations it receives from the environment. In this example, we create a simple ActionSelector that returns a default action until it receives an observation, and then selects one of two configured actions depending on the Boolean value of the first fluent in that observation.

[3]:
class DemoActionSelector(Engine, ActionSelectorMixin):
    def __init__(
        self,
        problem,
        error_on_failed_checks: bool = True,
        default_action: Optional[str] = None,
        true_action: Optional[str] = None,
        false_action: Optional[str] = None,
        **kwargs,
    ):
        Engine.__init__(self)
        self.error_on_failed_checks = error_on_failed_checks
        self._default_action = default_action
        self._true_action = true_action
        self._false_action = false_action
        self._next_action_name: Optional[str] = None
        ActionSelectorMixin.__init__(self, problem)
        if self._default_action is None:
            self._default_action = next(iter(problem.actions)).name
        self._next_action_name = self._default_action

    @property
    def name(self):
        return "demo-action-selector"

    @staticmethod
    def supported_kind() -> ProblemKind:
        return ProblemKind()

    @staticmethod
    def supports(problem_kind: ProblemKind) -> bool:
        return True

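    def _get_action(self):
        # Build an instance of the currently selected action; the demo
        # actions take no parameters.
        assert self._next_action_name is not None
        return self._problem.action(self._next_action_name)()

    def _update(self, observation: Dict):
        # Keep the current choice if there is nothing to react to or the
        # true/false actions were not configured.
        if (
            not observation
            or self._true_action is None
            or self._false_action is None
        ):
            return
        # Only the first observed fluent value is considered.
        observed_value = next(iter(observation.values()))
        if observed_value.is_bool_constant():
            self._next_action_name = (
                self._true_action
                if observed_value.bool_constant_value()
                else self._false_action
            )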
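
The decision rule of the selector above can be tried out in isolation. The sketch below is a library-free stand-in, not the unified-planning API: the class name `NameSelectorSketch` is hypothetical, action names are plain strings, and observations are plain dictionaries of Python booleans instead of fluent expressions.

```python
from typing import Dict, Optional


class NameSelectorSketch:
    """Library-free stand-in mirroring the decision rule of DemoActionSelector."""

    def __init__(
        self,
        default_action: str,
        true_action: Optional[str] = None,
        false_action: Optional[str] = None,
    ):
        self._true_action = true_action
        self._false_action = false_action
        # The default action is returned until an observation changes the choice.
        self._next_action_name = default_action

    def get_action(self) -> str:
        return self._next_action_name

    def update(self, observation: Dict[str, bool]) -> None:
        # Ignore empty observations and unconfigured true/false actions.
        if not observation or self._true_action is None or self._false_action is None:
            return
        # Only the first observed value drives the choice.
        first_value = next(iter(observation.values()))
        if isinstance(first_value, bool):
            self._next_action_name = (
                self._true_action if first_value else self._false_action
            )


sketch = NameSelectorSketch("sense_box", "pick_box", "pick_bag")
before = sketch.get_action()
sketch.update({})  # an empty observation leaves the choice unchanged
after_empty = sketch.get_action()
sketch.update({"is_box": False})
after_false = sketch.get_action()
print(before, after_empty, after_false)  # prints: sense_box sense_box pick_bag
```

Keeping the rule this small makes the selector's behaviour easy to predict: the choice changes only when a non-empty observation with a Boolean first value arrives.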

16.4. Minimal ActionSelector Flow

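In this section, we demonstrate a minimal flow of using an ActionSelector. We create a simple problem with a single Boolean fluent, flag, and two actions that set it to true or to false. The ActionSelector is configured to pick the action that flips the observed value: it starts with set_true and, after observing flag as true, selects set_false.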

[4]:
env = Environment()
problem = Problem("minimal_action_selector_problem", env)
flag = Fluent("flag", environment=env)
problem.add_fluent(flag, default_initial_value=False)

set_true = InstantaneousAction("set_true", _env=env)
set_true.add_effect(flag, True)
set_false = InstantaneousAction("set_false", _env=env)
set_false.add_effect(flag, False)
problem.add_actions([set_true, set_false])

env.factory.add_engine("demo-action-selector", __name__, "DemoActionSelector")
with env.factory.ActionSelector(
    problem,
    name="demo-action-selector",
    params={
        "default_action": "set_true",
        "true_action": "set_false",
        "false_action": "set_true",
    },
) as selector:
    first_action = selector.get_action()
    selector.update({flag(): env.expression_manager.TRUE()})
    second_action = selector.get_action()

assert first_action.action.name == "set_true"
assert second_action.action.name == "set_false"
print(first_action, second_action)

set_true set_false

16.5. End-to-end contingent closed loop

In this section, we demonstrate an end-to-end flow of using a SimulatedExecutionEnvironment with the same ActionSelector. We create a parcel delivery problem where the agent must deliver a parcel, but it is initially unknown whether the parcel is a box or a bag. The ActionSelector first senses whether the parcel is a box and then chooses the matching delivery action based on the observation.

[6]:
env2 = Environment()
problem2 = ContingentProblem("parcel_delivery", environment=env2)

is_box = Fluent("is_box", environment=env2)
delivered = Fluent("delivered", environment=env2)
problem2.add_fluent(is_box, default_initial_value=False)
problem2.add_fluent(delivered, default_initial_value=False)
problem2.add_unknown_initial_constraint(is_box)

sense_box = SensingAction("sense_box", _env=env2)
sense_box.add_observed_fluent(is_box())

pick_box = InstantaneousAction("pick_box", _env=env2)
pick_box.add_precondition(is_box())
pick_box.add_effect(delivered, True)

pick_bag = InstantaneousAction("pick_bag", _env=env2)
pick_bag.add_precondition(env2.expression_manager.Not(is_box()))
pick_bag.add_effect(delivered, True)

problem2.add_actions([sense_box, pick_box, pick_bag])
problem2.add_goal(delivered())

env2.factory.add_engine("demo-action-selector", __name__, "DemoActionSelector")
execution_env = SimulatedExecutionEnvironment(problem2, max_constraints=1)

with env2.factory.ActionSelector(
    problem2,
    name="demo-action-selector",
    params={
        "default_action": "sense_box",
        "true_action": "pick_box",
        "false_action": "pick_bag",
    },
) as selector:
    for _ in range(2):
        action_instance = selector.get_action()
        print("Selected action:", action_instance)
        observation = execution_env.apply(action_instance)
        print("Observation:", observation)
        selector.update(observation)
        if execution_env.is_goal_reached():
            break

assert execution_env.is_goal_reached()
print("Goal reached:", execution_env.is_goal_reached())

Selected action: sense_box
Observation: {is_box: true}
Selected action: pick_box
Observation: {}
Goal reached: True
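
The run above shows only the branch in which the parcel is observed to be a box. As a rough illustration of the same sense-then-act loop on both branches, the sketch below replaces the library classes with a toy world whose hidden value is passed in explicitly. The names `ParcelWorldSketch` and `run_episode` are hypothetical, and the sketch makes no claim about how SimulatedExecutionEnvironment is implemented.

```python
from typing import Dict, List


class ParcelWorldSketch:
    """Toy stand-in for a simulated environment with one hidden Boolean fact."""

    def __init__(self, is_box: bool):
        self._is_box = is_box  # hidden from the agent until sensed
        self.delivered = False

    def apply(self, action: str) -> Dict[str, bool]:
        # Sensing returns an observation and changes nothing in the world.
        if action == "sense_box":
            return {"is_box": self._is_box}
        # Each delivery action is applicable only for the matching parcel type.
        if action == "pick_box" and self._is_box:
            self.delivered = True
        elif action == "pick_bag" and not self._is_box:
            self.delivered = True
        else:
            raise ValueError(f"action {action!r} is not applicable")
        return {}


def run_episode(is_box: bool, max_steps: int = 2) -> List[str]:
    """Run the closed loop and return the sequence of selected action names."""
    world = ParcelWorldSketch(is_box)
    next_action = "sense_box"
    trace: List[str] = []
    for _ in range(max_steps):
        trace.append(next_action)
        observation = world.apply(next_action)
        if observation:
            next_action = "pick_box" if observation["is_box"] else "pick_bag"
        if world.delivered:
            break
    return trace


print(run_episode(True))   # prints: ['sense_box', 'pick_box']
print(run_episode(False))  # prints: ['sense_box', 'pick_bag']
```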