9. SequentialSimulator

This python notebook shows the usage of the SequentialSimulator.

Open In GitHub Open In Colab

9.1. Setup

First, we install unified_planning library and its dependencies from PyPi. Here, we use the --pre flag to use the latest development build.

[ ]:
!apt install graphviz graphviz-dev
%pip install unified-planning[plot]

We are now ready to use the Unified-Planning library!

9.2. Demo

9.2.1. Basic imports

The basic imports we need for this demo are abstracted in the shortcuts package.

[2]:
from unified_planning.shortcuts import *

9.2.2. Problem definition

We start the problem modeling defining the UserType and the Fluent.

[3]:
Location = UserType('Location')

at = Fluent('at', Location)
distance = Fluent('distance', IntType(), l1=Location, l2=Location)
battery_charge = Fluent('battery_charge', IntType(0, 100))

We define an action move that decreases the battery_charge by the distance of the move.

Note that the battery can never go under 0, so every action that would reduce the battery to a negative number is not applicable.

[4]:
move = InstantaneousAction('move', l_from=Location, l_to=Location)
l_from = move.parameter('l_from')
l_to = move.parameter('l_to')
move.add_precondition(Equals(at, l_from))
move.add_effect(at, l_to)
move.add_decrease_effect(battery_charge, distance(l_from, l_to))

Finally, we define the Object instances and, after creating the Problem, we set the initial values and the goal.

Set the default of the distance to 101, so only the distances that we define explicitly create valid connections between locations. (Otherwise, the Typing bound on the battery_charge would be violated.)

[5]:
l1 = Object('l1', Location)
l2 = Object('l2', Location)
l3 = Object('l3', Location)
l4 = Object('l4', Location)
l5 = Object('l5', Location)
locations = [l5, l4, l3, l2, l1]

problem = Problem('moving_robot')
problem.add_fluent(at)
problem.add_fluent(battery_charge)
problem.add_fluent(distance, default_initial_value = 101)
problem.add_action(move)
problem.add_objects(locations)

problem.set_initial_value(at, l1)
problem.set_initial_value(battery_charge, 100)

problem.set_initial_value(distance(l1, l2), 20)
problem.set_initial_value(distance(l2, l3), 30)
problem.set_initial_value(distance(l3, l4), 20)
problem.set_initial_value(distance(l4, l5), 30)

problem.set_initial_value(distance(l1, l3), 60)

problem.add_goal(Equals(at, l5))
battery_exp = FluentExp(battery_charge)

9.2.3. Simulating the problem

Get the simulator with the SequentialSimulator Operation Mode and start simulating.

Since we have to reach l5, we iterate over the locations and see which locations we can reach from l1 using the simulator.is_applicable method.

[6]:
with SequentialSimulator(problem=problem) as simulator:
    initial_state = simulator.get_initial_state()
    for travel_location in locations:
        if simulator.is_applicable(initial_state, move, (l1, travel_location)):
            print(f"From l1 we can reach: {travel_location}")
From l1 we can reach: l3
From l1 we can reach: l2

As we see, from l1 we can reach l2 and l3; since l3 seems closer to l5, we decide to go there!

[7]:
    state_at_l3 = simulator.apply(initial_state, move, (l1, l3))
    for travel_location in locations:
        if simulator.is_applicable(state_at_l3, move, (l3, travel_location)):
            print(f"From l3 we can reach: {travel_location}")
    state_at_l4 = simulator.apply(state_at_l3, move, (l3, l4))
    if simulator.is_applicable(state_at_l4, move, (l4, l5)):
        print('Done!')
    else:
        print(f'Problem! Battery too low: {state_at_l4.get_value(battery_exp)}')
From l3 we can reach: l4
Problem! Battery too low: 20

We found a problem in our simulation. From the state in which we reached l3, the remaining battery is not enough to reach l5.

The only decision we made to go to reach l3 directly from l1 seemed wrong, so let’s try to reach l3 passing from l2, and see it that saves battery.

[8]:
    state_at_l2 = simulator.apply(initial_state, move, (l1, l2))
    new_state_at_l3 = simulator.apply(state_at_l2, move, (l2, l3))
    new_state_better = (new_state_at_l3.get_value(battery_exp) > state_at_l3.get_value(battery_exp)).simplify()
    if new_state_better.bool_constant_value():
        print("Reaching l3 passing through l2 saves battery!")
    else:
        print("Can't save battery reaching l3, the problem has no solution!")
Reaching l3 passing through l2 saves battery!

As we saw, this saves battery, so let’s try reaching l5 with this new battery value.

