Execution Environment
The contingent module exposes execution-environment abstractions for closed-loop interaction with a planning problem.
ExecutionEnvironment defines the minimal contract:
apply(action)executes one action and returns an observation map.is_goal_reached()checks whether the current execution state satisfies the goals.
SimulatedExecutionEnvironment provides a concrete implementation for
ContingentProblem. It:
clones the contingent problem into a deterministic
Problemfor simulation;samples hidden initial fluents consistently with
oneofandorconstraints;executes actions through the sequential simulator;
returns observed fluent values when executing
SensingActioninstances.
Note
SimulatedExecutionEnvironment uses pysmt to sample hidden initial assignments and requires at least one available SMT solver backend (for example z3).
API Reference
For complete class APIs, see:
Usage Example
This end-to-end example follows the workflow of the ActionSelector and Contingent Execution Environment notebook, combining a custom selector with SimulatedExecutionEnvironment in a contingent closed loop.
env2 = Environment()
problem2 = ContingentProblem("parcel_delivery", environment=env2)
is_box = Fluent("is_box", environment=env2)
delivered = Fluent("delivered", environment=env2)
problem2.add_fluent(is_box, default_initial_value=False)
problem2.add_fluent(delivered, default_initial_value=False)
problem2.add_unknown_initial_constraint(is_box)
sense_box = SensingAction("sense_box", _env=env2)
sense_box.add_observed_fluent(is_box())
pick_box = InstantaneousAction("pick_box", _env=env2)
pick_box.add_precondition(is_box())
pick_box.add_effect(delivered, True)
pick_bag = InstantaneousAction("pick_bag", _env=env2)
pick_bag.add_precondition(env2.expression_manager.Not(is_box()))
pick_bag.add_effect(delivered, True)
problem2.add_actions([sense_box, pick_box, pick_bag])
problem2.add_goal(delivered())
env2.factory.add_engine("demo-action-selector", __name__, "DemoActionSelector")
execution_env = SimulatedExecutionEnvironment(problem2, max_constraints=1)
selected_actions = []
with env2.factory.ActionSelector(
problem2,
name="demo-action-selector",
params={
"default_action": "sense_box",
"true_action": "pick_box",
"false_action": "pick_bag",
},
) as selector:
for _ in range(2):
action_instance = selector.get_action()
selected_actions.append(action_instance.action.name)
print("Selected action:", action_instance)
observation = execution_env.apply(action_instance)
print("Observation:", observation)
selector.update(observation)
if execution_env.is_goal_reached():
break
assert selected_actions[0] == "sense_box"
assert len(selected_actions) == 2
assert selected_actions[1] in ("pick_box", "pick_bag")
assert execution_env.is_goal_reached()
print("Goal reached:", execution_env.is_goal_reached())