6. Compilers Example
In this example we will create a unified_planning problem and then show how to use a compiler on it to obtain a new equivalent problem; then, we will get a plan for the compiled problem and translate it into the equivalent plan for the original problem.
6.1. Setup
We start by installing the library with PIP
[1]:
!apt install graphviz graphviz-dev
%pip install unified-planning[tamer,plot]
6.2. Define the UP Problem
In the end we want every job done, no heaters switched on and no electrical failure.
[2]:
from unified_planning.shortcuts import *
# Define User Types
Heater = UserType("Heater")
Job = UserType("Job")
Clean = UserType("Clean", Job)
Work = UserType("Work", Job)
# Define fluents
is_cold = Fluent("is_cold", BoolType()) # BoolType is the default, so it can be avoided
is_warm = Fluent("is_warm")
electrical_failure = Fluent("electrical_failure")
job_done = Fluent("job_done", BoolType(), job = Job)
is_on = Fluent("is_on", BoolType(), heater = Heater)
used_heater = Fluent("used_heater", BoolType(), heater = Heater)
# Define actions
switch_heater_on = InstantaneousAction("switch_heater_on", heater = Heater)
heater = switch_heater_on.parameter("heater")
switch_heater_on.add_precondition(Not(used_heater(heater))) # The heater must not have been already used
switch_heater_on.add_precondition(Not(is_on(heater))) # The heater must not be already on
switch_heater_on.add_effect(is_warm, True) # The temperature becomes warm
switch_heater_on.add_effect(is_on(heater), True) # The heater switches on
# Define a Variable of type "Heater", used for the existential condition
h_var = Variable("h_var", Heater)
# If exists an heater that is already on, we have an electrical failure
switch_heater_on.add_effect(electrical_failure, True, Exists(is_on(h_var), h_var))
switch_heater_off = InstantaneousAction("switch_heater_off", heater = Heater)
heater = switch_heater_off.parameter("heater")
switch_heater_off.add_precondition(is_on(heater)) # The heater must be on
switch_heater_off.add_effect(is_warm, False) # It is not warm anymore
switch_heater_off.add_effect(is_cold, True) # It becomes cold
switch_heater_off.add_effect(is_on(heater), False) # The heater turns off
switch_heater_off.add_effect(used_heater(heater), True) # The heater becomes used
perform_job = InstantaneousAction("perform_job", job = Job)
job = perform_job.parameter("job")
perform_job.add_precondition(is_warm) # Must be warm to do the job
perform_job.add_effect(is_warm, False) # It is not warm anymore
perform_job.add_effect(is_cold, True) # It becomes cold again
perform_job.add_effect(job_done(job), True) # The job is done
# define objects
heater_1 = Object("heater_1", Heater)
heater_2 = Object("heater_2", Heater)
clean = Object("clean", Clean)
work = Object("work", Work)
# define the problem
original_problem = Problem("heaters_and_jobs")
# add the fluents to the problem
original_problem.add_fluent(is_cold, default_initial_value = True)
original_problem.add_fluent(is_warm, default_initial_value = False)
original_problem.add_fluent(electrical_failure, default_initial_value = False)
original_problem.add_fluent(job_done, default_initial_value = False)
original_problem.add_fluent(is_on, default_initial_value = False)
original_problem.add_fluent(used_heater, default_initial_value = False)
# add objects and actions
original_problem.add_objects([heater_1, heater_2, clean, work])
original_problem.add_actions([switch_heater_on, switch_heater_off, perform_job])
# define the problem goals
original_problem.add_goal(Not(electrical_failure)) # No electrical failure
j_var = Variable("j_var", Job)
original_problem.add_goal(Forall(job_done(j_var), j_var)) # All jobs are done
original_problem.add_goal(Forall(Not(is_on(h_var)), h_var)) # All heaters are switched off
original_problem_kind = original_problem.kind
6.3. Testing Compilers
To show the usage and the capabilities of the compilers, we will take the problem we just defined and pyperplan, a planner that does not support some features of the original problem.
With the use of compilers, pyperplan will be able to solve the equivalent compiled problem and then we will rewrite the plan back to be a plan of the original problem.
6.3.1. Get pyperplan solver
We will now get the pyperplan engine and show that the original problem is not supported.
[3]:
with OneshotPlanner(name = "pyperplan") as planner:
assert not planner.supports(original_problem_kind)
NOTE: To disable printing of planning engine credits, add this line to your code: `up.shortcuts.get_environment().credits_stream = None`
*** Credits ***
* In operation mode `OneshotPlanner` at line 1 of `/tmp/ipykernel_155649/2508516188.py`, you are using the following planning engine:
* Engine name: pyperplan
* Developers: Albert-Ludwigs-Universität Freiburg (Yusra Alkhazraji, Matthias Frorath, Markus Grützner, Malte Helmert, Thomas Liebetraut, Robert Mattmüller, Manuela Ortlieb, Jendrik Seipp, Tobias Springenberg, Philip Stahl, Jan Wülfing)
* Description: Pyperplan is a lightweight STRIPS planner written in Python.
6.4. First compilation kind: Quantifiers Removing
The compilation kind QUANTIFIERS_REMOVING
takes a problem that might have a quantifier operand, thus an Exists
or a Forall
, and returns an equivalent problem that does not contain any Exists
or Forall
.
The quantifiers in the problem can be found in: - an action’s condition (or precondition) - the condition of an action’s conditional effect - the condition of a timed conditional effect - a goal or a timed goal - the assignment value of an effect
6.4.1. Example
In the unified_planning
we have a compiler that implements this specific compilation kind: the unified_planning.engines.compilers.QuantifiersRemover
. It works by taking every expression in the problem and grounding the quantifiers (Exists
and Forall
) with the equivalent formula in the problem.
For example, in our problem, the goal Forall(job_done(j_var), j_var)
is equivalent to job_done(clean) and job_done(work)
, because there are only 2 possible values that the j_var
variable can have. It is interesting to note how the j_var
variable, of type Job
, is instantiated to 2 objects of type Clean
and Work
, due to the typing inheritance.
Here we have an example on how to get from the factory and use a compiler to remove quantifiers:
[4]:
from unified_planning.engines import CompilationKind
# The CompilationKind class is defined in the unified_planning/engines/mixins/compiler.py file
# To get the Compiler from the factory we can use the Compiler operation mode.
# It takes a problem_kind and a compilation_kind, and returns a compiler with the capabilities we need
with Compiler(
problem_kind = original_problem_kind,
compilation_kind = CompilationKind.QUANTIFIERS_REMOVING
) as quantifiers_remover:
# After we have the compiler, we get the compilation result
qr_result = quantifiers_remover.compile(
original_problem,
CompilationKind.QUANTIFIERS_REMOVING
)
qr_problem = qr_result.problem
qr_kind = qr_problem.kind
# Check the result of the compilation
assert original_problem_kind.has_existential_conditions() and original_problem_kind.has_universal_conditions()
assert not qr_kind.has_existential_conditions() and not qr_kind.has_universal_conditions()
6.5. Conditional Effects Removing
The conditional effects removing compilation kind takes a problem with conditional effects and returns an equivalent problem without any conditional effect.
The conditional effects can be found in: - action’s effects - timed effects
6.5.1. Example
In the unified_planning
we have a compiler that implements this specific compilation kind: the unified_planning.engines.compilers.ConditionalEffectsRemover
.
It works by taking every action with at least one conditional effect and creating 2^N
actions, where N
is the number of conditional effects in the action. For each conditional effect, we have 2 variants: - one where the condition of the conditional effect becomes an action precondition and the effect of the conditional effects becomes a normal effect of the action - one where the inverse of the condition of the conditional effect becomes an action precondition.
All the possible combinations of those 2 variants of conditional effects are added, and then impossible or empty actions are removed.
Here we have an example on how to get from the factory and use a compiler to remove conditional effects:
[5]:
# Get the compiler from the factory
with Compiler(
problem_kind = qr_kind,
compilation_kind = CompilationKind.CONDITIONAL_EFFECTS_REMOVING
) as conditional_effects_remover:
# After we have the compiler, we get the compilation result
cer_result = conditional_effects_remover.compile(
qr_problem,
CompilationKind.CONDITIONAL_EFFECTS_REMOVING
)
cer_problem = cer_result.problem
cer_kind = cer_problem.kind
# Check the result of the compilation
assert original_problem_kind.has_conditional_effects()
assert qr_kind.has_conditional_effects()
assert not cer_kind.has_conditional_effects()
6.6. Disjunctive Conditions Removing
The disjunctive conditions removing compilation kind takes a problem that might have a complex expressions as an action condition or in the goals and returns an equivalent problem where every condition becomes only a conjunction of terms (an And
of terms). Where each term can be a fluent or the negation of a fluent and not a complex expression.
The complex conditions can be found in: - action’s conditions (or preconditions) - the condition of an action’s conditional effect - the condition of a timed conditional effect - problem’s goals - problem’s timed goals
6.6.1. Example
In the unified_planning
we have a compiler that implements this specific compilation kind: the unified_planning.engines.compilers.DisjunctiveConditionsRemover
. It modifies all the actions by making a unique And
containing all the action’s preconditions, computes the equivalent formula of the original And
as a disjunction of conjunctions (an Or
of Ands
), and then creates an action for every element of the resulting Or
. Each resulting action has the same effects of the
original action and one element of the Or
as a precondition.
A similar pattern is used to remove disjunctions from the problem goals, but it involves adding a fresh fluent and, for every element of the resulting Or
, a fresh action.
Here we have an example on how to get from the factory and use a compiler to remove disjunctive conditions:
[6]:
# Get the compiler from the factory
with Compiler(
problem_kind = cer_kind,
compilation_kind = CompilationKind.DISJUNCTIVE_CONDITIONS_REMOVING
) as disjunctive_conditions_remover:
# After we have the compiler, we get the compilation result
dcr_result = disjunctive_conditions_remover.compile(
cer_problem,
CompilationKind.DISJUNCTIVE_CONDITIONS_REMOVING
)
dcr_problem = dcr_result.problem
dcr_kind = dcr_problem.kind
# Check the result of the compilation
assert qr_kind.has_disjunctive_conditions()
assert cer_kind.has_disjunctive_conditions()
assert not dcr_kind.has_disjunctive_conditions()
6.7. Negative Conditions Removing
The negative conditions removing compilation kind takes a problem that has the Not
operand in any problem condition and returns an equivalent problem where the Not
doesn’t appear in the problem’s conditions.
The Not
operand must be removed from: - the action’s conditions (or preconditions) - the condition of every action’s conditional effects - the condition of every timed conditional effects - problem’s goals - problem’s timed goals
6.7.1. Example
In the unified_planning
we have a compiler that implements this specific compilation kind: the unified_planning.engines.compilers.DisjunctiveConditionsRemover
. The compiling process followed in the UP implementation is the following: - For every fluent that appears negated in the conditions or in the goals, a new fluent that represents the same fluent negated is created. - Every time the original fluent appears negated in an expression, it is replaced with the new fluent. - Every time
the original fluent appears in the effect of an action, an effect is added to that action. The added effect assigns to the new fluent the negated value assigned to the original fluent; this makes sure that every time the original fluent is modified, also the new fluent is modified with the inverse value, so the new fluent created always represents the opposite of the original fluent and it is used instead of the Not(original_fluent)
in the conditions.
Here we have an example on how to get from the factory and use a compiler to remove negative conditions:
[7]:
# Get the compiler from the factory
with Compiler(
problem_kind = dcr_kind,
compilation_kind = CompilationKind.NEGATIVE_CONDITIONS_REMOVING
) as negative_conditions_remover:
# After we have the compiler, we get the compilation result
ncr_result = negative_conditions_remover.compile(
dcr_problem,
CompilationKind.NEGATIVE_CONDITIONS_REMOVING
)
ncr_problem = ncr_result.problem
ncr_kind = ncr_problem.kind
# Check the result of the compilation
assert original_problem_kind.has_negative_conditions()
assert qr_kind.has_negative_conditions()
assert cer_kind.has_negative_conditions()
assert dcr_kind.has_negative_conditions()
assert not ncr_kind.has_negative_conditions()
6.8. Solving the obtained problem with pyperplan
After all the compilers have been used in a pipeline, we can solve the problem with pyperplan.
6.8.1. Considerations on the plan obtained
As we can see in the simulation, the plan obtained makes no sense for the original plan; but we can focus on the length of the plan. We see it has 6 action instances, intuitively, 3 steps repeated twice: - switch one heater on - get a job done - switch the heater off
[8]:
# Get the planner from the factory
with OneshotPlanner(name = "pyperplan") as planner:
assert planner.supports(ncr_kind) # Make sure the planner supports the compiled problem
ncr_plan = planner.solve(ncr_problem).plan # Solve the problem and get the plan for the compiled problem
print(ncr_plan)
assert len(ncr_plan.actions) == 6
*** Credits ***
* In operation mode `OneshotPlanner` at line 2 of `/tmp/ipykernel_155649/1542274702.py`, you are using the following planning engine:
* Engine name: pyperplan
* Developers: Albert-Ludwigs-Universität Freiburg (Yusra Alkhazraji, Matthias Frorath, Markus Grützner, Malte Helmert, Thomas Liebetraut, Robert Mattmüller, Manuela Ortlieb, Jendrik Seipp, Tobias Springenberg, Philip Stahl, Jan Wülfing)
* Description: Pyperplan is a lightweight STRIPS planner written in Python.
SequentialPlan:
switch_heater_on_0(heater_1)
perform_job_0(clean)
switch_heater_off_0(heater_1)
switch_heater_on_0(heater_2)
perform_job_0(work)
switch_heater_off_0(heater_2)
6.9. How to get a plan valid for the original problem
All the compilers we used provide the capabilities of rewriting an action instance of the compiled problem into an action instance of the input problem.
So, since we used a pipeline of 4 compilers, we have to rewrite back the plan 4 times.
To rewrite back a plan from the compiled problem to the input problem (respectively, compiled_plan and input_plan), we use 2 main features offered by the unified_planning_framework: - The CompilationResult.map_back_action_instance
, a field of type: Callable[[ActionInstance], ActionInstance]
. This function takes an ActionInstance
of the compiled problem and returns the equivalent ActionInstance
of the input problem. - The Plan.replace_action_instances
method, which takes
exactly 1 argument of type Callable[[ActionInstance], ActionInstance]
, and creates a new plan where every action instance of the original plan is replaced with the result given by the function given as parameter.
Using those 2 features allows us to easily get the equivalent plan for the input problem, and by following the compilers pipeline backwards we can get the plan for the original problem.
[9]:
from unified_planning.engines import ValidationResultStatus
# The ValidationResultStatus class is defined in the unified_planning/engines/results.py file
# Create the equivalent plan for the dcr_problem (the one created by the disjunctive conditions remover)
dcr_plan = ncr_plan.replace_action_instances(ncr_result.map_back_action_instance)
# Check to see if the plan is actually valid for the problem
print(dcr_kind)
with PlanValidator(problem_kind = dcr_kind) as validator:
assert validator.validate(dcr_problem, dcr_plan).status == ValidationResultStatus.VALID
PROBLEM_CLASS: ['ACTION_BASED']
CONDITIONS_KIND: ['NEGATIVE_CONDITIONS']
TYPING: ['FLAT_TYPING', 'HIERARCHICAL_TYPING']
6.10. Final result
Now repeat the process for all the compilers we used.
As we wanted to achieve, with the use of the compilers we managed to solve a problem with pyperplan, when pyperplan was not able to solve said problem.
[10]:
# Get the plan for the cer_problem
cer_plan = dcr_plan.replace_action_instances(dcr_result.map_back_action_instance)
# Get the plan for the qr_problem
qr_plan = cer_plan.replace_action_instances(cer_result.map_back_action_instance)
# Get the plan for the original problem
original_plan = qr_plan.replace_action_instances(qr_result.map_back_action_instance)
# Check to see if the obtained plan is actually valid for the original problem
with PlanValidator(problem_kind = original_problem_kind) as validator:
assert validator.validate(original_problem, original_plan).status == ValidationResultStatus.VALID
print(original_plan)
SequentialPlan:
switch_heater_on(heater_1)
perform_job(clean)
switch_heater_off(heater_1)
switch_heater_on(heater_2)
perform_job(work)
switch_heater_off(heater_2)
[ ]:
from unified_planning.plot import plot_sequential_plan
Ignore the code below, it’s used to make this notebook also runnable in the Countinuous Intergation.
[ ]:
# Redefine the plot package methods imported above to print the plot to a temp file
# if the exception "could not locate runnable browser" is raised. This usually happens
# in the Continuous Integration.
from inspect import getmembers, isfunction
from unified_planning import plot
from functools import partial
import os, uuid, tempfile as tf
# Define the function that will be executed instead
def _function(original_function, *args, **kwargs):
try:
original_function(*args, **kwargs)
except Exception as e:
if "could not locate runnable browser" in str(e):
original_function(*args, **kwargs,
filename=f"{os.path.join(tf.gettempdir(), str(uuid.uuid1()))}.png"
)
else:
raise e
# Iterate over all the functions of the plot package
for function_name, function in getmembers(plot, isfunction):
# Override the original function with the new one
globals()[function_name] = partial(_function, function)
[ ]:
plot_sequential_plan(original_plan, figsize=(8, 20), node_size=21000, font_size=10, top_bottom=True)