7. Meta Engines
In this notebook we define an oversubscription planning problem and we solve it using a MetaEngine
.
7.1. Setup the library
First, we install unified_planning library and its dependencies from PyPi. Here, we use the --pre
flag to use the latest development build.
[1]:
!apt install graphviz graphviz-dev
%pip install unified-planning[tamer,plot]
7.2. Problem definition
We model an oversubscription planning problem.
[2]:
from unified_planning.shortcuts import *
We start the problem modeling defining the UserType
and the Fluent
.
[3]:
Location = UserType('Location')
Robot = UserType('Robot')
at = Fluent('at', BoolType(), robot=Robot, location=Location)
visited = Fluent('visited', BoolType(), robot=Robot, location=Location)
connected = Fluent('connected', BoolType(), l_from=Location, l_to=Location)
We define an action move
that models the movement of a robot between two locations.
[4]:
move = InstantaneousAction('move', robot=Robot, l_from=Location, l_to=Location)
robot = move.parameter('robot')
l_from = move.parameter('l_from')
l_to = move.parameter('l_to')
move.add_precondition(at(robot, l_from))
move.add_precondition(connected(l_from, l_to))
move.add_effect(at(robot, l_from), False)
move.add_effect(at(robot, l_to), True)
move.add_effect(visited(robot, l_to), True)
We define the Object
instances and, after creating the Problem
, we set the initial values.
[5]:
r1 = Object('r1', Robot)
NLOC = 10
locations = [Object('l%s' % i, Location) for i in range(NLOC)]
problem = Problem('robot_with_simulated_effects')
problem.add_fluent(at, default_initial_value=False)
problem.add_fluent(visited, default_initial_value=False)
problem.add_fluent(connected, default_initial_value=False)
problem.add_action(move)
problem.add_object(r1)
problem.add_objects(locations)
problem.set_initial_value(at(r1, locations[0]), True)
problem.set_initial_value(visited(r1, locations[0]), True)
for i in range(NLOC - 1):
problem.set_initial_value(connected(locations[i], locations[i+1]), True)
problem.set_initial_value(connected(locations[4], locations[8]), True)
Finally, we define the oversubscription goals.
[6]:
goals = {}
goals[visited(r1, locations[5])] = -5
goals[visited(r1, locations[7])] = 4
goals[visited(r1, locations[9])] = 10
problem.add_quality_metric(up.model.metrics.Oversubscription(goals))
7.3. Solving the problem
We solve the problem using the oversubscription MetaEngine
with the tamer Engine
.
[7]:
with OneshotPlanner(name='oversubscription[tamer]') as planner:
result = planner.solve(problem)
print("%s returned: %s" % (planner.name, result.plan))
OversubscriptionPlanner[Tamer] returned: SequentialPlan:
move(r1, l0, l1)
move(r1, l1, l2)
move(r1, l2, l3)
move(r1, l3, l4)
move(r1, l4, l8)
move(r1, l8, l9)
To test the oversubscription MetaEngine
, we update the oversubscription goals to see if it finds a different plan.
[8]:
problem.clear_quality_metrics()
goals = {}
goals[visited(r1, locations[5])] = -5
goals[visited(r1, locations[7])] = 6
goals[visited(r1, locations[9])] = 10
problem.add_quality_metric(up.model.metrics.Oversubscription(goals))
To solve the new problem, now we let the system choose the Engine
to use.
[9]:
with OneshotPlanner(problem_kind=problem.kind) as planner:
result = planner.solve(problem)
plan = result.plan
assert plan is not None
print("%s returned: %s" % (planner.name, result.plan))
OversubscriptionPlanner[Fast Downward] returned: SequentialPlan:
move(r1, l0, l1)
move(r1, l1, l2)
move(r1, l2, l3)
move(r1, l3, l4)
move(r1, l4, l5)
move(r1, l5, l6)
move(r1, l6, l7)
move(r1, l7, l8)
move(r1, l8, l9)
[ ]:
from unified_planning.plot import plot_sequential_plan
Ignore the code below, it’s used to make this notebook also runnable in the Countinuous Intergation.
[ ]:
# Redefine the plot package methods imported above to print the plot to a temp file
# if the exception "could not locate runnable browser" is raised. This usually happens
# in the Continuous Integration.
from inspect import getmembers, isfunction
from unified_planning import plot
from functools import partial
import os, uuid, tempfile as tf
# Define the function that will be executed instead
def _function(original_function, *args, **kwargs):
try:
original_function(*args, **kwargs)
except Exception as e:
if "could not locate runnable browser" in str(e):
original_function(*args, **kwargs,
filename=f"{os.path.join(tf.gettempdir(), str(uuid.uuid1()))}.png"
)
else:
raise e
# Iterate over all the functions of the plot package
for function_name, function in getmembers(plot, isfunction):
# Override the original function with the new one
globals()[function_name] = partial(_function, function)
[ ]:
plot_sequential_plan(plan, figsize=(18, 4), node_size=8000, font_size=11)