[9]:
    state_at_l3 = new_state_at_l3
    state_at_l4 = simulator.apply(state_at_l3, move, (l3, l4))
    if simulator.is_applicable(state_at_l4, move, (l4, l5)):
        print('Done!')
    else:
        print(f'Problem! Battery too low: {state_at_l4.get_value(battery_exp)}')
    state_at_l5 = simulator.apply(state_at_l4, move, (l4, l5))
Done!

9.2.4. Simulating a SequentialPlan and inspect the State

As we saw, a possible plan is to go trough the Locations in order, so let’s create a SequentialPlan that does this.

After creating the plan, we can simulate it and, for example, save the values of the battery during the whole plan in a list.

[10]:
    from unified_planning.plans import SequentialPlan, ActionInstance

    plan = SequentialPlan([
        ActionInstance(move, (l1, l2)),
        ActionInstance(move, (l2, l3)),
        ActionInstance(move, (l3, l4)),
        ActionInstance(move, (l4, l5))
    ])
    print(f"Initial battery value: {initial_state.get_value(battery_exp).constant_value()}")
    current_state = initial_state
    # We also store the states to plot the metrics later
    states = [current_state]
    for action_instance in plan.actions:
        current_state = simulator.apply(current_state, action_instance)
        if current_state is None:
            print(f'Error in applying: {action_instance}')
            break
        states.append(current_state)
        current_battery_value = current_state.get_value(battery_exp).constant_value()
        # in current_battery_value we inspect the State
        print(f"Battery value after {action_instance}: {current_battery_value}")
Initial battery value: 100
Battery value after move(l1, l2): 80
Battery value after move(l2, l3): 50
Battery value after move(l3, l4): 30
Battery value after move(l4, l5): 0

After we stored the battery values during the plan, we can do anything with those; in this example we plot the battery values in a graph to see how it decreases at each step.

[ ]:
    from unified_planning.plot import plot_sequential_plan

Ignore the code below, it’s used to make this notebook also runnable in the Countinuous Intergation.

[ ]:
    # Redefine the plot package methods imported above to print the plot to a temp file
    # if the exception "could not locate runnable browser" is raised. This usually happens
    # in the Continuous Integration.

    from inspect import getmembers, isfunction
    from unified_planning import plot
    from functools import partial
    import os, uuid, tempfile as tf

    # Define the function that will be executed instead
    def _function(original_function, *args, **kwargs):
        try:
            original_function(*args, **kwargs)
        except Exception as e:
            if "could not locate runnable browser" in str(e):
                original_function(*args, **kwargs,
                    filename=f"{os.path.join(tf.gettempdir(), str(uuid.uuid1()))}.png"
                )
            else:
                raise e

    # Iterate over all the functions of the plot package
    for function_name, function in getmembers(plot, isfunction):
        # Override the original function with the new one
        globals()[function_name] = partial(_function, function)
[ ]:
    plot_sequential_plan(plan, problem, battery_exp, figsize=(9, 3))

9.2.5. Evaluating the Plan Quality Metrics

The UP engines package offers a utility function that takes a SequentialSimulator and evaluates a PlanQualityMetric; this is done by using the State and the actions sequence.

[12]:
    from unified_planning.engines.sequential_simulator import evaluate_quality_metric, evaluate_quality_metric_in_initial_state

    plan_length = MinimizeSequentialPlanLength()
    maximize_battery = MaximizeExpressionOnFinalState(battery_exp)

    plan_length_value = evaluate_quality_metric_in_initial_state(simulator, plan_length)
    maximize_battery_value = evaluate_quality_metric_in_initial_state(simulator, maximize_battery)

    current_state = states[0]
    for next_state, action_instance in zip(states[1:], plan.actions):
        plan_length_value = evaluate_quality_metric(
            simulator,
            plan_length,
            plan_length_value,
            current_state,
            action_instance.action,
            action_instance.actual_parameters,
            next_state
        )
        maximize_battery_value = evaluate_quality_metric(
            simulator,
            maximize_battery,
            maximize_battery_value,
            current_state,
            action_instance.action,
            action_instance.actual_parameters,
            next_state
        )
        current_state = next_state

        # Do something with the metric values
        print(f'Plan length: {plan_length_value}\nMaximized epression value: {maximize_battery_value}')
Plan length: 1
Maximized epression value: 80
Plan length: 2
Maximized epression value: 50
Plan length: 3
Maximized epression value: 30
Plan length: 4
Maximized epression value: 0

Also the value of the metrics can be plotted

[ ]:
    try:
        plot_sequential_plan(plan, problem, metric_or_metrics=[plan_length, maximize_battery], figsize=(9, 3))
    except Exception as e:
        if "could not locate runnable browser" in str(e):
            plot_sequential_plan(plan, problem, battery_exp, figsize=(9, 3), filename="sequential_simulator_2.png")
        else:
            raise e