Execution Environment

The contingent module exposes execution-environment abstractions for closed-loop interaction with a planning problem.

ExecutionEnvironment defines the minimal contract:

  • apply(action) executes one action and returns an observation map.

  • is_goal_reached() checks whether the current execution state satisfies the goals.
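The two-method contract above can be pictured with a small standalone sketch. It does not use the library: `SketchExecutionEnvironment` and `ToyParcelEnvironment` are hypothetical names, actions are plain strings, and the observation map uses string keys and boolean values as a simplification of whatever types the real classes use.

```python
from abc import ABC, abstractmethod
from typing import Dict


class SketchExecutionEnvironment(ABC):
    """Hypothetical stand-in for the two-method contract described above."""

    @abstractmethod
    def apply(self, action: str) -> Dict[str, bool]:
        """Execute one action and return a map of observed fluents to values."""

    @abstractmethod
    def is_goal_reached(self) -> bool:
        """Return True when the current state satisfies the goals."""


class ToyParcelEnvironment(SketchExecutionEnvironment):
    """Toy implementation: a hidden boolean decides which pick action works."""

    def __init__(self, is_box: bool) -> None:
        self._state = {"is_box": is_box, "delivered": False}

    def apply(self, action: str) -> Dict[str, bool]:
        if action == "sense_box":
            # Sensing changes nothing; it only reveals the hidden fluent.
            return {"is_box": self._state["is_box"]}
        if action == "pick_box" and self._state["is_box"]:
            self._state["delivered"] = True
        elif action == "pick_bag" and not self._state["is_box"]:
            self._state["delivered"] = True
        else:
            raise ValueError(f"action {action!r} is not applicable")
        # Non-sensing actions yield an empty observation map.
        return {}

    def is_goal_reached(self) -> bool:
        return self._state["delivered"]
```

A caller only ever needs these two methods, which is what lets a selector be driven by either a simulated or a real environment.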

SimulatedExecutionEnvironment provides a concrete implementation for ContingentProblem. It:

  • clones the contingent problem into a deterministic Problem for simulation;

  • samples hidden initial fluents consistently with oneof and or constraints;

  • executes actions through the sequential simulator;

  • returns observed fluent values when executing SensingAction instances.
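The sampling step can be illustrated with a minimal sketch that does not use the library. It assumes boolean fluents identified by strings, reads oneof as "exactly one fluent in the group is true" and or as "at least one is true", and swaps in brute-force enumeration in place of a solver. The function name `sample_hidden_assignment` and its parameters are hypothetical.

```python
import itertools
import random
from typing import Dict, List, Optional, Sequence


def sample_hidden_assignment(
    fluents: Sequence[str],
    oneof: Sequence[Sequence[str]] = (),
    or_: Sequence[Sequence[str]] = (),
    rng: Optional[random.Random] = None,
) -> Dict[str, bool]:
    """Pick one truth assignment satisfying every oneof and or constraint."""
    rng = rng or random.Random()
    candidates: List[Dict[str, bool]] = []
    for values in itertools.product([False, True], repeat=len(fluents)):
        assignment = dict(zip(fluents, values))
        # oneof: exactly one fluent in the group is true.
        ok_oneof = all(sum(assignment[f] for f in group) == 1 for group in oneof)
        # or: at least one fluent in the group is true.
        ok_or = all(any(assignment[f] for f in group) for group in or_)
        if ok_oneof and ok_or:
            candidates.append(assignment)
    if not candidates:
        raise ValueError("constraints are unsatisfiable")
    return rng.choice(candidates)


# Example: the parcel is in exactly one of three rooms.
rooms = ["at_a", "at_b", "at_c"]
hidden = sample_hidden_assignment(rooms, oneof=[rooms], rng=random.Random(0))
print(hidden)
```

Enumeration visits every one of the 2^n assignments, so it is only practical for a handful of fluents.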

Note

SimulatedExecutionEnvironment uses pysmt to sample hidden initial assignments and requires at least one available SMT solver backend (for example z3).
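Before constructing the environment, it can help to check which solver backends pysmt can actually see. The sketch below is one way to do that, assuming pysmt's `get_env().factory.all_solvers()` call. The helper name `available_smt_solvers` is hypothetical, and the function returns an empty list when pysmt itself is not installed.

```python
from typing import List


def available_smt_solvers() -> List[str]:
    """Return names of SMT solvers pysmt can use, or [] if pysmt is missing."""
    try:
        from pysmt.shortcuts import get_env
    except ImportError:
        return []
    # Factory.all_solvers() maps installed solver names to their classes.
    return sorted(get_env().factory.all_solvers())


solvers = available_smt_solvers()
if not solvers:
    print("No SMT solver found; install one, for example with `pysmt-install --z3`.")
else:
    print("Available SMT solvers:", solvers)
```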

API Reference

For complete class APIs, see:

Usage Example

This end-to-end example follows the workflow of the ActionSelector and Contingent Execution Environment notebook, combining a custom selector with SimulatedExecutionEnvironment in a contingent closed loop.

End-to-end contingent closed loop with SimulatedExecutionEnvironment
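# Assumes Environment, ContingentProblem, Fluent, SensingAction,
# InstantaneousAction and SimulatedExecutionEnvironment are already imported
# from unified_planning; the import lines are not shown in this excerpt.
env2 = Environment()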
problem2 = ContingentProblem("parcel_delivery", environment=env2)

is_box = Fluent("is_box", environment=env2)
delivered = Fluent("delivered", environment=env2)
problem2.add_fluent(is_box, default_initial_value=False)
problem2.add_fluent(delivered, default_initial_value=False)
# is_box is hidden: its initial value is not known before execution.
problem2.add_unknown_initial_constraint(is_box)

sense_box = SensingAction("sense_box", _env=env2)
sense_box.add_observed_fluent(is_box())

pick_box = InstantaneousAction("pick_box", _env=env2)
pick_box.add_precondition(is_box())
pick_box.add_effect(delivered, True)

pick_bag = InstantaneousAction("pick_bag", _env=env2)
pick_bag.add_precondition(env2.expression_manager.Not(is_box()))
pick_bag.add_effect(delivered, True)

problem2.add_actions([sense_box, pick_box, pick_bag])
problem2.add_goal(delivered())

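# Register the custom selector. DemoActionSelector must be defined in the
# current module (__name__); its definition is not shown in this excerpt.
env2.factory.add_engine("demo-action-selector", __name__, "DemoActionSelector")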
execution_env = SimulatedExecutionEnvironment(problem2, max_constraints=1)

selected_actions = []
with env2.factory.ActionSelector(
    problem2,
    name="demo-action-selector",
    params={
        "default_action": "sense_box",
        "true_action": "pick_box",
        "false_action": "pick_bag",
    },
) as selector:
    # Two iterations suffice here: one sensing action, then one pick action.
    for _ in range(2):
        action_instance = selector.get_action()
        selected_actions.append(action_instance.action.name)
        print("Selected action:", action_instance)
        observation = execution_env.apply(action_instance)
        print("Observation:", observation)
        selector.update(observation)
        if execution_env.is_goal_reached():
            break

# Expected trace: the sensing action first, then exactly one pick action.
assert selected_actions[0] == "sense_box"
assert len(selected_actions) == 2
assert selected_actions[1] in ("pick_box", "pick_bag")
assert execution_env.is_goal_reached()
print("Goal reached:", execution_env.is_goal_reached())
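The listing relies on a `DemoActionSelector` class whose definition is not shown. The standalone sketch below illustrates one plausible policy behind the `default_action`, `true_action` and `false_action` parameters: sense first, then branch on the observed value. This is an assumption inferred from the parameter names and the final assertions, not the notebook's actual code. `SketchSelector` and `run_episode` are hypothetical names, and the environment is reduced to a few lines of plain Python.

```python
from typing import Dict, List, Optional


class SketchSelector:
    """Hypothetical selector: sense first, then branch on the observed value."""

    def __init__(self, default_action: str, true_action: str, false_action: str) -> None:
        self._default = default_action
        self._on_true = true_action
        self._on_false = false_action
        self._observed: Optional[bool] = None

    def get_action(self) -> str:
        if self._observed is None:
            return self._default
        return self._on_true if self._observed else self._on_false

    def update(self, observation: Dict[str, bool]) -> None:
        # Remember the first observed value; empty maps carry no information.
        for value in observation.values():
            self._observed = value
            break


def run_episode(is_box: bool) -> List[str]:
    """Drive the selector against a toy parcel world and return the trace."""
    selector = SketchSelector("sense_box", "pick_box", "pick_bag")
    delivered = False
    trace: List[str] = []
    for _ in range(2):
        action = selector.get_action()
        trace.append(action)
        if action == "sense_box":
            observation = {"is_box": is_box}
        else:
            # A pick action succeeds only when it matches the hidden fluent.
            delivered = (action == "pick_box") == is_box
            observation = {}
        selector.update(observation)
        if delivered:
            break
    return trace


print(run_episode(True))
print(run_episode(False))
```

Whatever the hidden value, the trace has the same shape as in the assertions above: one sensing step followed by the matching pick action.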