7. Meta Engines

In this notebook we define an oversubscription planning problem and we solve it using a MetaEngine.


7.1. Setup the library

First, we install graphviz, which is used for plotting, and then the unified_planning library from PyPI together with its tamer and plot extras.

[1]:
!apt install graphviz graphviz-dev
%pip install unified-planning[tamer,plot]

7.2. Problem definition

We model an oversubscription planning problem.

[2]:
from unified_planning.shortcuts import *

We start modeling the problem by defining the UserTypes and the Fluents.

[3]:
Location = UserType('Location')
Robot = UserType('Robot')

at = Fluent('at', BoolType(), robot=Robot, location=Location)
visited = Fluent('visited', BoolType(), robot=Robot, location=Location)
connected = Fluent('connected', BoolType(), l_from=Location, l_to=Location)

We define an action move that models the movement of a robot between two locations.

[4]:
move = InstantaneousAction('move', robot=Robot, l_from=Location, l_to=Location)
robot = move.parameter('robot')
l_from = move.parameter('l_from')
l_to = move.parameter('l_to')
move.add_precondition(at(robot, l_from))
move.add_precondition(connected(l_from, l_to))
move.add_effect(at(robot, l_from), False)
move.add_effect(at(robot, l_to), True)
move.add_effect(visited(robot, l_to), True)
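
To make the semantics of move concrete, here is a minimal sketch in plain Python that does not use the unified_planning API. The state is assumed to be a dictionary of sets of tuples, and apply_move is a hypothetical helper written only for this illustration: it checks the two preconditions and then applies the three effects.

```python
def apply_move(state, robot, l_from, l_to):
    """Return the successor state, or raise ValueError if a precondition fails."""
    # Preconditions: the robot is at l_from and l_from is connected to l_to.
    if (robot, l_from) not in state["at"]:
        raise ValueError("%s is not at %s" % (robot, l_from))
    if (l_from, l_to) not in state["connected"]:
        raise ValueError("%s is not connected to %s" % (l_from, l_to))
    # Effects: leave l_from, arrive at l_to, mark l_to as visited.
    return {
        "at": (state["at"] - {(robot, l_from)}) | {(robot, l_to)},
        "visited": state["visited"] | {(robot, l_to)},
        "connected": state["connected"],
    }

# Hypothetical two-location state, used only for this illustration.
state = {
    "at": {("r1", "l0")},
    "visited": {("r1", "l0")},
    "connected": {("l0", "l1")},
}
state = apply_move(state, "r1", "l0", "l1")
print(sorted(state["at"]), sorted(state["visited"]))
```

Note that visited is only ever set to True, so every location the robot passes through stays visited until the end of the plan.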

We define the Object instances and, after creating the Problem, we set the initial values.

[5]:
r1 = Object('r1', Robot)
NLOC = 10
locations = [Object('l%s' % i, Location) for i in range(NLOC)]

problem = Problem('robot_with_simulated_effects')
problem.add_fluent(at, default_initial_value=False)
problem.add_fluent(visited, default_initial_value=False)
problem.add_fluent(connected, default_initial_value=False)
problem.add_action(move)

problem.add_object(r1)
problem.add_objects(locations)

problem.set_initial_value(at(r1, locations[0]), True)
problem.set_initial_value(visited(r1, locations[0]), True)
for i in range(NLOC - 1):
    problem.set_initial_value(connected(locations[i], locations[i+1]), True)
problem.set_initial_value(connected(locations[4], locations[8]), True)
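
The connected facts set above form a directed chain from l0 to l9 plus one shortcut from l4 to l8. As a rough illustration, independent of the unified_planning API, the sketch below rebuilds that graph with a plain dictionary and enumerates every route from l0 to l9. The names edges and routes_from are hypothetical and exist only for this example.

```python
NLOC = 10

# Directed edges mirroring the `connected` fluent: a chain plus the l4 -> l8 shortcut.
edges = {i: [i + 1] for i in range(NLOC - 1)}
edges[NLOC - 1] = []
edges[4].append(8)

def routes_from(start, end, path=None):
    """Yield every route from start to end as a list of location indices."""
    path = (path or []) + [start]
    if start == end:
        yield path
        return
    # The graph is acyclic, so this recursion always terminates.
    for nxt in edges[start]:
        yield from routes_from(nxt, end, path)

routes = list(routes_from(0, NLOC - 1))
for route in routes:
    print(" -> ".join("l%d" % i for i in route))
```

Under these assumptions there are exactly two routes to l9: the full chain, which passes through l5 and l7, and the shorter one through the shortcut, which skips l5, l6 and l7.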

Finally, we define the oversubscription goals.

[6]:
goals = {}
goals[visited(r1, locations[5])] = -5
goals[visited(r1, locations[7])] = 4
goals[visited(r1, locations[9])] = 10

problem.add_quality_metric(up.model.metrics.Oversubscription(goals))

7.3. Solving the problem

We solve the problem using the oversubscription MetaEngine with the tamer Engine.

[7]:
with OneshotPlanner(name='oversubscription[tamer]') as planner:
    result = planner.solve(problem)
    print("%s returned: %s" % (planner.name, result.plan))
OversubscriptionPlanner[Tamer] returned: SequentialPlan:
    move(r1, l0, l1)
    move(r1, l1, l2)
    move(r1, l2, l3)
    move(r1, l3, l4)
    move(r1, l4, l8)
    move(r1, l8, l9)

To test the oversubscription MetaEngine, we update the oversubscription goals to see if it finds a different plan.

[8]:
problem.clear_quality_metrics()

goals = {}
goals[visited(r1, locations[5])] = -5
goals[visited(r1, locations[7])] = 6
goals[visited(r1, locations[9])] = 10

problem.add_quality_metric(up.model.metrics.Oversubscription(goals))
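
To see why the new values can change the best plan, the following sketch compares the total value of two candidate routes under both goal sets. It is plain Python, not the planner's own computation: the helper utility and the route sets are hypothetical names introduced here, and only two routes are compared (the one through the l4 to l8 shortcut and the one along the whole chain).

```python
def utility(visited, goals):
    """Sum the values of the goal locations that the route visits."""
    return sum(value for loc, value in goals.items() if loc in visited)

short_route = {0, 1, 2, 3, 4, 8, 9}   # uses the l4 -> l8 shortcut
long_route = set(range(10))           # follows the whole chain

first_goals = {5: -5, 7: 4, 9: 10}    # values from the first metric
second_goals = {5: -5, 7: 6, 9: 10}   # values from the updated metric

for name, goals in [("first", first_goals), ("second", second_goals)]:
    print(name, utility(short_route, goals), utility(long_route, goals))
```

With the first goal set the shortcut route scores 10 against 9 for the full chain, because visiting l7 (4) does not make up for the penalty at l5 (-5). Raising the value of l7 to 6 gives the full chain a total of 11, so passing through l5 becomes worthwhile.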

To solve the new problem, we now let the system choose the Engine to use.

[9]:
with OneshotPlanner(problem_kind=problem.kind) as planner:
    result = planner.solve(problem)
    plan = result.plan
    assert plan is not None
    print("%s returned: %s" % (planner.name, result.plan))
OversubscriptionPlanner[Fast Downward] returned: SequentialPlan:
    move(r1, l0, l1)
    move(r1, l1, l2)
    move(r1, l2, l3)
    move(r1, l3, l4)
    move(r1, l4, l5)
    move(r1, l5, l6)
    move(r1, l6, l7)
    move(r1, l7, l8)
    move(r1, l8, l9)
[ ]:
from unified_planning.plot import plot_sequential_plan

Ignore the code below; it is used to make this notebook also runnable in the Continuous Integration.

[ ]:
# Redefine the plot package methods imported above to print the plot to a temp file
# if the exception "could not locate runnable browser" is raised. This usually happens
# in the Continuous Integration.

from inspect import getmembers, isfunction
from unified_planning import plot
from functools import partial
import os, uuid, tempfile as tf

# Define the function that will be executed instead
def _function(original_function, *args, **kwargs):
    try:
        original_function(*args, **kwargs)
    except Exception as e:
        if "could not locate runnable browser" in str(e):
            original_function(*args, **kwargs,
                filename=f"{os.path.join(tf.gettempdir(), str(uuid.uuid1()))}.png"
            )
        else:
            raise

# Iterate over all the functions of the plot package
for function_name, function in getmembers(plot, isfunction):
    # Override the original function with the new one
    globals()[function_name] = partial(_function, function)
[ ]:
plot_sequential_plan(plan, figsize=(18, 4), node_size=8000, font_size=11)